linera_core/client/
mod.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    cmp::Ordering,
7    collections::{BTreeMap, BTreeSet, HashSet},
8    slice,
9    sync::{Arc, RwLock},
10};
11
12use custom_debug_derive::Debug;
13use futures::{
14    future::Future,
15    stream::{self, AbortHandle, FuturesUnordered, StreamExt},
16};
17#[cfg(with_metrics)]
18use linera_base::prometheus_util::MeasureLatency as _;
19use linera_base::{
20    crypto::{CryptoHash, Signer as _, ValidatorPublicKey},
21    data_types::{
22        ArithmeticError, Blob, BlockHeight, ChainDescription, Epoch, Round, TimeDelta, Timestamp,
23    },
24    ensure,
25    identifiers::{AccountOwner, BlobId, BlobType, ChainId, EventId, StreamId},
26    time::Duration,
27};
28#[cfg(not(target_arch = "wasm32"))]
29use linera_base::{data_types::Bytecode, identifiers::ModuleId, vm::VmRuntime};
30use linera_chain::{
31    data_types::{
32        BlockExecutionOutcome, BlockProposal, BundleExecutionPolicy, ChainAndHeight, LiteVote,
33        ProposedBlock,
34    },
35    manager::LockingBlock,
36    types::{
37        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
38        LiteCertificate, ValidatedBlock, ValidatedBlockCertificate,
39    },
40    ChainError,
41};
42use linera_execution::committee::Committee;
43use linera_storage::{Clock as _, ResultReadCertificates, Storage as _};
44use rand::seq::SliceRandom;
45use received_log::ReceivedLogs;
46use serde::{Deserialize, Serialize};
47use tokio::sync::mpsc;
48use tracing::{debug, error, info, instrument, trace, warn};
49
50use crate::{
51    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
52    environment::Environment,
53    local_node::{LocalNodeClient, LocalNodeError},
54    node::{CrossChainMessageDelivery, NodeError, ValidatorNode as _, ValidatorNodeProvider as _},
55    notifier::{ChannelNotifier, Notifier as _},
56    remote_node::RemoteNode,
57    updater::{communicate_with_quorum, CommunicateAction, ValidatorUpdater},
58    worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState},
59    ChainWorkerConfig, CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
60};
61
62pub mod chain_client;
63pub use chain_client::ChainClient;
64
65pub use crate::data_types::ClientOutcome;
66
67#[cfg(test)]
68#[path = "../unit_tests/client_tests.rs"]
69mod client_tests;
70pub mod requests_scheduler;
71
72pub use requests_scheduler::{RequestsScheduler, RequestsSchedulerConfig, ScoringWeights};
73mod received_log;
74mod validator_trackers;
75
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus histograms recording the latency of the client's main operations.
    //! All histograms are registered with no labels (`&[]`) and exponential latency
    //! buckets produced by `exponential_bucket_latencies(10_000.0)`.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    // Latency of `process_inbox`.
    // NOTE(review): the constant name says "without prepare" but the registered metric
    // is called `process_inbox_latency` — presumably kept for historical continuity of
    // dashboards; confirm before renaming either side.
    pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_histogram_vec(
                "process_inbox_latency",
                "process_inbox latency",
                &[],
                exponential_bucket_latencies(10_000.0),
            )
        });

    // Latency of `prepare_chain`.
    pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "prepare_chain_latency",
            "prepare_chain latency",
            &[],
            exponential_bucket_latencies(10_000.0),
        )
    });

    // Latency of `synchronize_chain_state`.
    pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "synchronize_chain_state_latency",
            "synchronize_chain_state latency",
            &[],
            exponential_bucket_latencies(10_000.0),
        )
    });

    // Latency of `execute_block`.
    pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "execute_block_latency",
            "execute_block latency",
            &[],
            exponential_bucket_latencies(10_000.0),
        )
    });

    // Latency of `find_received_certificates`.
    pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "find_received_certificates_latency",
            "find_received_certificates latency",
            &[],
            exponential_bucket_latencies(10_000.0),
        )
    });
}
129
/// Default number of certificates requested per download batch.
pub const DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE: u64 = 500;
/// Default number of certificates sent per upload batch.
pub const DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE: u64 = 500;
/// Default batch size when downloading certificates from sender chains.
pub const DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE: usize = 20_000;
/// Default maximum number of event stream queries.
pub const DEFAULT_MAX_EVENT_STREAM_QUERIES: usize = 1000;
134
/// Identifies which client operation a timing measurement refers to.
///
/// Paired with a `u64` measurement when sent over the timing channel (see the
/// `timing_sender` parameter of `create_chain_client`).
#[derive(Debug, Clone, Copy)]
pub enum TimingType {
    /// Timing of operation execution.
    ExecuteOperations,
    /// Timing of block execution.
    ExecuteBlock,
    /// Timing of submitting a block proposal.
    SubmitBlockProposal,
    /// Timing of updating validators.
    UpdateValidators,
}
142
/// Defines what type of notifications we should process for a chain:
/// - do we fully participate in consensus and download sender chains?
/// - or do we only follow the chain's blocks without participating?
/// - or do we only care about blocks containing events from some particular streams?
///
/// Modes are partially ordered by how much they listen to (see the `PartialOrd` impl):
/// `FullChain` > `FollowChain` > `EventsOnly(_)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ListeningMode {
    /// Listen to everything: all blocks for the chain and all blocks from sender chains,
    /// and participate in rounds.
    FullChain,
    /// Listen to all blocks for the chain, but don't download sender chain blocks or participate
    /// in rounds. Use this when interested in the chain's state but not intending to propose
    /// blocks (e.g., because we're not a chain owner).
    FollowChain,
    /// Only listen to blocks which contain events from those streams.
    EventsOnly(BTreeSet<StreamId>),
}
159
160impl PartialOrd for ListeningMode {
161    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
162        match (self, other) {
163            (ListeningMode::FullChain, ListeningMode::FullChain) => Some(Ordering::Equal),
164            (ListeningMode::FullChain, _) => Some(Ordering::Greater),
165            (_, ListeningMode::FullChain) => Some(Ordering::Less),
166            (ListeningMode::FollowChain, ListeningMode::FollowChain) => Some(Ordering::Equal),
167            (ListeningMode::FollowChain, ListeningMode::EventsOnly(_)) => Some(Ordering::Greater),
168            (ListeningMode::EventsOnly(_), ListeningMode::FollowChain) => Some(Ordering::Less),
169            (ListeningMode::EventsOnly(a), ListeningMode::EventsOnly(b)) => {
170                if a == b {
171                    Some(Ordering::Equal)
172                } else if a.is_superset(b) {
173                    Some(Ordering::Greater)
174                } else if b.is_superset(a) {
175                    Some(Ordering::Less)
176                } else {
177                    None
178                }
179            }
180        }
181    }
182}
183
184impl ListeningMode {
185    /// Returns whether a notification with this reason should be processed under this listening
186    /// mode.
187    pub fn is_relevant(&self, reason: &Reason) -> bool {
188        match (reason, self) {
189            // NewEvents is only processed in EventsOnly mode, the other modes depend on
190            // the NewBlock notification.
191            (Reason::NewEvents { .. }, ListeningMode::FollowChain | ListeningMode::FullChain) => {
192                false
193            }
194            // FullChain processes everything.
195            (_, ListeningMode::FullChain) => true,
196            // FollowChain processes new blocks on the chain itself, including blocks that
197            // produced events.
198            (Reason::NewBlock { .. }, ListeningMode::FollowChain) => true,
199            (_, ListeningMode::FollowChain) => false,
200            // EventsOnly only processes events from relevant streams.
201            (Reason::NewEvents { event_streams, .. }, ListeningMode::EventsOnly(relevant)) => {
202                relevant.intersection(event_streams).next().is_some()
203            }
204            (_, ListeningMode::EventsOnly(_)) => false,
205        }
206    }
207
208    pub fn extend(&mut self, other: Option<ListeningMode>) {
209        match (self, other) {
210            (_, None) => (),
211            (ListeningMode::FullChain, _) => (),
212            (mode, Some(ListeningMode::FullChain)) => {
213                *mode = ListeningMode::FullChain;
214            }
215            (ListeningMode::FollowChain, _) => (),
216            (mode, Some(ListeningMode::FollowChain)) => {
217                *mode = ListeningMode::FollowChain;
218            }
219            (
220                ListeningMode::EventsOnly(self_events),
221                Some(ListeningMode::EventsOnly(other_events)),
222            ) => {
223                self_events.extend(other_events);
224            }
225        }
226    }
227
228    /// Returns whether this mode implies follow-only behavior (i.e., not participating in
229    /// consensus rounds).
230    pub fn is_follow_only(&self) -> bool {
231        !matches!(self, ListeningMode::FullChain)
232    }
233
234    /// Returns whether this is a full chain mode (synchronizing sender chains and updating
235    /// inboxes).
236    pub fn is_full(&self) -> bool {
237        matches!(self, ListeningMode::FullChain)
238    }
239
240    pub fn should_sync_chain_state(&self) -> bool {
241        match self {
242            ListeningMode::FullChain | ListeningMode::FollowChain => true,
243            ListeningMode::EventsOnly(_) => false,
244        }
245    }
246}
247
/// A builder that creates [`ChainClient`]s which share the cache and notifiers.
pub struct Client<Env: Environment> {
    /// The environment, giving access to the network, storage, signer and wallet.
    environment: Env,
    /// Local node to manage the execution state and the local storage of the chains that we are
    /// tracking.
    pub local_node: LocalNodeClient<Env::Storage>,
    /// Manages the requests sent to validator nodes.
    requests_scheduler: RequestsScheduler<Env>,
    /// The admin chain ID.
    admin_chain_id: ChainId,
    /// Chains that should be tracked by the client, along with their listening mode.
    /// The presence of a chain in this map means it is tracked by the local node.
    /// This map is shared with the worker state (see `Client::new`).
    chain_modes: Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>,
    /// References to clients waiting for chain notifications.
    notifier: Arc<ChannelNotifier<Notification>>,
    /// Chain state for the managed chains.
    chains: papaya::HashMap<ChainId, chain_client::State>,
    /// Configuration options.
    options: chain_client::Options,
}
268
269impl<Env: Environment> Client<Env> {
270    /// Creates a new `Client` with a new cache and notifiers.
271    #[instrument(level = "trace", skip_all)]
272    #[expect(clippy::too_many_arguments)]
273    pub fn new(
274        environment: Env,
275        admin_chain_id: ChainId,
276        long_lived_services: bool,
277        chain_modes: impl IntoIterator<Item = (ChainId, ListeningMode)>,
278        name: impl Into<String>,
279        chain_worker_ttl: Option<Duration>,
280        sender_chain_worker_ttl: Option<Duration>,
281        options: chain_client::Options,
282        block_cache_size: usize,
283        execution_state_cache_size: usize,
284        requests_scheduler_config: &requests_scheduler::RequestsSchedulerConfig,
285    ) -> Self {
286        let chain_modes = Arc::new(RwLock::new(chain_modes.into_iter().collect()));
287        let config = ChainWorkerConfig {
288            nickname: name.into(),
289            long_lived_services,
290            allow_inactive_chains: true,
291            allow_messages_from_deprecated_epochs: true,
292            ttl: chain_worker_ttl,
293            sender_chain_ttl: sender_chain_worker_ttl,
294            block_cache_size,
295            execution_state_cache_size,
296            ..ChainWorkerConfig::default()
297        };
298        let state = WorkerState::new(
299            environment.storage().clone(),
300            config,
301            Some(chain_modes.clone()),
302        );
303        let local_node = LocalNodeClient::new(state);
304        let requests_scheduler = RequestsScheduler::new(vec![], requests_scheduler_config);
305
306        Self {
307            environment,
308            local_node,
309            requests_scheduler,
310            chains: papaya::HashMap::new(),
311            admin_chain_id,
312            chain_modes,
313            notifier: Arc::new(ChannelNotifier::default()),
314            options,
315        }
316    }
317
    /// Returns the chain ID of the admin chain.
    pub fn admin_chain_id(&self) -> ChainId {
        self.admin_chain_id
    }
322
    /// Subscribes to notifications for the given chain IDs.
    ///
    /// The returned channel receives a [`Notification`] for every event concerning one of
    /// the given chains.
    pub fn subscribe(
        &self,
        chain_ids: Vec<ChainId>,
    ) -> tokio::sync::mpsc::UnboundedReceiver<Notification> {
        self.notifier.subscribe(chain_ids)
    }
330
    /// Adds additional chain IDs to an existing subscription.
    ///
    /// `sender` is the sending end of a channel previously obtained via
    /// [`Client::subscribe`]; notifications for `chain_ids` will also be delivered there.
    pub fn subscribe_extra(
        &self,
        chain_ids: Vec<ChainId>,
        sender: &tokio::sync::mpsc::UnboundedSender<Notification>,
    ) {
        self.notifier.add_sender(chain_ids, sender);
    }
339
    /// Returns the storage client used by this client's local node.
    pub fn storage_client(&self) -> &Env::Storage {
        self.environment.storage()
    }
344
345    /// Tries to read a certificate from local storage, using the hash if available
346    /// (fast path) or falling back to a height-based lookup.
347    async fn try_read_local_certificate(
348        &self,
349        chain_id: ChainId,
350        height: BlockHeight,
351        hash: Option<CryptoHash>,
352    ) -> Result<Option<Arc<ConfirmedBlockCertificate>>, chain_client::Error> {
353        if let Some(hash) = hash {
354            return Ok(self.storage_client().read_certificate(hash).await?);
355        }
356        let results = self
357            .storage_client()
358            .read_certificates_by_heights(chain_id, &[height])
359            .await?;
360        Ok(results.into_iter().next().flatten())
361    }
362
    /// Returns the environment's network, used as the provider of validator node
    /// connections.
    pub fn validator_node_provider(&self) -> &Env::Network {
        self.environment.network()
    }
366
    /// Returns the configuration options shared by the chain clients created from this
    /// `Client`.
    pub(crate) fn options(&self) -> &chain_client::Options {
        &self.options
    }
370
    /// Handles any pending local cross-chain requests, notifying subscribers.
    ///
    /// Delegates to the local node, passing this client's notifier so that subscribers
    /// are informed of resulting updates.
    pub async fn retry_pending_cross_chain_requests(
        &self,
        sender_chain: ChainId,
    ) -> Result<(), LocalNodeError> {
        self.local_node
            .retry_pending_cross_chain_requests(sender_chain, &self.notifier)
            .await
    }
380
    /// Returns a reference to the client's [`Signer`][crate::environment::Signer].
    #[instrument(level = "trace", skip(self))]
    pub fn signer(&self) -> &Env::Signer {
        self.environment.signer()
    }
386
387    /// Returns whether the signer has a key for the given owner.
388    pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, chain_client::Error> {
389        self.signer()
390            .contains_key(owner)
391            .await
392            .map_err(chain_client::Error::signer_failure)
393    }
394
    /// Returns a reference to the client's [`Wallet`][crate::environment::Wallet].
    pub fn wallet(&self) -> &Env::Wallet {
        self.environment.wallet()
    }
399
400    /// Extends the listening mode for a chain, combining with the existing mode if present.
401    /// Returns the resulting mode.
402    #[instrument(level = "trace", skip(self))]
403    pub fn extend_chain_mode(&self, chain_id: ChainId, mode: ListeningMode) -> ListeningMode {
404        let mut chain_modes = self
405            .chain_modes
406            .write()
407            .expect("Panics should not happen while holding a lock to `chain_modes`");
408        let entry = chain_modes.entry(chain_id).or_insert_with(|| mode.clone());
409        entry.extend(Some(mode));
410        entry.clone()
411    }
412
413    /// Returns the listening mode for a chain, if it is tracked.
414    pub fn chain_mode(&self, chain_id: ChainId) -> Option<ListeningMode> {
415        self.chain_modes
416            .read()
417            .expect("Panics should not happen while holding a lock to `chain_modes`")
418            .get(&chain_id)
419            .cloned()
420    }
421
422    /// Returns whether a chain is fully tracked by the local node.
423    pub fn is_tracked(&self, chain_id: ChainId) -> bool {
424        self.chain_modes
425            .read()
426            .expect("Panics should not happen while holding a lock to `chain_modes`")
427            .get(&chain_id)
428            .is_some_and(ListeningMode::is_full)
429    }
430
    /// Creates a new `ChainClient` for `chain_id`, registering client-side state for the
    /// chain if none exists yet.
    ///
    /// If state for the chain already exists, `pending_proposal` and `follow_only` are
    /// ignored: the existing entry is assumed to be more up to date than the arguments.
    #[expect(clippy::too_many_arguments)]
    #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
    pub fn create_chain_client(
        self: &Arc<Self>,
        chain_id: ChainId,
        block_hash: Option<CryptoHash>,
        next_block_height: BlockHeight,
        pending_proposal: &Option<PendingProposal>,
        preferred_owner: Option<AccountOwner>,
        timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
        follow_only: bool,
    ) -> ChainClient<Env> {
        // If the entry already exists we assume that the entry is more up to date than
        // the arguments: If they were read from the wallet file, they might be stale.
        self.chains.pin().get_or_insert_with(chain_id, || {
            chain_client::State::new(pending_proposal.clone(), follow_only)
        });

        ChainClient::new(
            self.clone(),
            chain_id,
            self.options.clone(),
            block_hash,
            next_block_height,
            preferred_owner,
            timing_sender,
        )
    }
460
461    /// Returns whether the given chain is in follow-only mode.
462    fn is_chain_follow_only(&self, chain_id: ChainId) -> bool {
463        self.chains
464            .pin()
465            .get(&chain_id)
466            .is_some_and(|state| state.is_follow_only())
467    }
468
    /// Sets whether the given chain is in follow-only mode.
    ///
    /// NOTE(review): this looks like a no-op when the chain has no client state entry
    /// yet — confirm `papaya::HashMap::update` semantics and that callers create the
    /// state first.
    pub fn set_chain_follow_only(&self, chain_id: ChainId, follow_only: bool) {
        self.chains
            .pin()
            .update(chain_id, |state| state.with_follow_only(follow_only));
    }
475
    /// Fetches the chain description blob if needed, and returns the chain info.
    ///
    /// If the local node reports missing blobs, first brings the admin chain up to date,
    /// then downloads the missing blobs from `validators` and retries the local query.
    async fn fetch_chain_info(
        &self,
        chain_id: ChainId,
        validators: &[RemoteNode<Env::ValidatorNode>],
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        match self.local_node.chain_info(chain_id).await {
            Ok(info) => Ok(info),
            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                // Make sure the admin chain is up to date.
                self.synchronize_chain_state(self.admin_chain_id).await?;
                // If the chain is missing then the error is a WorkerError
                // and so a BlobsNotFound
                self.update_local_node_with_blobs_from(blob_ids, validators)
                    .await?;
                // Retry now that the blobs are available locally.
                Ok(self.local_node.chain_info(chain_id).await?)
            }
            Err(err) => Err(err.into()),
        }
    }
496
    /// Downloads and processes all certificates up to (excluding) the specified height.
    ///
    /// First replays certificates already present in local storage, then downloads the
    /// remainder in batches from the validators. Returns the chain info once the local
    /// node has reached at least `target_next_block_height`, or an error if the
    /// certificates could not all be obtained.
    #[instrument(level = "trace", skip(self))]
    async fn download_certificates(
        &self,
        chain_id: ChainId,
        target_next_block_height: BlockHeight,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let validators = self.validator_nodes().await?;
        let mut info = Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
        // Nothing to do if the local node already reached the target height.
        if target_next_block_height <= info.next_block_height {
            return Ok(info);
        }
        // Replay certificates we already have locally before going to the network.
        info = self
            .load_local_certificates(chain_id, target_next_block_height, None)
            .await?;
        let mut next_block_height = info.next_block_height;
        // Download remaining batches using all validators with staggered fallback.
        while next_block_height < target_next_block_height {
            // Never request more than the configured batch size at once.
            let limit = u64::from(target_next_block_height)
                .checked_sub(u64::from(next_block_height))
                .ok_or(ArithmeticError::Overflow)?
                .min(self.options.certificate_download_batch_size);
            let certificates = self
                .requests_scheduler
                .download_certificates_from_validators(
                    &validators,
                    chain_id,
                    next_block_height,
                    limit,
                    self.options.certificate_batch_download_timeout,
                )
                .await?;
            let Some(new_info) = self
                .process_certificates(&validators, certificates, None)
                .await?
            else {
                break;
            };
            // Progress invariant: each iteration must advance the height, otherwise
            // this loop would never terminate.
            assert!(new_info.next_block_height > next_block_height);
            next_block_height = new_info.next_block_height;
            info = new_info;
        }
        ensure!(
            target_next_block_height <= info.next_block_height,
            chain_client::Error::CannotDownloadCertificates {
                chain_id,
                target_next_block_height,
            }
        );
        Ok(info)
    }
548
    /// Loads and processes certificates from local storage for the given chain, from the
    /// current local height up to `end`. Returns the chain info after processing.
    /// If `until_block_time` is `Some`, stops before processing any certificate whose
    /// block timestamp is >= the given value (exclusive).
    async fn load_local_certificates(
        &self,
        chain_id: ChainId,
        end: BlockHeight,
        until_block_time: Option<Timestamp>,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        let mut last_info = self.local_node.chain_info(chain_id).await?;
        let next_height = last_info.next_block_height;
        // Hashes of blocks we have preprocessed locally between the current height and `end`.
        let hashes = self
            .local_node
            .get_preprocessed_block_hashes(chain_id, next_height, end)
            .await?;
        let certificates = self.storage_client().read_certificates(&hashes).await?;
        // A missing certificate for a known hash indicates corrupted/incomplete storage.
        let certificates = match ResultReadCertificates::new(certificates, hashes) {
            ResultReadCertificates::Certificates(certificates) => certificates,
            ResultReadCertificates::InvalidHashes(hashes) => {
                return Err(chain_client::Error::ReadCertificatesError(hashes))
            }
        };
        for certificate in certificates {
            // Stop (exclusive) at the first block at or past `until_block_time`.
            if let Some(until) = until_block_time {
                if certificate.value().block().header.timestamp >= until {
                    break;
                }
            }
            last_info = self.handle_certificate(certificate).await?.info;
        }
        Ok(last_info)
    }
582
    /// Downloads and processes certificates from the given validator.
    ///
    /// Stops when either condition is met:
    /// - `stop`: the local chain has reached that height (exclusive).
    /// - `until_block_time`: the next block's timestamp is >= that value (exclusive).
    #[instrument(level = "trace", skip_all)]
    async fn download_certificates_from(
        &self,
        remote_node: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
        stop: BlockHeight,
        until_block_time: Option<Timestamp>,
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        // Replay whatever is already available locally before hitting the network.
        let mut last_info = self
            .load_local_certificates(chain_id, stop, until_block_time)
            .await?;
        let mut next_height = last_info.next_block_height;
        // Now download the rest in batches from the remote node.
        while next_height < stop {
            // TODO(#2045): Analyze network errors instead of using a fixed batch size.
            let limit = u64::from(stop)
                .checked_sub(u64::from(next_height))
                .ok_or(ArithmeticError::Overflow)?
                .min(self.options.certificate_download_batch_size);

            let certificates = self
                .requests_scheduler
                .download_certificates(remote_node, chain_id, next_height, limit)
                .await?;
            // `None` means processing stopped early (e.g. due to `until_block_time`).
            let Some(info) = self
                .process_certificates(slice::from_ref(remote_node), certificates, until_block_time)
                .await?
            else {
                break;
            };
            // Progress invariant: each batch must advance the height or we would loop forever.
            assert!(info.next_block_height > next_height);
            next_height = info.next_block_height;
            last_info = info;
        }
        Ok(last_info)
    }
624
625    async fn download_blobs(
626        &self,
627        remote_nodes: &[RemoteNode<Env::ValidatorNode>],
628        blob_ids: &[BlobId],
629    ) -> Result<(), chain_client::Error> {
630        let blobs = &self
631            .requests_scheduler
632            .download_blobs(remote_nodes, blob_ids, self.options.blob_download_timeout)
633            .await?
634            .ok_or_else(|| {
635                chain_client::Error::RemoteNodeError(NodeError::BlobsNotFound(blob_ids.to_vec()))
636            })?;
637        self.local_node.store_blobs(blobs).await.map_err(Into::into)
638    }
639
    /// Downloads the publisher chain certificates that contain the given events,
    /// using the event block height index on validators. Queries a validator for
    /// the block heights, downloads those certificates, and processes them — all
    /// as one atomic unit per validator attempt, with staggered fallback.
    ///
    /// Loops until every event is resolved; each iteration removes the validator that
    /// just answered from the candidate set, and fails with `EventsNotFound` once all
    /// validators have been exhausted.
    #[instrument(level = "trace", skip_all)]
    async fn download_certificates_for_events(
        &self,
        event_ids: &[EventId],
    ) -> Result<(), chain_client::Error> {
        let mut validators = self.validator_nodes().await?;
        let timeout = self.options.certificate_batch_download_timeout;
        // Current committees, used below to verify downloaded certificates.
        let (max_epoch, committees) = self.admin_committees().await?;
        let committees_ref = &committees;
        let mut remaining_event_ids = event_ids.to_vec();

        while !remaining_event_ids.is_empty() {
            let remaining_ref = &remaining_event_ids;
            // Try validators in random order.
            validators.shuffle(&mut rand::thread_rng());
            let result = communicate_concurrently(
                &validators,
                move |remote_node| {
                    let validator_key = remote_node.public_key;
                    let validator_address = remote_node.address();
                    Box::pin(async move {
                        // Query this validator for the block heights.
                        let heights = remote_node
                            .node
                            .event_block_heights(remaining_ref.to_vec())
                            .await?;

                        // Separate resolved and unresolved events.
                        // `chain_heights`: heights to download, per publisher chain.
                        // `expected_events`: events each (chain, height) block must contain.
                        let mut chain_heights = BTreeMap::<_, BTreeSet<_>>::new();
                        let mut expected_events = BTreeMap::<_, HashSet<EventId>>::new();
                        let mut unresolved = Vec::new();
                        for (event_id, maybe_height) in remaining_ref.iter().zip(heights) {
                            if let Some(height) = maybe_height {
                                chain_heights
                                    .entry(event_id.chain_id)
                                    .or_default()
                                    .insert(height);
                                expected_events
                                    .entry((event_id.chain_id, height))
                                    .or_default()
                                    .insert(event_id.clone());
                            } else {
                                unresolved.push(event_id.clone());
                            }
                        }
                        if chain_heights.is_empty() {
                            // This validator has no useful information.
                            return Err(chain_client::Error::from(NodeError::EventsNotFound(remaining_ref.clone())));
                        }

                        // Download certificates and verify them.
                        let mut checked_certificates = Vec::<ConfirmedBlockCertificate>::new();
                        for (chain_id, heights) in chain_heights {
                            let heights_vec = heights.into_iter().collect::<Vec<_>>();
                            let certificates = self
                                .requests_scheduler
                                .download_certificates_by_heights(
                                    &remote_node,
                                    chain_id,
                                    heights_vec,
                                )
                                .await?;
                            for cert in &certificates {
                                // Verify the block contains the expected events.
                                let block = cert.block();
                                let block_event_ids = block.event_ids().collect::<HashSet<_>>();
                                if let Some(expected_event_ids) =
                                    expected_events.get(&(chain_id, block.header.height))
                                {
                                    if !expected_event_ids.is_subset(&block_event_ids) {
                                        tracing::debug!(
                                            %validator_address, ?expected_event_ids, ?block_event_ids,
                                            "validator lied about events in block."
                                        );
                                        return Err(NodeError::UnexpectedCertificateValue.into());
                                    }
                                }
                            }
                            // Check certificate signatures against the known committees
                            // before accepting anything from this validator.
                            for cert in certificates {
                                Self::check_certificate(max_epoch, committees_ref, &cert)
                                    .map_err(|error| {
                                        tracing::debug!(
                                            %validator_address, %error,
                                            "invalid certificate"
                                        );
                                        error
                                    })?
                                    .into_result()
                                    .map_err(|error| {
                                        tracing::debug!(
                                            %validator_address, %error,
                                            "could not check certificate"
                                        );
                                        error
                                    })?;
                                checked_certificates.push(cert);
                            }
                        }
                        Ok((checked_certificates, unresolved, validator_key))
                    })
                },
                |errors| {
                    errors
                        .into_iter()
                        .map(|(validator, _error)| validator)
                        .collect::<BTreeSet<_>>()
                },
                timeout,
            )
            .await;

            match result {
                Ok((certificates, unresolved, validator_key)) => {
                    // Certificates were already verified above, so skip re-checking.
                    for certificate in certificates {
                        let mode = ReceiveCertificateMode::AlreadyChecked;
                        self.receive_sender_certificate(
                            self.storage_client().cache_certificate(certificate),
                            mode,
                            None,
                        )
                        .await?;
                    }
                    // Don't ask the validator that just answered about the leftovers again.
                    validators.retain(|node| node.public_key != validator_key);
                    remaining_event_ids = unresolved;
                }
                Err(_) => {
                    // All validators failed; no point retrying.
                    return Err(NodeError::EventsNotFound(remaining_event_ids).into());
                }
            }
        }
        Ok(())
    }
776
777    /// Tries to process all the certificates, requesting any missing blobs from the given nodes.
778    /// Returns the chain info of the last successfully processed certificate.
779    /// If `until_block_time` is `Some`, stops before processing any certificate whose
780    /// block timestamp is greater or equal than the given value.
781    #[instrument(level = "trace", skip_all)]
782    async fn process_certificates(
783        &self,
784        remote_nodes: &[RemoteNode<Env::ValidatorNode>],
785        certificates: Vec<ConfirmedBlockCertificate>,
786        until_block_time: Option<Timestamp>,
787    ) -> Result<Option<Box<ChainInfo>>, chain_client::Error> {
788        let mut info = None;
789        // Blobs created by these certs are already embedded in the downloaded
790        // block bodies, so they don't need to be fetched from a validator. The
791        // chain worker resolves them from `Block::created_blobs()` during
792        // `handle_certificate`.
793        let created_blob_ids: BTreeSet<BlobId> = certificates
794            .iter()
795            .flat_map(|certificate| certificate.value().block().created_blob_ids())
796            .collect();
797        let required_blob_ids: Vec<_> = certificates
798            .iter()
799            .flat_map(|certificate| certificate.value().required_blob_ids())
800            .filter(|blob_id| !created_blob_ids.contains(blob_id))
801            .collect();
802
803        match self
804            .local_node
805            .read_blob_states_from_storage(&required_blob_ids)
806            .await
807        {
808            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
809                self.download_blobs(remote_nodes, &blob_ids).await?;
810            }
811            x => {
812                x?;
813            }
814        }
815
816        for certificate in certificates {
817            if let Some(until) = until_block_time {
818                if certificate.value().block().header.timestamp >= until {
819                    break;
820                }
821            }
822            info = Some(
823                match self.handle_certificate(certificate.clone()).await {
824                    Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
825                        self.download_blobs(remote_nodes, &blob_ids).await?;
826                        self.handle_certificate(certificate).await?
827                    }
828                    x => x?,
829                }
830                .info,
831            );
832        }
833
834        Ok(info)
835    }
836
    /// Processes a certificate in the local node, passing along `self.notifier`
    /// so the local node can announce the resulting chain update.
    ///
    /// Returns the local node's [`ChainInfoResponse`] for the affected chain.
    async fn handle_certificate<T: ProcessableCertificate>(
        &self,
        certificate: GenericCertificate<T>,
    ) -> Result<ChainInfoResponse, LocalNodeError> {
        self.local_node
            .handle_certificate(certificate, &self.notifier)
            .await
    }
845
846    async fn chain_info_with_committees(
847        &self,
848        chain_id: ChainId,
849    ) -> Result<Box<ChainInfo>, LocalNodeError> {
850        let query = ChainInfoQuery::new(chain_id).with_committees();
851        let info = self.local_node.handle_chain_info_query(query).await?.info;
852        Ok(info)
853    }
854
    /// Obtains all the committees known to the admin chain, keyed by epoch, together
    /// with the admin chain's current epoch.
    #[instrument(level = "trace", skip_all)]
    async fn admin_committees(
        &self,
    ) -> Result<(Epoch, BTreeMap<Epoch, Arc<Committee>>), LocalNodeError> {
        let info = self.chain_info_with_committees(self.admin_chain_id).await?;
        // The query asked for committees, so their absence indicates a malformed response.
        let hashes = info
            .requested_committees
            .ok_or(LocalNodeError::InvalidChainInfoResponse)?;
        // Resolve each committee blob hash concurrently; any failure aborts the join.
        let committees =
            futures::future::try_join_all(hashes.into_iter().map(|(epoch, hash)| async move {
                let committee = self
                    .storage_client()
                    .get_or_load_committee_by_hash(hash)
                    .await?;
                Ok::<_, LocalNodeError>((epoch, committee))
            }))
            .await?
            .into_iter()
            .collect();
        Ok((info.epoch, committees))
    }
878
879    /// Obtains the committee for the latest epoch on the admin chain.
880    pub async fn admin_committee(&self) -> Result<(Epoch, Arc<Committee>), LocalNodeError> {
881        let info = self.chain_info_with_committees(self.admin_chain_id).await?;
882        let hash = info
883            .requested_committees
884            .as_ref()
885            .ok_or(LocalNodeError::InvalidChainInfoResponse)?
886            .get(&info.epoch)
887            .copied()
888            .ok_or(LocalNodeError::InactiveChain(self.admin_chain_id))?;
889        let committee = self
890            .storage_client()
891            .get_or_load_committee_by_hash(hash)
892            .await?;
893        Ok((info.epoch, committee))
894    }
895
896    /// Obtains the validators for the latest epoch.
897    async fn validator_nodes(
898        &self,
899    ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, chain_client::Error> {
900        let (_, committee) = self.admin_committee().await?;
901        Ok(self.make_nodes(&committee)?)
902    }
903
904    /// Creates a [`RemoteNode`] for each validator in the committee.
905    fn make_nodes(
906        &self,
907        committee: &Committee,
908    ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
909        Ok(self
910            .validator_node_provider()
911            .make_nodes(committee)?
912            .map(|(public_key, node)| RemoteNode { public_key, node })
913            .collect())
914    }
915
916    /// Ensures that the client has the `ChainDescription` blob corresponding to this
917    /// client's `ChainId`, and returns the chain description blob.
918    pub async fn get_chain_description_blob(
919        &self,
920        chain_id: ChainId,
921    ) -> Result<Arc<Blob>, chain_client::Error> {
922        let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
923        let blob = self
924            .local_node
925            .storage_client()
926            .read_blob(chain_desc_id)
927            .await?;
928        if let Some(blob) = blob {
929            // We have the blob - return it.
930            return Ok(blob);
931        }
932        // Recover history from the current validators, according to the admin chain.
933        self.synchronize_chain_state(self.admin_chain_id).await?;
934        let nodes = self.validator_nodes().await?;
935        Ok(self
936            .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
937            .await?
938            .pop()
939            .unwrap()) // Returns exactly as many blobs as passed-in IDs.
940    }
941
942    /// Ensures that the client has the `ChainDescription` blob corresponding to this
943    /// client's `ChainId`, and returns the chain description.
944    pub async fn get_chain_description(
945        &self,
946        chain_id: ChainId,
947    ) -> Result<ChainDescription, chain_client::Error> {
948        let blob = self.get_chain_description_blob(chain_id).await?;
949        Ok(bcs::from_bytes(blob.bytes())?)
950    }
951
952    /// Submits a validated block for finalization and returns the confirmed block certificate.
953    #[instrument(level = "trace", skip_all)]
954    pub(crate) async fn finalize_block(
955        self: &Arc<Self>,
956        committee: &Committee,
957        certificate: ValidatedBlockCertificate,
958    ) -> Result<ConfirmedBlockCertificate, chain_client::Error> {
959        debug!(round = %certificate.round, "Submitting block for confirmation");
960        let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
961        let finalize_action = CommunicateAction::FinalizeBlock {
962            certificate: Box::new(certificate),
963            delivery: self.options.cross_chain_message_delivery,
964        };
965        let certificate = self
966            .communicate_chain_action(committee, finalize_action, hashed_value)
967            .await?;
968        self.receive_certificate_with_checked_signatures(certificate.clone())
969            .await?;
970        Ok(certificate)
971    }
972
    /// Submits a block proposal to the validators.
    ///
    /// While the proposal is communicated, validators may report that the block's
    /// timestamp is ahead of their clocks. A background task aggregates those reports
    /// and logs a warning once validators holding a validity threshold of voting
    /// weight have complained, then the resulting quorum certificate is processed
    /// locally and returned.
    #[instrument(level = "trace", skip_all)]
    async fn submit_block_proposal<T: ProcessableCertificate>(
        self: &Arc<Self>,
        committee: Arc<Committee>,
        proposal: Box<BlockProposal>,
        value: T,
    ) -> Result<GenericCertificate<T>, chain_client::Error> {
        debug!(
            round = %proposal.content.round,
            "Submitting block proposal to validators"
        );

        // Check if the block timestamp is in the future and log INFO.
        let block_timestamp = proposal.content.block.timestamp;
        let local_time = self.local_node.storage_client().clock().current_time();
        if block_timestamp > local_time {
            // NOTE(review): this function itself does not wait; presumably the wait
            // happens downstream while communicating the proposal — confirm.
            info!(
                chain_id = %proposal.content.block.chain_id,
                %block_timestamp,
                %local_time,
                "Block timestamp is in the future; waiting until it can be proposed",
            );
        }

        // Create channel for clock skew reports from validators.
        let (clock_skew_sender, mut clock_skew_receiver) = mpsc::unbounded_channel();
        let submit_action = CommunicateAction::SubmitBlock {
            proposal,
            blob_ids: value.required_blob_ids().into_iter().collect(),
            clock_skew_sender,
        };

        // Spawn a task to monitor clock skew reports and warn if threshold is reached.
        let validity_threshold = committee.validity_threshold();
        let committee_clone = committee.clone();
        let clock_skew_check_handle = linera_base::Task::spawn(async move {
            // Accumulated voting weight of validators that reported a positive skew,
            // plus the smallest and largest skew seen so far.
            let mut skew_weight = 0u64;
            let mut min_skew = TimeDelta::MAX;
            let mut max_skew = TimeDelta::ZERO;
            while let Some((public_key, clock_skew)) = clock_skew_receiver.recv().await {
                // Only positive skews count: a validator whose clock is ahead of the
                // block timestamp reports zero.
                if clock_skew.as_micros() > 0 {
                    skew_weight += committee_clone.weight(&public_key);
                    min_skew = min_skew.min(clock_skew);
                    max_skew = max_skew.max(clock_skew);
                    if skew_weight >= validity_threshold {
                        warn!(
                            skew_weight,
                            validity_threshold,
                            min_skew_ms = min_skew.as_micros() / 1000,
                            max_skew_ms = max_skew.as_micros() / 1000,
                            "A validity threshold of validators reported clock skew; \
                             consider checking your system clock",
                        );
                        // Warn at most once per proposal, then stop monitoring.
                        return;
                    }
                }
            }
        });

        // Communicate the proposal and collect a quorum of votes into a certificate.
        let certificate = self
            .communicate_chain_action(&committee, submit_action, value)
            .await?;

        // Once communication is done all `clock_skew_sender` clones are dropped, so the
        // monitor's `recv()` returns `None` and the task exits; awaiting it here ensures
        // any warning is emitted before we return.
        clock_skew_check_handle.await;

        self.handle_certificate(certificate.clone()).await?;
        Ok(certificate)
    }
1042
1043    /// Broadcasts certified blocks to validators.
1044    #[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
1045    async fn communicate_chain_updates(
1046        self: &Arc<Self>,
1047        committee: &Committee,
1048        chain_id: ChainId,
1049        height: BlockHeight,
1050        delivery: CrossChainMessageDelivery,
1051        latest_certificate: Option<Arc<GenericCertificate<ConfirmedBlock>>>,
1052    ) -> Result<(), chain_client::Error> {
1053        let nodes = self.make_nodes(committee)?;
1054        communicate_with_quorum(
1055            &nodes,
1056            committee,
1057            |_: &()| (),
1058            |remote_node| {
1059                let mut updater = ValidatorUpdater {
1060                    remote_node,
1061                    client: self.clone(),
1062                    admin_chain_id: self.admin_chain_id,
1063                };
1064                let certificate = latest_certificate.clone();
1065                Box::pin(async move {
1066                    updater
1067                        .send_chain_information(chain_id, height, delivery, certificate)
1068                        .await
1069                })
1070            },
1071            self.options.quorum_grace_period,
1072        )
1073        .await?;
1074        Ok(())
1075    }
1076
1077    /// Broadcasts certified blocks and optionally a block proposal, certificate or
1078    /// leader timeout request.
1079    ///
1080    /// In that case, it verifies that the validator votes are for the provided value,
1081    /// and returns a certificate.
1082    #[instrument(level = "trace", skip_all)]
1083    async fn communicate_chain_action<T: CertificateValue>(
1084        self: &Arc<Self>,
1085        committee: &Committee,
1086        action: CommunicateAction,
1087        value: T,
1088    ) -> Result<GenericCertificate<T>, chain_client::Error> {
1089        let nodes = self.make_nodes(committee)?;
1090        let ((votes_hash, votes_round), votes) = communicate_with_quorum(
1091            &nodes,
1092            committee,
1093            |vote: &LiteVote| (vote.value.value_hash, vote.round),
1094            |remote_node| {
1095                let mut updater = ValidatorUpdater {
1096                    remote_node,
1097                    client: self.clone(),
1098                    admin_chain_id: self.admin_chain_id,
1099                };
1100                let action = action.clone();
1101                Box::pin(async move { updater.send_chain_update(action).await })
1102            },
1103            self.options.quorum_grace_period,
1104        )
1105        .await?;
1106        ensure!(
1107            (votes_hash, votes_round) == (value.hash(), action.round()),
1108            chain_client::Error::UnexpectedQuorum {
1109                hash: votes_hash,
1110                round: votes_round,
1111                expected_hash: value.hash(),
1112                expected_round: action.round(),
1113            }
1114        );
1115        // Certificate is valid because
1116        // * `communicate_with_quorum` ensured a sufficient "weight" of
1117        // (non-error) answers were returned by validators.
1118        // * each answer is a vote signed by the expected validator.
1119        let certificate = LiteCertificate::try_from_votes(votes)
1120            .ok_or_else(|| {
1121                chain_client::Error::InternalError(
1122                    "Vote values or rounds don't match; this is a bug",
1123                )
1124            })?
1125            .with_value(value)
1126            .ok_or_else(|| {
1127                chain_client::Error::ProtocolError("A quorum voted for an unexpected value")
1128            })?;
1129        Ok(certificate)
1130    }
1131
1132    /// Processes the confirmed block certificate in the local node without checking signatures.
1133    /// Also downloads and processes all ancestors that are still missing.
1134    #[instrument(level = "trace", skip_all)]
1135    async fn receive_certificate_with_checked_signatures(
1136        &self,
1137        certificate: ConfirmedBlockCertificate,
1138    ) -> Result<(), chain_client::Error> {
1139        let block = certificate.block();
1140        // Recover history from the network.
1141        self.download_certificates(block.header.chain_id, block.header.height)
1142            .await?;
1143        // Process the received operations. Download required hashed certificate values if
1144        // necessary.
1145        if let Err(err) = self.handle_certificate(certificate.clone()).await {
1146            match &err {
1147                LocalNodeError::BlobsNotFound(blob_ids) => {
1148                    self.download_blobs(&self.validator_nodes().await?, blob_ids)
1149                        .await
1150                        .map_err(|_| err)?;
1151                    self.handle_certificate(certificate).await?;
1152                }
1153                _ => {
1154                    // The certificate is not as expected. Give up.
1155                    warn!("Failed to process network hashed certificate value");
1156                    return Err(err.into());
1157                }
1158            }
1159        }
1160
1161        Ok(())
1162    }
1163
1164    /// Processes the confirmed block in the local node, possibly without executing it.
1165    #[instrument(level = "trace", skip_all)]
1166    async fn receive_sender_certificate(
1167        &self,
1168        certificate: Arc<ConfirmedBlockCertificate>,
1169        mode: ReceiveCertificateMode,
1170        nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
1171    ) -> Result<(), chain_client::Error> {
1172        // Verify the certificate before doing any expensive networking.
1173        let (max_epoch, committees) = self.admin_committees().await?;
1174        if let ReceiveCertificateMode::NeedsCheck = mode {
1175            Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
1176        }
1177        // Recover history from the network.
1178        let nodes = if let Some(nodes) = nodes {
1179            nodes
1180        } else {
1181            self.validator_nodes().await?
1182        };
1183        if let Err(err) = self.handle_certificate((*certificate).clone()).await {
1184            match &err {
1185                LocalNodeError::BlobsNotFound(blob_ids) => {
1186                    self.download_blobs(&nodes, blob_ids).await?;
1187                    self.handle_certificate(Arc::unwrap_or_clone(certificate))
1188                        .await?;
1189                }
1190                _ => {
1191                    // The certificate is not as expected. Give up.
1192                    warn!("Failed to process network hashed certificate value");
1193                    return Err(err.into());
1194                }
1195            }
1196        }
1197
1198        Ok(())
1199    }
1200
    /// Downloads and processes certificates for sender chain blocks.
    ///
    /// For each requested height, checks local storage first and otherwise downloads
    /// the certificate from the given validators, verifying it against the admin
    /// chain's committees. Each successfully processed (or deliberately skipped)
    /// height is reported over `sender` and removed from the retry queue; validators
    /// that fail are dropped from the node list. Errors are logged rather than
    /// returned, since this runs as a background download task.
    #[instrument(level = "debug", skip_all, fields(chain_id = %sender_chain_id))]
    async fn download_and_process_sender_chain(
        &self,
        sender_chain_id: ChainId,
        nodes: &[RemoteNode<Env::ValidatorNode>],
        received_log: &ReceivedLogs,
        mut remote_heights: Vec<BlockHeight>,
        sender: mpsc::UnboundedSender<ChainAndHeight>,
    ) {
        // Certificates are checked against the admin chain's committees; without them
        // we cannot verify anything, so give up early.
        let (max_epoch, committees) = match self.admin_committees().await {
            Ok(result) => result,
            Err(error) => {
                error!(%error, %sender_chain_id, "could not read admin committees");
                return;
            }
        };
        let committees_ref = &committees;
        let mut nodes = nodes.to_vec();
        while !remote_heights.is_empty() {
            // Check local storage first — certificates may already be available from
            // a prior sync cycle, another receiver chain, or a concurrent notification.
            if let Ok(local_certs) = self
                .storage_client()
                .read_certificates_by_heights(sender_chain_id, &remote_heights)
                .await
            {
                let mut still_needed = Vec::new();
                for (height, maybe_cert) in remote_heights.iter().copied().zip(local_certs) {
                    if let Some(certificate) = maybe_cert {
                        // Already in storage: report it without re-downloading.
                        let chain_id = certificate.block().header.chain_id;
                        if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
                            error!(
                                %chain_id, %height, %error,
                                "failed to send chain and height over the channel",
                            );
                        }
                    } else {
                        still_needed.push(height);
                    }
                }
                remote_heights = still_needed;
                if remote_heights.is_empty() {
                    break;
                }
            }

            let remote_heights_ref = &remote_heights;
            // Query the validators concurrently; failures accumulate into a set of
            // faulty validators to exclude from further attempts.
            let certificates = match communicate_concurrently(
                &nodes,
                async move |remote_node| {
                    let mut remote_heights = remote_heights_ref.clone();
                    // No need trying to download certificates the validator didn't have in their
                    // log - we'll retry downloading the remaining ones next time we loop.
                    remote_heights.retain(|height| {
                        received_log.validator_has_block(
                            &remote_node.public_key,
                            sender_chain_id,
                            *height,
                        )
                    });
                    if remote_heights.is_empty() {
                        // It makes no sense to return `Ok(_)` if we aren't going to try downloading
                        // anything from the validator - let the function try the other validators
                        return Err(NodeError::MissingCertificateValue);
                    }
                    let certificates = self
                        .requests_scheduler
                        .download_certificates_by_heights(
                            &remote_node,
                            sender_chain_id,
                            remote_heights,
                        )
                        .await?;
                    // A check error (`?`) fails the whole batch for this validator;
                    // a certificate that merely fails `into_result()` is kept but
                    // marked `false` so it is skipped rather than retried.
                    let mut certificates_with_check_results = vec![];
                    for cert in certificates {
                        let check_result =
                            Self::check_certificate(max_epoch, committees_ref, &cert)?;
                        certificates_with_check_results
                            .push((cert, check_result.into_result().is_ok()));
                    }
                    Ok(certificates_with_check_results)
                },
                |errors| {
                    errors
                        .into_iter()
                        .map(|(validator, _error)| validator)
                        .collect::<BTreeSet<_>>()
                },
                self.options.certificate_batch_download_timeout,
            )
            .await
            {
                Ok(certificates_with_check_results) => certificates_with_check_results,
                Err(faulty_validators) => {
                    // filter out faulty validators and retry if any are left
                    nodes.retain(|node| !faulty_validators.contains(&node.public_key));
                    if nodes.is_empty() {
                        info!(
                            chain_id = %sender_chain_id,
                            "could not download certificates for chain - no more correct validators left"
                        );
                        return;
                    }
                    continue;
                }
            };

            trace!(
                num_certificates = %certificates.len(),
                "received certificates",
            );

            // Heights that should not be retried on the next loop iteration.
            let mut to_remove_from_queue = BTreeSet::new();

            for (certificate, check_result) in certificates {
                let hash = certificate.hash();
                let chain_id = certificate.block().header.chain_id;
                let height = certificate.block().header.height;
                if !check_result {
                    // The certificate was correctly signed, but we were missing a committee to
                    // validate it properly - do not receive it, but also do not attempt to
                    // re-download it.
                    to_remove_from_queue.insert(height);
                    continue;
                }
                // We checked the certificates right after downloading them.
                let mode = ReceiveCertificateMode::AlreadyChecked;
                if let Err(error) = self
                    .receive_sender_certificate(
                        self.storage_client().cache_certificate(certificate),
                        mode,
                        None,
                    )
                    .await
                {
                    // Leave the height in the queue so it is retried on the next pass.
                    warn!(%error, %hash, "Received invalid certificate");
                } else {
                    to_remove_from_queue.insert(height);
                    if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
                        error!(
                            %chain_id,
                            %height,
                            %error,
                            "failed to send chain and height over the channel",
                        );
                    }
                }
            }

            remote_heights.retain(|height| !to_remove_from_queue.contains(height));
        }
        trace!("find_received_certificates: finished processing chain");
    }
1355
1356    /// Downloads the log of received messages for a chain from a validator.
1357    #[instrument(level = "trace", skip(self))]
1358    async fn get_received_log_from_validator(
1359        &self,
1360        chain_id: ChainId,
1361        remote_node: &RemoteNode<Env::ValidatorNode>,
1362        tracker: u64,
1363    ) -> Result<Vec<ChainAndHeight>, chain_client::Error> {
1364        let mut offset = tracker;
1365
1366        // Retrieve the list of newly received certificates from this validator.
1367        let mut remote_log = Vec::new();
1368        loop {
1369            trace!("get_received_log_from_validator: looping");
1370            let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(offset);
1371            let info = remote_node.handle_chain_info_query(query).await?;
1372            let received_entries = info.requested_received_log.len();
1373            offset += received_entries as u64;
1374            remote_log.extend(info.requested_received_log);
1375            trace!(
1376                remote_node = remote_node.address(),
1377                %received_entries,
1378                "get_received_log_from_validator: received log batch",
1379            );
1380            if received_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
1381                break;
1382            }
1383        }
1384
1385        trace!(
1386            remote_node = remote_node.address(),
1387            num_entries = remote_log.len(),
1388            "get_received_log_from_validator: returning downloaded log",
1389        );
1390
1391        Ok(remote_log)
1392    }
1393
    /// Downloads a specific sender block and recursively downloads any earlier blocks
    /// that also sent a message to our chain, based on `previous_message_blocks`.
    ///
    /// This ensures that we have all the sender blocks needed to preprocess the target block
    /// and put the messages to our chain into the outbox.
    async fn download_sender_block_with_sending_ancestors(
        &self,
        receiver_chain_id: ChainId,
        sender_chain_id: ChainId,
        height: BlockHeight,
        remote_node: &RemoteNode<Env::ValidatorNode>,
    ) -> Result<(), chain_client::Error> {
        // The next height our outbox expects from this sender; heights below it have
        // already been handled, so the backward walk stops there.
        let next_outbox_height = self
            .local_node
            .next_outbox_heights(&[sender_chain_id], receiver_chain_id)
            .await?
            .get(&sender_chain_id)
            .copied()
            .unwrap_or(BlockHeight::ZERO);
        let (max_epoch, committees) = self.admin_committees().await?;

        // Recursively collect all certificates we need, following
        // the chain of previous_message_blocks back to next_outbox_height.
        let mut certificates = BTreeMap::new();
        let mut current_height = height;
        // On the first iteration we only have a height; subsequent iterations
        // also carry the hash from `previous_message_blocks`.
        let mut current_hash: Option<CryptoHash> = None;

        // Stop if we've reached the height we've already processed.
        while current_height >= next_outbox_height {
            // Try local storage first — avoids a validator round-trip when
            // the certificate was already downloaded by a prior sync cycle,
            // another receiver chain, or a concurrent notification handler.
            let certificate = if let Some(local) = self
                .try_read_local_certificate(sender_chain_id, current_height, current_hash)
                .await?
            {
                local
            } else {
                let downloaded = self
                    .requests_scheduler
                    .download_certificates_by_heights(
                        remote_node,
                        sender_chain_id,
                        vec![current_height],
                    )
                    .await?;
                let Some(certificate) = downloaded.into_iter().next() else {
                    return Err(chain_client::Error::CannotDownloadMissingSenderBlock {
                        chain_id: sender_chain_id,
                        height: current_height,
                    });
                };
                // Cache the download so later lookups hit local storage.
                self.storage_client().cache_certificate(certificate)
            };

            // Validate the certificate.
            Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
                .into_result()?;

            // Check if there's a previous message block to our chain.
            let block = certificate.block();
            let next = block
                .body
                .previous_message_blocks
                .get(&receiver_chain_id)
                .map(|(prev_hash, prev_height)| (*prev_hash, *prev_height));

            // Store this certificate.
            certificates.insert(current_height, certificate);

            if let Some((prev_hash, prev_height)) = next {
                // Continue with the previous block (now with its hash for local lookup).
                current_height = prev_height;
                current_hash = Some(prev_hash);
            } else {
                // No more dependencies.
                break;
            }
        }

        // Nothing new to receive — presumably the sender blocks were already processed
        // locally, so only retry delivering any still-pending cross-chain requests.
        // NOTE(review): confirm this is the intended fallback semantics.
        if certificates.is_empty() {
            self.retry_pending_cross_chain_requests(sender_chain_id)
                .await?;
        }

        // Process certificates in ascending block height order (BTreeMap keeps them sorted).
        for certificate in certificates.into_values() {
            self.receive_sender_certificate(
                certificate,
                ReceiveCertificateMode::AlreadyChecked,
                Some(vec![remote_node.clone()]),
            )
            .await?;
        }

        Ok(())
    }
1493
    /// Downloads event-bearing blocks for the given streams by walking the
    /// `previous_event_blocks` linked list backwards from the initial blocks, stopping
    /// when we reach blocks that are already executed locally or whose events we
    /// already track.
    ///
    /// Downloaded certificates are verified against the trusted committees and then
    /// processed in ascending block-height order.
    async fn download_event_bearing_blocks(
        &self,
        publisher_chain_id: ChainId,
        initial_blocks: BTreeSet<(BlockHeight, CryptoHash)>,
        local_next_block_height: BlockHeight,
        subscribed_streams: &BTreeSet<StreamId>,
        remote_node: &RemoteNode<Env::ValidatorNode>,
    ) -> Result<(), chain_client::Error> {
        if initial_blocks.is_empty() {
            return Ok(());
        }
        let (max_epoch, committees) = self.admin_committees().await?;

        // Certificates collected so far, keyed by height (BTreeMap keeps them sorted).
        let mut certificates = BTreeMap::new();
        let mut blocks_to_fetch = initial_blocks;
        // For each subscribed stream, the index of the next event the local node
        // expects from this publisher chain.
        let next_expected_events = self
            .local_node
            .next_expected_events(
                publisher_chain_id,
                subscribed_streams.iter().cloned().collect(),
            )
            .await?;

        // Take the highest pending block first; dependencies found below are inserted
        // back into `blocks_to_fetch`.
        while let Some((current_height, current_hash)) = blocks_to_fetch.pop_last() {
            if current_height < local_next_block_height {
                continue; // Already executed locally.
            }
            if certificates.contains_key(&current_height) {
                continue;
            }

            // Prefer the local store; otherwise download the certificate and verify it.
            let certificate = if let Some(certificate) =
                self.storage_client().read_certificate(current_hash).await?
            {
                certificate
            } else {
                let downloaded = self
                    .requests_scheduler
                    .download_certificates(remote_node, publisher_chain_id, current_height, 1)
                    .await?;
                let Some(certificate) = downloaded.into_iter().next() else {
                    // Best effort: log and skip this block rather than failing the
                    // whole synchronization.
                    tracing::debug!(
                        validator = remote_node.address(),
                        %publisher_chain_id,
                        height = %current_height,
                        "failed to download event publisher block"
                    );
                    continue;
                };

                Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
                    .into_result()?;

                self.storage_client().cache_certificate(certificate)
            };

            let block = certificate.block();
            // Walk previous_event_blocks for subscribed streams.
            for stream_id in subscribed_streams {
                if let Some((prev_hash, prev_height)) =
                    block.body.previous_event_blocks.get(stream_id)
                {
                    // If this block contains the event index we expect next for this
                    // stream, earlier events are presumably already known locally, so
                    // don't walk further back for this stream.
                    if next_expected_events.get(stream_id).is_some_and(|index| {
                        block
                            .body
                            .events
                            .iter()
                            .flatten()
                            .find(|event| event.stream_id == *stream_id)
                            .is_some_and(|event| event.index == *index)
                    }) {
                        continue;
                    }
                    if !certificates.contains_key(prev_height) {
                        blocks_to_fetch.insert((*prev_height, *prev_hash));
                    }
                }
            }

            certificates.insert(current_height, certificate);
        }

        // Process in ascending height order.
        for certificate in certificates.into_values() {
            self.receive_sender_certificate(
                certificate,
                ReceiveCertificateMode::AlreadyChecked,
                Some(vec![remote_node.clone()]),
            )
            .await?;
        }

        Ok(())
    }
1591
1592    /// Queries a validator for event-bearing blocks for the given streams, then downloads
1593    /// them.
1594    async fn sync_events_from_node(
1595        &self,
1596        chain_id: ChainId,
1597        stream_ids: &BTreeSet<StreamId>,
1598        remote_node: &RemoteNode<Env::ValidatorNode>,
1599    ) -> Result<(), chain_client::Error> {
1600        let stream_ids_vec: Vec<_> = stream_ids.iter().cloned().collect();
1601        let mut initial_blocks = BTreeSet::new();
1602        for chunk in stream_ids_vec.chunks(self.options.max_event_stream_queries) {
1603            let query = ChainInfoQuery::new(chain_id).with_previous_event_blocks(chunk.to_vec());
1604            let info = remote_node.handle_chain_info_query(query).await?;
1605            initial_blocks.extend(info.requested_previous_event_blocks.values().copied());
1606        }
1607        let local_height = match self.local_node.chain_info(chain_id).await {
1608            Ok(info) => info.next_block_height,
1609            Err(LocalNodeError::InactiveChain(_) | LocalNodeError::BlobsNotFound(_)) => {
1610                BlockHeight::ZERO
1611            }
1612            Err(error) => return Err(error.into()),
1613        };
1614        self.download_event_bearing_blocks(
1615            chain_id,
1616            initial_blocks,
1617            local_height,
1618            stream_ids,
1619            remote_node,
1620        )
1621        .await
1622    }
1623
1624    #[instrument(
1625        level = "trace", skip_all,
1626        fields(certificate_hash = ?incoming_certificate.hash()),
1627    )]
1628    fn check_certificate(
1629        highest_known_epoch: Epoch,
1630        committees: &BTreeMap<Epoch, Arc<Committee>>,
1631        incoming_certificate: &ConfirmedBlockCertificate,
1632    ) -> Result<CheckCertificateResult, NodeError> {
1633        let block = incoming_certificate.block();
1634        // Check that certificates are valid w.r.t one of our trusted committees.
1635        if block.header.epoch > highest_known_epoch {
1636            return Ok(CheckCertificateResult::FutureEpoch);
1637        }
1638        if let Some(known_committee) = committees.get(&block.header.epoch) {
1639            // This epoch is recognized by our chain. Let's verify the
1640            // certificate.
1641            incoming_certificate.check(known_committee)?;
1642            Ok(CheckCertificateResult::New)
1643        } else {
1644            // We don't accept a certificate from a committee that was retired.
1645            Ok(CheckCertificateResult::OldEpoch)
1646        }
1647    }
1648
1649    /// Downloads and processes any certificates we are missing for the given chain.
1650    ///
1651    /// Whether manager values are fetched depends on the chain's follow-only state.
1652    #[instrument(level = "trace", skip_all)]
1653    async fn synchronize_chain_state(
1654        &self,
1655        chain_id: ChainId,
1656    ) -> Result<Box<ChainInfo>, chain_client::Error> {
1657        let (_, committee) = self.admin_committee().await?;
1658        self.synchronize_chain_from_committee(chain_id, committee)
1659            .await
1660    }
1661
1662    /// Downloads certificates for the given chain from the given committee.
1663    ///
1664    /// If the chain is not in follow-only mode, also fetches and processes manager values
1665    /// (timeout certificates, proposals, locking blocks) for consensus participation.
1666    #[instrument(level = "trace", skip_all)]
1667    pub(crate) async fn synchronize_chain_from_committee(
1668        &self,
1669        chain_id: ChainId,
1670        committee: Arc<Committee>,
1671    ) -> Result<Box<ChainInfo>, chain_client::Error> {
1672        #[cfg(with_metrics)]
1673        let _latency = if !self.is_chain_follow_only(chain_id) {
1674            Some(metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency())
1675        } else {
1676            None
1677        };
1678
1679        let validators = self.make_nodes(&committee)?;
1680        Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
1681        communicate_with_quorum(
1682            &validators,
1683            &committee,
1684            |_: &()| (),
1685            |remote_node| async move {
1686                self.synchronize_chain_state_from(&remote_node, chain_id)
1687                    .await
1688            },
1689            self.options.quorum_grace_period,
1690        )
1691        .await?;
1692
1693        self.local_node
1694            .chain_info(chain_id)
1695            .await
1696            .map_err(Into::into)
1697    }
1698
    /// Downloads any certificates from the specified validator that we are missing for the given
    /// chain.
    ///
    /// If the chain is not in follow-only mode, also fetches and processes manager values
    /// (timeout certificates, proposals, locking blocks) for consensus participation.
    #[instrument(level = "trace", skip(self, remote_node, chain_id))]
    pub(crate) async fn synchronize_chain_state_from(
        &self,
        remote_node: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
    ) -> Result<(), chain_client::Error> {
        let with_manager_values = !self.is_chain_follow_only(chain_id);
        // Only request manager values if we may participate in consensus on this chain.
        let query = if with_manager_values {
            ChainInfoQuery::new(chain_id).with_manager_values()
        } else {
            ChainInfoQuery::new(chain_id)
        };
        let remote_info = remote_node.handle_chain_info_query(query).await?;
        // Catch up with the validator's confirmed blocks first.
        let local_info = self
            .download_certificates_from(remote_node, chain_id, remote_info.next_block_height, None)
            .await?;

        if !with_manager_values {
            return Ok(());
        }

        // If we are at the same height as the remote node, we also update our chain manager.
        let local_height = local_info.next_block_height;
        if local_height != remote_info.next_block_height {
            debug!(
                remote_node = remote_node.address(),
                remote_height = %remote_info.next_block_height,
                local_height = %local_height,
                "synced from validator, but remote height and local height are different",
            );
            return Ok(());
        };

        // Process the validator's timeout certificate, if it has one.
        if let Some(timeout) = remote_info.manager.timeout {
            self.handle_certificate(*timeout).await?;
        }
        // Collect every proposal the validator's chain manager knows about: the signed
        // proposal, the proposed block, and a fast-round locking block if present.
        let mut proposals = Vec::new();
        if let Some(proposal) = remote_info.manager.requested_signed_proposal {
            proposals.push(*proposal);
        }
        if let Some(proposal) = remote_info.manager.requested_proposed {
            proposals.push(*proposal);
        }
        if let Some(locking) = remote_info.manager.requested_locking {
            match *locking {
                LockingBlock::Fast(proposal) => {
                    proposals.push(proposal);
                }
                LockingBlock::Regular(cert) => {
                    // A validated-block certificate: process it, fetching missing blobs
                    // from this validator. Failure here is logged but not fatal.
                    let hash = cert.hash();
                    if let Err(error) = self.try_process_locking_block_from(remote_node, cert).await
                    {
                        debug!(
                            remote_node = remote_node.address(),
                            %hash,
                            height = %local_height,
                            %error,
                            "skipping locked block from validator",
                        );
                    }
                }
            }
        }
        // Feed each proposal to the local node, repairing missing blobs and missing
        // cross-chain updates as needed. A proposal that still fails is skipped.
        'proposal_loop: for proposal in proposals {
            let owner: AccountOwner = proposal.owner();
            if let Err(mut err) = self
                .local_node
                .handle_block_proposal(proposal.clone())
                .await
            {
                if let LocalNodeError::BlobsNotFound(_) = &err {
                    let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
                    if !required_blob_ids.is_empty() {
                        // First attempt: download the proposal's pending blobs directly
                        // from the validator.
                        let mut blobs = Vec::new();
                        for blob_id in required_blob_ids {
                            let blob_content = match self
                                .requests_scheduler
                                .download_pending_blob(remote_node, chain_id, blob_id)
                                .await
                            {
                                Ok(content) => content,
                                Err(error) => {
                                    info!(
                                        remote_node = remote_node.address(),
                                        height = %local_height,
                                        proposer = %owner,
                                        %blob_id,
                                        %error,
                                        "skipping proposal from validator; failed to download blob",
                                    );
                                    continue 'proposal_loop;
                                }
                            };
                            blobs.push(Blob::new(blob_content));
                        }
                        self.local_node
                            .handle_pending_blobs(chain_id, blobs)
                            .await?;
                        // We found the missing blobs: retry.
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            continue;
                        }
                    }
                    if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
                        // Second attempt: fetch certificates that use the blobs.
                        self.update_local_node_with_blobs_from(
                            blob_ids.clone(),
                            slice::from_ref(remote_node),
                        )
                        .await?;
                        // We found the missing blobs: retry.
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            continue;
                        }
                    }
                }
                // Repair missing cross-chain updates one at a time until the proposal
                // is accepted or fails with a different error.
                while let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
                    if let ChainError::MissingCrossChainUpdate {
                        chain_id,
                        origin,
                        height,
                    } = &**chain_err
                    {
                        self.download_sender_block_with_sending_ancestors(
                            *chain_id,
                            *origin,
                            *height,
                            remote_node,
                        )
                        .await?;
                        // Retry
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            continue 'proposal_loop;
                        }
                    } else {
                        break;
                    }
                }

                debug!(
                    remote_node = remote_node.address(),
                    proposer = %owner,
                    height = %local_height,
                    error = %err,
                    "skipping proposal from validator",
                );
            }
        }
        Ok(())
    }
1871
1872    async fn try_process_locking_block_from(
1873        &self,
1874        remote_node: &RemoteNode<Env::ValidatorNode>,
1875        certificate: GenericCertificate<ValidatedBlock>,
1876    ) -> Result<(), chain_client::Error> {
1877        let chain_id = certificate.inner().chain_id();
1878        match self.handle_certificate(certificate.clone()).await {
1879            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1880                let mut blobs = Vec::new();
1881                for blob_id in blob_ids {
1882                    let blob_content = self
1883                        .requests_scheduler
1884                        .download_pending_blob(remote_node, chain_id, blob_id)
1885                        .await?;
1886                    blobs.push(Blob::new(blob_content));
1887                }
1888                self.local_node
1889                    .handle_pending_blobs(chain_id, blobs)
1890                    .await?;
1891                self.handle_certificate(certificate).await?;
1892                Ok(())
1893            }
1894            Err(err) => Err(err.into()),
1895            Ok(_) => Ok(()),
1896        }
1897    }
1898
    /// Downloads and processes, from the specified validators, confirmed block
    /// certificates that use the given blobs. If this succeeds, the blobs will be in
    /// our storage.
    ///
    /// Blob IDs are deduplicated and downloads run concurrently, bounded by
    /// `max_joined_tasks`. Fails if any blob could not be obtained from any validator.
    async fn update_local_node_with_blobs_from(
        &self,
        blob_ids: Vec<BlobId>,
        remote_nodes: &[RemoteNode<Env::ValidatorNode>],
    ) -> Result<Vec<Arc<Blob>>, chain_client::Error> {
        let timeout = self.options.blob_download_timeout;
        // Deduplicate IDs.
        let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
        stream::iter(blob_ids.into_iter().map(|blob_id| {
            // Race the validators (with staggered start delays) until one of them can
            // serve a certificate for this blob.
            communicate_concurrently(
                remote_nodes,
                async move |remote_node| {
                    let certificate = self
                        .requests_scheduler
                        .download_certificate_for_blob(&remote_node, blob_id)
                        .await?;
                    // Processing the certificate is what actually brings the blob into
                    // our local storage.
                    self.receive_sender_certificate(
                        self.storage_client().cache_certificate(certificate),
                        ReceiveCertificateMode::NeedsCheck,
                        Some(vec![remote_node.clone()]),
                    )
                    .await?;
                    // The blob must now be readable locally, or the attempt failed.
                    let blob = self
                        .local_node
                        .storage_client()
                        .read_blob(blob_id)
                        .await?
                        .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
                    Result::<_, chain_client::Error>::Ok(blob)
                },
                move |_| chain_client::Error::from(NodeError::BlobsNotFound(vec![blob_id])),
                timeout,
            )
        }))
        .buffer_unordered(self.options.max_joined_tasks)
        .collect::<Vec<_>>()
        .await
        .into_iter()
        .collect()
    }
1941
    /// Attempts to execute the block locally, using the given policy to decide how
    /// failing incoming-message bundles are handled.
    ///
    /// If execution fails because blobs or events are missing locally, they are
    /// downloaded from validators and execution is retried.
    ///
    /// Returns the executed block (bundles may have been rejected/removed based on the
    /// policy), the local node's response, and the set of origin chains whose bundles
    /// are never rejected, as reported by execution.
    #[instrument(level = "trace", skip(self, block))]
    async fn stage_block_execution(
        &self,
        block: ProposedBlock,
        round: Option<u32>,
        published_blobs: Vec<Blob>,
        policy: BundleExecutionPolicy,
    ) -> Result<(Block, ChainInfoResponse, HashSet<ChainId>), chain_client::Error> {
        // Events we already tried to download, so the retry loop cannot spin forever
        // on the same missing events.
        let mut downloaded_events = HashSet::<EventId>::new();
        loop {
            let result = self
                .local_node
                .stage_block_execution(
                    block.clone(),
                    round,
                    published_blobs.clone(),
                    policy.clone(),
                )
                .await;
            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
                // Fetch the missing blobs from the validators, then retry.
                let validators = self.validator_nodes().await?;
                self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
                    .await?;
                continue; // We found the missing blob: retry.
            }
            if let Err(LocalNodeError::EventsNotFound(event_ids)) = &result {
                // Only download events we haven't already tried.
                let new_events = event_ids
                    .iter()
                    .filter(|id| !downloaded_events.contains(id))
                    .cloned()
                    .collect::<Vec<_>>();
                if !new_events.is_empty() {
                    Box::pin(self.download_certificates_for_events(&new_events)).await?;
                    downloaded_events.extend(new_events);
                    continue; // We downloaded new publisher chain data: retry.
                }
                // All reported events were already downloaded; don't loop forever.
            }
            if let Ok((_, executed_block, _, _, _)) = &result {
                // Announce the successfully executed block to notification subscribers.
                let hash = CryptoHash::new(executed_block);
                let notification = Notification {
                    chain_id: executed_block.header.chain_id,
                    reason: Reason::BlockExecuted {
                        height: executed_block.header.height,
                        hash,
                    },
                };
                self.notifier.notify(&[notification]);
            }
            // Propagate any remaining error; otherwise unpack the successful result.
            let (
                _modified_block,
                executed_block,
                response,
                _resource_tracker,
                never_reject_origins,
            ) = result?;
            return Ok((executed_block, response, never_reject_origins));
        }
    }
2010}
2011
2012/// Performs `f` in parallel on multiple nodes, starting with a quadratically increasing delay on
2013/// each subsequent node. Returns error `err` if all of the nodes fail.
2014async fn communicate_concurrently<'a, A, E1, E2, F, G, R, V>(
2015    nodes: &[RemoteNode<A>],
2016    f: F,
2017    err: G,
2018    timeout: Duration,
2019) -> Result<V, E2>
2020where
2021    F: Clone + FnOnce(RemoteNode<A>) -> R,
2022    RemoteNode<A>: Clone,
2023    G: FnOnce(Vec<(ValidatorPublicKey, E1)>) -> E2,
2024    R: Future<Output = Result<V, E1>> + 'a,
2025{
2026    let mut nodes = nodes.to_vec();
2027    nodes.shuffle(&mut rand::thread_rng());
2028    let mut stream = nodes
2029        .iter()
2030        .zip(0..)
2031        .map(|(remote_node, i)| {
2032            let fun = f.clone();
2033            let node = remote_node.clone();
2034            async move {
2035                linera_base::time::timer::sleep(timeout * i * i).await;
2036                fun(node).await.map_err(|err| (remote_node.public_key, err))
2037            }
2038        })
2039        .collect::<FuturesUnordered<_>>();
2040    let mut errors = vec![];
2041    while let Some(maybe_result) = stream.next().await {
2042        match maybe_result {
2043            Ok(result) => return Ok(result),
2044            Err(error) => errors.push(error),
2045        };
2046    }
2047    Err(err(errors))
2048}
2049
/// Wrapper for an [`AbortHandle`] that aborts when it is dropped.
#[must_use]
pub struct AbortOnDrop(pub AbortHandle);
2053
impl Drop for AbortOnDrop {
    /// Aborts the wrapped handle when the guard goes out of scope.
    #[instrument(level = "trace", skip(self))]
    fn drop(&mut self) {
        self.0.abort();
    }
}
2060
/// A pending proposed block, together with its published blobs.
#[derive(Clone, Serialize, Deserialize)]
pub struct PendingProposal {
    /// The proposed block itself.
    pub block: ProposedBlock,
    /// The blobs published together with the block.
    pub blobs: Vec<Blob>,
    /// The execution outcome from the AutoRetry execution, used as a sanity check
    /// against the committed execution outcome.
    #[serde(default)]
    pub auto_retry_outcome: Option<BlockExecutionOutcome>,
    /// The round in which this proposal was first submitted, if any.
    #[serde(default)]
    pub round: Option<Round>,
}
2074
/// Whether a certificate handed to certificate-processing code still needs to be
/// verified against a trusted committee.
enum ReceiveCertificateMode {
    /// The certificate has not been verified yet; check it before processing.
    NeedsCheck,
    /// The caller has already verified the certificate; skip the check.
    AlreadyChecked,
}
2079
/// Outcome of checking a certificate's epoch against the committees we trust.
enum CheckCertificateResult {
    /// The certificate's epoch was retired; the certificate is not accepted.
    OldEpoch,
    /// The certificate was verified against a trusted committee.
    New,
    /// The certificate's epoch is newer than any epoch we currently know.
    FutureEpoch,
}
2085
2086impl CheckCertificateResult {
2087    fn into_result(self) -> Result<(), chain_client::Error> {
2088        match self {
2089            Self::OldEpoch => Err(chain_client::Error::CommitteeDeprecationError),
2090            Self::New => Ok(()),
2091            Self::FutureEpoch => Err(chain_client::Error::CommitteeSynchronizationError),
2092        }
2093    }
2094}
2095
2096/// Creates a compressed Contract, Service and bytecode.
2097#[cfg(not(target_arch = "wasm32"))]
2098pub async fn create_bytecode_blobs(
2099    contract: Bytecode,
2100    service: Bytecode,
2101    vm_runtime: VmRuntime,
2102) -> (Vec<Blob>, ModuleId) {
2103    match vm_runtime {
2104        VmRuntime::Wasm => {
2105            let (compressed_contract, compressed_service) =
2106                tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
2107                    .await
2108                    .expect("Compression should not panic");
2109            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
2110            let service_blob = Blob::new_service_bytecode(compressed_service);
2111            let module_id =
2112                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
2113            (vec![contract_blob, service_blob], module_id)
2114        }
2115        VmRuntime::Evm => {
2116            let compressed_contract = contract.compress();
2117            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
2118            let module_id = ModuleId::new(
2119                evm_contract_blob.id().hash,
2120                evm_contract_blob.id().hash,
2121                vm_runtime,
2122            );
2123            (vec![evm_contract_blob], module_id)
2124        }
2125    }
2126}