linera_core/client/
mod.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    cmp::Ordering,
7    collections::{BTreeMap, BTreeSet, HashSet},
8    sync::{Arc, RwLock},
9};
10
11use custom_debug_derive::Debug;
12use futures::{
13    future::Future,
14    stream::{self, AbortHandle, FuturesUnordered, StreamExt},
15};
16#[cfg(with_metrics)]
17use linera_base::prometheus_util::MeasureLatency as _;
18use linera_base::{
19    crypto::{CryptoHash, Signer, ValidatorPublicKey},
20    data_types::{ArithmeticError, Blob, BlockHeight, ChainDescription, Epoch},
21    ensure,
22    identifiers::{AccountOwner, BlobId, BlobType, ChainId, StreamId},
23    time::Duration,
24};
25#[cfg(not(target_arch = "wasm32"))]
26use linera_base::{data_types::Bytecode, identifiers::ModuleId, vm::VmRuntime};
27use linera_chain::{
28    data_types::{
29        BlockProposal, ChainAndHeight, IncomingBundle, LiteVote, MessageAction, ProposedBlock,
30        Transaction,
31    },
32    manager::LockingBlock,
33    types::{
34        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
35        LiteCertificate, ValidatedBlock, ValidatedBlockCertificate,
36    },
37    ChainError, ChainExecutionContext,
38};
39use linera_execution::committee::Committee;
40use linera_storage::{ResultReadCertificates, Storage as _};
41use rand::{
42    distributions::{Distribution, WeightedIndex},
43    seq::SliceRandom,
44};
45use received_log::ReceivedLogs;
46use serde::{Deserialize, Serialize};
47use tokio::sync::mpsc;
48use tracing::{debug, error, info, instrument, trace, warn};
49
50use crate::{
51    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, RoundTimeout},
52    environment::Environment,
53    local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
54    node::{CrossChainMessageDelivery, NodeError, ValidatorNode, ValidatorNodeProvider as _},
55    notifier::ChannelNotifier,
56    remote_node::RemoteNode,
57    updater::{communicate_with_quorum, CommunicateAction, ValidatorUpdater},
58    worker::{Notification, ProcessableCertificate, WorkerError, WorkerState},
59    CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
60};
61
62pub mod chain_client;
63pub use chain_client::ChainClient;
64
65pub use crate::data_types::ClientOutcome;
66
67#[cfg(test)]
68#[path = "../unit_tests/client_tests.rs"]
69mod client_tests;
70mod received_log;
71mod validator_trackers;
72
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus latency histograms for client operations.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a histogram with no labels, using `exponential_bucket_latencies(500.0)`
    /// as its buckets.
    fn register_latency_histogram(name: &str, description: &str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(500.0))
    }

    /// Latency of `process_inbox`; exported under the metric name `process_inbox_latency`.
    pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| {
            register_latency_histogram("process_inbox_latency", "process_inbox latency")
        });

    /// Latency of `prepare_chain`.
    pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_latency_histogram("prepare_chain_latency", "prepare_chain latency")
    });

    /// Latency of `synchronize_chain_state`.
    pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_latency_histogram(
            "synchronize_chain_state_latency",
            "synchronize_chain_state latency",
        )
    });

    /// Latency of block execution.
    pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_latency_histogram("execute_block_latency", "execute_block latency")
    });

    /// Latency of `find_received_certificates`.
    pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_latency_histogram(
            "find_received_certificates_latency",
            "find_received_certificates latency",
        )
    });
}
126
/// Default number of certificates to request per batch when downloading a chain's history.
///
/// NOTE(review): presumably the default for `chain_client::Options::certificate_download_batch_size`,
/// which caps each batch requested in `download_certificates_from` — confirm at the option's definition.
pub const DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE: u64 = 500;
/// Default batch size for downloading sender certificates.
///
/// NOTE(review): the consumer is not visible in this part of the file; meaning inferred from the name.
pub const DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE: usize = 20_000;
129
/// Policies for automatically handling incoming messages.
///
/// Combines a blanket rule (accept / reject / ignore) with an optional allow-list of
/// origin chains; see `MessagePolicy::must_handle` for how both are applied to a bundle.
#[derive(Clone, Debug)]
pub struct MessagePolicy {
    /// The blanket policy applied to all messages.
    blanket: BlanketMessagePolicy,
    /// A collection of chains which restrict the origin of messages to be
    /// accepted. `Option::None` means that messages from all chains are accepted. An empty
    /// `HashSet` denotes that messages from no chains are accepted.
    restrict_chain_ids_to: Option<HashSet<ChainId>>,
}
140
// The blanket rule of a `MessagePolicy`.
// NOTE(review): this enum derives `clap::ValueEnum`, so the `///` variant docs below are
// presumably shown as help text for the CLI's possible values — edit them with that in mind.
#[derive(Copy, Clone, Debug, clap::ValueEnum)]
pub enum BlanketMessagePolicy {
    /// Automatically accept all incoming messages. Reject them only if execution fails.
    Accept,
    /// Automatically reject tracked messages, ignore or skip untracked messages, but accept
    /// protected ones.
    Reject,
    /// Don't include any messages in blocks, and don't make any decision whether to accept or
    /// reject.
    Ignore,
}
152
153impl MessagePolicy {
154    pub fn new(
155        blanket: BlanketMessagePolicy,
156        restrict_chain_ids_to: Option<HashSet<ChainId>>,
157    ) -> Self {
158        Self {
159            blanket,
160            restrict_chain_ids_to,
161        }
162    }
163
164    #[cfg(with_testing)]
165    pub fn new_accept_all() -> Self {
166        Self {
167            blanket: BlanketMessagePolicy::Accept,
168            restrict_chain_ids_to: None,
169        }
170    }
171
172    #[instrument(level = "trace", skip(self))]
173    fn must_handle(&self, bundle: &mut IncomingBundle) -> bool {
174        if self.is_reject() {
175            if bundle.bundle.is_skippable() {
176                return false;
177            } else if !bundle.bundle.is_protected() {
178                bundle.action = MessageAction::Reject;
179            }
180        }
181        match &self.restrict_chain_ids_to {
182            None => true,
183            Some(chains) => chains.contains(&bundle.origin),
184        }
185    }
186
187    #[instrument(level = "trace", skip(self))]
188    fn is_ignore(&self) -> bool {
189        matches!(self.blanket, BlanketMessagePolicy::Ignore)
190    }
191
192    #[instrument(level = "trace", skip(self))]
193    fn is_reject(&self) -> bool {
194        matches!(self.blanket, BlanketMessagePolicy::Reject)
195    }
196}
197
/// Labels for timed client operations.
///
/// Values of this type are sent, paired with a `u64` measurement, through the optional
/// `mpsc::UnboundedSender<(u64, TimingType)>` passed to `Client::create_chain_client`.
/// NOTE(review): the unit of the `u64` is not visible here — presumably a duration; confirm
/// where the sender is used.
#[derive(Debug, Clone, Copy)]
pub enum TimingType {
    /// Executing the operations of a block.
    ExecuteOperations,
    /// Executing a whole block.
    ExecuteBlock,
    /// Submitting a block proposal to the validators.
    SubmitBlockProposal,
    /// Updating validators with chain information.
    UpdateValidators,
}
205
/// Defines how we listen to a chain:
/// - do we care about every block notification?
/// - or do we only care about blocks containing events from some particular streams?
///
/// `FullChain` covers every `EventsOnly` mode; two `EventsOnly` modes are compared by
/// set inclusion of their streams (see the `PartialOrd` impl).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ListeningMode {
    /// Listen to everything.
    FullChain,
    /// Only listen to blocks which contain events from those streams.
    EventsOnly(BTreeSet<StreamId>),
}
216
217impl PartialOrd for ListeningMode {
218    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
219        match (self, other) {
220            (ListeningMode::FullChain, ListeningMode::FullChain) => Some(Ordering::Equal),
221            (ListeningMode::FullChain, _) => Some(Ordering::Greater),
222            (_, ListeningMode::FullChain) => Some(Ordering::Less),
223            (ListeningMode::EventsOnly(events_a), ListeningMode::EventsOnly(events_b)) => {
224                if events_a.is_superset(events_b) {
225                    Some(Ordering::Greater)
226                } else if events_b.is_superset(events_a) {
227                    Some(Ordering::Less)
228                } else {
229                    None
230                }
231            }
232        }
233    }
234}
235
236impl ListeningMode {
237    pub fn extend(&mut self, other: Option<ListeningMode>) {
238        match (self, other) {
239            (_, None) => (),
240            (ListeningMode::FullChain, _) => (),
241            (mode, Some(ListeningMode::FullChain)) => {
242                *mode = ListeningMode::FullChain;
243            }
244            (
245                ListeningMode::EventsOnly(self_events),
246                Some(ListeningMode::EventsOnly(other_events)),
247            ) => {
248                self_events.extend(other_events);
249            }
250        }
251    }
252}
253
/// A builder that creates [`ChainClient`]s which share the cache and notifiers.
///
/// Every `ChainClient` created by `create_chain_client` holds an `Arc` to this value, so
/// they all share the same local node, notifier, tracked-chain set and per-chain state map.
pub struct Client<Env: Environment> {
    /// Provides storage, network and signer access.
    environment: Env,
    /// Local node to manage the execution state and the local storage of the chains that we are
    /// tracking.
    local_node: LocalNodeClient<Env::Storage>,
    /// The admin chain ID.
    admin_id: ChainId,
    /// Chains that should be tracked by the client.
    // The same `Arc` is handed to the local `WorkerState` in `Client::new`, so updates made
    // through `track_chain` are visible to the worker as well.
    // TODO(#2412): Merge with set of chains the client is receiving notifications from validators
    tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
    /// References to clients waiting for chain notifications.
    notifier: Arc<ChannelNotifier<Notification>>,
    /// Chain state for the managed chains.
    chains: papaya::HashMap<ChainId, chain_client::State>,
    /// Configuration options.
    // Cloned into each `ChainClient` by `create_chain_client`.
    options: chain_client::Options,
}
272
273impl<Env: Environment> Client<Env> {
274    /// Creates a new `Client` with a new cache and notifiers.
275    #[instrument(level = "trace", skip_all)]
276    #[allow(clippy::too_many_arguments)]
277    pub fn new(
278        environment: Env,
279        admin_id: ChainId,
280        long_lived_services: bool,
281        tracked_chains: impl IntoIterator<Item = ChainId>,
282        name: impl Into<String>,
283        chain_worker_ttl: Duration,
284        sender_chain_worker_ttl: Duration,
285        options: chain_client::Options,
286        block_cache_size: usize,
287        execution_state_cache_size: usize,
288    ) -> Self {
289        let tracked_chains = Arc::new(RwLock::new(tracked_chains.into_iter().collect()));
290        let state = WorkerState::new_for_client(
291            name.into(),
292            environment.storage().clone(),
293            tracked_chains.clone(),
294            block_cache_size,
295            execution_state_cache_size,
296        )
297        .with_long_lived_services(long_lived_services)
298        .with_allow_inactive_chains(true)
299        .with_allow_messages_from_deprecated_epochs(true)
300        .with_chain_worker_ttl(chain_worker_ttl)
301        .with_sender_chain_worker_ttl(sender_chain_worker_ttl);
302        let local_node = LocalNodeClient::new(state);
303
304        Self {
305            environment,
306            local_node,
307            chains: papaya::HashMap::new(),
308            admin_id,
309            tracked_chains,
310            notifier: Arc::new(ChannelNotifier::default()),
311            options,
312        }
313    }
314
315    /// Returns the storage client used by this client's local node.
316    pub fn storage_client(&self) -> &Env::Storage {
317        self.environment.storage()
318    }
319
320    pub fn validator_node_provider(&self) -> &Env::Network {
321        self.environment.network()
322    }
323
324    /// Returns a reference to the [`Signer`] of the client.
325    #[instrument(level = "trace", skip(self))]
326    pub fn signer(&self) -> &impl Signer {
327        self.environment.signer()
328    }
329
330    /// Adds a chain to the set of chains tracked by the local node.
331    #[instrument(level = "trace", skip(self))]
332    pub fn track_chain(&self, chain_id: ChainId) {
333        self.tracked_chains
334            .write()
335            .expect("Panics should not happen while holding a lock to `tracked_chains`")
336            .insert(chain_id);
337    }
338
339    /// Creates a new `ChainClient`.
340    #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
341    pub fn create_chain_client(
342        self: &Arc<Self>,
343        chain_id: ChainId,
344        block_hash: Option<CryptoHash>,
345        next_block_height: BlockHeight,
346        pending_proposal: Option<PendingProposal>,
347        preferred_owner: Option<AccountOwner>,
348        timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
349    ) -> ChainClient<Env> {
350        // If the entry already exists we assume that the entry is more up to date than
351        // the arguments: If they were read from the wallet file, they might be stale.
352        self.chains.pin().get_or_insert_with(chain_id, || {
353            chain_client::State::new(pending_proposal.clone())
354        });
355
356        ChainClient::new(
357            self.clone(),
358            chain_id,
359            self.options.clone(),
360            block_hash,
361            next_block_height,
362            preferred_owner,
363            timing_sender,
364        )
365    }
366
367    /// Fetches the chain description blob if needed, and returns the chain info.
368    async fn fetch_chain_info(
369        &self,
370        chain_id: ChainId,
371        validators: &[RemoteNode<Env::ValidatorNode>],
372    ) -> Result<Box<ChainInfo>, chain_client::Error> {
373        match self.local_node.chain_info(chain_id).await {
374            Ok(info) => Ok(info),
375            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
376                // Make sure the admin chain is up to date.
377                self.synchronize_chain_state(self.admin_id).await?;
378                // If the chain is missing then the error is a WorkerError
379                // and so a BlobsNotFound
380                self.update_local_node_with_blobs_from(blob_ids, validators)
381                    .await?;
382                Ok(self.local_node.chain_info(chain_id).await?)
383            }
384            Err(err) => Err(err.into()),
385        }
386    }
387
388    fn weighted_select(
389        remaining_validators: &mut Vec<RemoteNode<Env::ValidatorNode>>,
390        remaining_weights: &mut Vec<u64>,
391    ) -> Option<RemoteNode<Env::ValidatorNode>> {
392        if remaining_weights.is_empty() {
393            return None;
394        }
395        let dist = WeightedIndex::new(remaining_weights.clone()).unwrap();
396        let idx = dist.sample(&mut rand::thread_rng());
397        remaining_weights.remove(idx);
398        Some(remaining_validators.remove(idx))
399    }
400
401    /// Downloads and processes all certificates up to (excluding) the specified height.
402    #[instrument(level = "trace", skip(self))]
403    async fn download_certificates(
404        &self,
405        chain_id: ChainId,
406        target_next_block_height: BlockHeight,
407    ) -> Result<Box<ChainInfo>, chain_client::Error> {
408        let (_, committee) = self.admin_committee().await?;
409        let mut remaining_validators = self.make_nodes(&committee)?;
410        let mut info = self
411            .fetch_chain_info(chain_id, &remaining_validators)
412            .await?;
413        // Determining the weights of the validators
414        let mut remaining_weights = remaining_validators
415            .iter()
416            .map(|validator| {
417                let validator_state = committee.validators.get(&validator.public_key).unwrap();
418                validator_state.votes
419            })
420            .collect::<Vec<_>>();
421
422        while let Some(remote_node) =
423            Self::weighted_select(&mut remaining_validators, &mut remaining_weights)
424        {
425            if target_next_block_height <= info.next_block_height {
426                return Ok(info);
427            }
428            match self
429                .download_certificates_from(&remote_node, chain_id, target_next_block_height)
430                .await
431            {
432                Err(err) => info!(
433                    "Failed to download certificates from validator {:?}: {err}",
434                    remote_node.public_key
435                ),
436                Ok(Some(new_info)) => info = new_info,
437                Ok(None) => {}
438            }
439        }
440        ensure!(
441            target_next_block_height <= info.next_block_height,
442            chain_client::Error::CannotDownloadCertificates {
443                chain_id,
444                target_next_block_height,
445            }
446        );
447        Ok(info)
448    }
449
450    /// Downloads and processes all certificates up to (excluding) the specified height from the
451    /// given validator.
452    #[instrument(level = "trace", skip_all)]
453    async fn download_certificates_from(
454        &self,
455        remote_node: &RemoteNode<Env::ValidatorNode>,
456        chain_id: ChainId,
457        stop: BlockHeight,
458    ) -> Result<Option<Box<ChainInfo>>, chain_client::Error> {
459        let mut last_info = None;
460        // First load any blocks from local storage, if available.
461        let chain_info = self.local_node.chain_info(chain_id).await?;
462        let mut next_height = chain_info.next_block_height;
463        let hashes = self
464            .local_node
465            .get_preprocessed_block_hashes(chain_id, next_height, stop)
466            .await?;
467        let certificates = self
468            .storage_client()
469            .read_certificates(hashes.clone())
470            .await?;
471        let certificates = match ResultReadCertificates::new(certificates, hashes) {
472            ResultReadCertificates::Certificates(certificates) => certificates,
473            ResultReadCertificates::InvalidHashes(hashes) => {
474                return Err(chain_client::Error::ReadCertificatesError(hashes))
475            }
476        };
477        for certificate in certificates {
478            last_info = Some(self.handle_certificate(certificate).await?.info);
479        }
480        // Now download the rest in batches from the remote node.
481        while next_height < stop {
482            // TODO(#2045): Analyze network errors instead of using a fixed batch size.
483            let limit = u64::from(stop)
484                .checked_sub(u64::from(next_height))
485                .ok_or(ArithmeticError::Overflow)?
486                .min(self.options.certificate_download_batch_size);
487            let certificates = remote_node
488                .query_certificates_from(chain_id, next_height, limit)
489                .await?;
490            let Some(info) = self.process_certificates(remote_node, certificates).await? else {
491                break;
492            };
493            assert!(info.next_block_height > next_height);
494            next_height = info.next_block_height;
495            last_info = Some(info);
496        }
497        Ok(last_info)
498    }
499
500    async fn download_blobs(
501        &self,
502        remote_node: &RemoteNode<impl ValidatorNode>,
503        blob_ids: impl IntoIterator<Item = BlobId>,
504    ) -> Result<(), chain_client::Error> {
505        self.local_node
506            .store_blobs(
507                &futures::stream::iter(blob_ids.into_iter().map(|blob_id| async move {
508                    remote_node.try_download_blob(blob_id).await.unwrap()
509                }))
510                .buffer_unordered(self.options.max_joined_tasks)
511                .collect::<Vec<_>>()
512                .await,
513            )
514            .await
515            .map_err(Into::into)
516    }
517
518    /// Tries to process all the certificates, requesting any missing blobs from the given node.
519    /// Returns the chain info of the last successfully processed certificate.
520    #[instrument(level = "trace", skip_all)]
521    async fn process_certificates(
522        &self,
523        remote_node: &RemoteNode<impl ValidatorNode>,
524        certificates: Vec<ConfirmedBlockCertificate>,
525    ) -> Result<Option<Box<ChainInfo>>, chain_client::Error> {
526        let mut info = None;
527        let required_blob_ids: Vec<_> = certificates
528            .iter()
529            .flat_map(|certificate| certificate.value().required_blob_ids())
530            .collect();
531
532        match self
533            .local_node
534            .read_blob_states_from_storage(&required_blob_ids)
535            .await
536        {
537            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
538                self.download_blobs(remote_node, blob_ids).await?;
539            }
540            x => {
541                x?;
542            }
543        }
544
545        for certificate in certificates {
546            info = Some(
547                match self.handle_certificate(certificate.clone()).await {
548                    Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
549                        self.download_blobs(remote_node, blob_ids).await?;
550                        self.handle_certificate(certificate).await?
551                    }
552                    x => x?,
553                }
554                .info,
555            );
556        }
557
558        // Done with all certificates.
559        Ok(info)
560    }
561
562    async fn handle_certificate<T: ProcessableCertificate>(
563        &self,
564        certificate: GenericCertificate<T>,
565    ) -> Result<ChainInfoResponse, LocalNodeError> {
566        self.local_node
567            .handle_certificate(certificate, &self.notifier)
568            .await
569    }
570
571    async fn chain_info_with_committees(
572        &self,
573        chain_id: ChainId,
574    ) -> Result<Box<ChainInfo>, LocalNodeError> {
575        let query = ChainInfoQuery::new(chain_id).with_committees();
576        let info = self.local_node.handle_chain_info_query(query).await?.info;
577        Ok(info)
578    }
579
580    /// Obtains all the committees trusted by any of the given chains. Also returns the highest
581    /// of their epochs.
582    #[instrument(level = "trace", skip_all)]
583    async fn admin_committees(
584        &self,
585    ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
586        let info = self.chain_info_with_committees(self.admin_id).await?;
587        Ok((info.epoch, info.into_committees()?))
588    }
589
590    /// Obtains the committee for the latest epoch on the admin chain.
591    pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
592        let info = self.chain_info_with_committees(self.admin_id).await?;
593        Ok((info.epoch, info.into_current_committee()?))
594    }
595
596    /// Obtains the validators for the latest epoch.
597    async fn validator_nodes(
598        &self,
599    ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, chain_client::Error> {
600        let (_, committee) = self.admin_committee().await?;
601        Ok(self.make_nodes(&committee)?)
602    }
603
604    /// Creates a [`RemoteNode`] for each validator in the committee.
605    fn make_nodes(
606        &self,
607        committee: &Committee,
608    ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
609        Ok(self
610            .validator_node_provider()
611            .make_nodes(committee)?
612            .map(|(public_key, node)| RemoteNode { public_key, node })
613            .collect())
614    }
615
616    /// Ensures that the client has the `ChainDescription` blob corresponding to this
617    /// client's `ChainId`.
618    pub async fn get_chain_description(
619        &self,
620        chain_id: ChainId,
621    ) -> Result<ChainDescription, chain_client::Error> {
622        let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
623        let blob = self
624            .local_node
625            .storage_client()
626            .read_blob(chain_desc_id)
627            .await?;
628        if let Some(blob) = blob {
629            // We have the blob - return it.
630            return Ok(bcs::from_bytes(blob.bytes())?);
631        };
632        // Recover history from the current validators, according to the admin chain.
633        self.synchronize_chain_state(self.admin_id).await?;
634        let nodes = self.validator_nodes().await?;
635        let blob = self
636            .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
637            .await?
638            .pop()
639            .unwrap(); // Returns exactly as many blobs as passed-in IDs.
640        Ok(bcs::from_bytes(blob.bytes())?)
641    }
642
643    /// Updates the latest block and next block height and round information from the chain info.
644    #[instrument(level = "trace", skip_all, fields(chain_id = format!("{:.8}", info.chain_id)))]
645    fn update_from_info(&self, info: &ChainInfo) {
646        self.chains.pin().update(info.chain_id, |state| {
647            let mut state = state.clone_for_update_unchecked();
648            state.update_from_info(info);
649            state
650        });
651    }
652
653    /// Handles the certificate in the local node and the resulting notifications.
654    #[instrument(level = "trace", skip_all)]
655    async fn process_certificate<T: ProcessableCertificate>(
656        &self,
657        certificate: Box<GenericCertificate<T>>,
658    ) -> Result<(), LocalNodeError> {
659        let info = self.handle_certificate(*certificate).await?.info;
660        self.update_from_info(&info);
661        Ok(())
662    }
663
664    /// Submits a validated block for finalization and returns the confirmed block certificate.
665    #[instrument(level = "trace", skip_all)]
666    async fn finalize_block(
667        &self,
668        committee: &Committee,
669        certificate: ValidatedBlockCertificate,
670    ) -> Result<ConfirmedBlockCertificate, chain_client::Error> {
671        debug!(round = %certificate.round, "Submitting block for confirmation");
672        let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
673        let finalize_action = CommunicateAction::FinalizeBlock {
674            certificate: Box::new(certificate),
675            delivery: self.options.cross_chain_message_delivery,
676        };
677        let certificate = self
678            .communicate_chain_action(committee, finalize_action, hashed_value)
679            .await?;
680        self.receive_certificate_with_checked_signatures(certificate.clone())
681            .await?;
682        Ok(certificate)
683    }
684
685    /// Submits a block proposal to the validators.
686    #[instrument(level = "trace", skip_all)]
687    async fn submit_block_proposal<T: ProcessableCertificate>(
688        &self,
689        committee: &Committee,
690        proposal: Box<BlockProposal>,
691        value: T,
692    ) -> Result<GenericCertificate<T>, chain_client::Error> {
693        debug!(
694            round = %proposal.content.round,
695            "Submitting block proposal to validators"
696        );
697        let submit_action = CommunicateAction::SubmitBlock {
698            proposal,
699            blob_ids: value.required_blob_ids().into_iter().collect(),
700        };
701        let certificate = self
702            .communicate_chain_action(committee, submit_action, value)
703            .await?;
704        self.process_certificate(Box::new(certificate.clone()))
705            .await?;
706        Ok(certificate)
707    }
708
709    /// Broadcasts certified blocks to validators.
710    #[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
711    async fn communicate_chain_updates(
712        &self,
713        committee: &Committee,
714        chain_id: ChainId,
715        height: BlockHeight,
716        delivery: CrossChainMessageDelivery,
717    ) -> Result<(), chain_client::Error> {
718        let nodes = self.make_nodes(committee)?;
719        communicate_with_quorum(
720            &nodes,
721            committee,
722            |_: &()| (),
723            |remote_node| {
724                let mut updater = ValidatorUpdater {
725                    remote_node,
726                    local_node: self.local_node.clone(),
727                    admin_id: self.admin_id,
728                };
729                Box::pin(async move {
730                    updater
731                        .send_chain_information(chain_id, height, delivery)
732                        .await
733                })
734            },
735            self.options.grace_period,
736        )
737        .await?;
738        Ok(())
739    }
740
741    /// Broadcasts certified blocks and optionally a block proposal, certificate or
742    /// leader timeout request.
743    ///
744    /// In that case, it verifies that the validator votes are for the provided value,
745    /// and returns a certificate.
746    #[instrument(level = "trace", skip_all)]
747    async fn communicate_chain_action<T: CertificateValue>(
748        &self,
749        committee: &Committee,
750        action: CommunicateAction,
751        value: T,
752    ) -> Result<GenericCertificate<T>, chain_client::Error> {
753        let nodes = self.make_nodes(committee)?;
754        let ((votes_hash, votes_round), votes) = communicate_with_quorum(
755            &nodes,
756            committee,
757            |vote: &LiteVote| (vote.value.value_hash, vote.round),
758            |remote_node| {
759                let mut updater = ValidatorUpdater {
760                    remote_node,
761                    local_node: self.local_node.clone(),
762                    admin_id: self.admin_id,
763                };
764                let action = action.clone();
765                Box::pin(async move { updater.send_chain_update(action).await })
766            },
767            self.options.grace_period,
768        )
769        .await?;
770        ensure!(
771            (votes_hash, votes_round) == (value.hash(), action.round()),
772            chain_client::Error::UnexpectedQuorum {
773                hash: votes_hash,
774                round: votes_round,
775                expected_hash: value.hash(),
776                expected_round: action.round(),
777            }
778        );
779        // Certificate is valid because
780        // * `communicate_with_quorum` ensured a sufficient "weight" of
781        // (non-error) answers were returned by validators.
782        // * each answer is a vote signed by the expected validator.
783        let certificate = LiteCertificate::try_from_votes(votes)
784            .ok_or_else(|| {
785                chain_client::Error::InternalError(
786                    "Vote values or rounds don't match; this is a bug",
787                )
788            })?
789            .with_value(value)
790            .ok_or_else(|| {
791                chain_client::Error::ProtocolError("A quorum voted for an unexpected value")
792            })?;
793        Ok(certificate)
794    }
795
796    /// Processes the confirmed block certificate in the local node without checking signatures.
797    /// Also downloads and processes all ancestors that are still missing.
798    #[instrument(level = "trace", skip_all)]
799    async fn receive_certificate_with_checked_signatures(
800        &self,
801        certificate: ConfirmedBlockCertificate,
802    ) -> Result<(), chain_client::Error> {
803        let certificate = Box::new(certificate);
804        let block = certificate.block();
805
806        // Recover history from the network.
807        self.download_certificates(block.header.chain_id, block.header.height)
808            .await?;
809        // Process the received operations. Download required hashed certificate values if
810        // necessary.
811        if let Err(err) = self.process_certificate(certificate.clone()).await {
812            match &err {
813                LocalNodeError::BlobsNotFound(blob_ids) => {
814                    let blobs = RemoteNode::download_blobs(
815                        blob_ids,
816                        &self.validator_nodes().await?,
817                        self.options.blob_download_timeout,
818                    )
819                    .await
820                    .ok_or(err)?;
821                    self.local_node.store_blobs(&blobs).await?;
822                    self.process_certificate(certificate).await?;
823                }
824                _ => {
825                    // The certificate is not as expected. Give up.
826                    warn!("Failed to process network hashed certificate value");
827                    return Err(err.into());
828                }
829            }
830        }
831
832        Ok(())
833    }
834
835    /// Processes the confirmed block in the local node, possibly without executing it.
836    #[instrument(level = "trace", skip_all)]
837    #[allow(dead_code)] // Otherwise CI fails when built for docker.
838    async fn receive_sender_certificate(
839        &self,
840        certificate: ConfirmedBlockCertificate,
841        mode: ReceiveCertificateMode,
842        nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
843    ) -> Result<(), chain_client::Error> {
844        // Verify the certificate before doing any expensive networking.
845        let (max_epoch, committees) = self.admin_committees().await?;
846        if let ReceiveCertificateMode::NeedsCheck = mode {
847            Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
848        }
849        // Recover history from the network.
850        let nodes = if let Some(nodes) = nodes {
851            nodes
852        } else {
853            self.validator_nodes().await?
854        };
855        if let Err(err) = self.handle_certificate(certificate.clone()).await {
856            match &err {
857                LocalNodeError::BlobsNotFound(blob_ids) => {
858                    let blobs = RemoteNode::download_blobs(
859                        blob_ids,
860                        &nodes,
861                        self.options.blob_download_timeout,
862                    )
863                    .await
864                    .ok_or(err)?;
865                    self.local_node.store_blobs(&blobs).await?;
866                    self.handle_certificate(certificate.clone()).await?;
867                }
868                _ => {
869                    // The certificate is not as expected. Give up.
870                    warn!("Failed to process network hashed certificate value");
871                    return Err(err.into());
872                }
873            }
874        }
875
876        Ok(())
877    }
878
    /// Downloads and processes certificates for sender chain blocks.
    ///
    /// Repeatedly asks the (shuffled) `nodes` for the certificates of `sender_chain_id` at
    /// `remote_heights`. Each validator is asked only for the heights that `received_log` says
    /// it has. Downloaded certificates are checked against the admin committees. Those that pass
    /// are processed via `receive_sender_certificate`, and for each one processed successfully
    /// its `ChainAndHeight` is sent on `sender`. Errors are logged rather than returned.
    #[instrument(level = "trace", skip_all)]
    async fn download_and_process_sender_chain(
        &self,
        sender_chain_id: ChainId,
        nodes: &[RemoteNode<Env::ValidatorNode>],
        received_log: &ReceivedLogs,
        mut remote_heights: Vec<BlockHeight>,
        sender: mpsc::UnboundedSender<ChainAndHeight>,
    ) {
        // Without the committees no certificate can be checked, so log and give up.
        let (max_epoch, committees) = match self.admin_committees().await {
            Ok(result) => result,
            Err(error) => {
                error!(%error, %sender_chain_id, "could not read admin committees");
                return;
            }
        };
        let committees_ref = &committees;
        // Owned copy so that faulty validators can be dropped between rounds.
        let mut nodes = nodes.to_vec();
        // Each round removes the heights that were handled; loop until none are left.
        // NOTE(review): if the responding validator's certificates all fail in
        // `receive_sender_certificate`, or its answer contains none of the requested heights
        // (depends on `download_certificates_by_heights`, not visible here), `remote_heights`
        // does not shrink and the same round is repeated. Confirm this cannot spin indefinitely.
        while !remote_heights.is_empty() {
            let remote_heights_ref = &remote_heights;
            nodes.shuffle(&mut rand::thread_rng());
            let certificates = match communicate_concurrently(
                &nodes,
                async move |remote_node| {
                    let mut remote_heights = remote_heights_ref.clone();
                    // No need trying to download certificates the validator didn't have in their
                    // log - we'll retry downloading the remaining ones next time we loop.
                    remote_heights.retain(|height| {
                        received_log.validator_has_block(
                            &remote_node.public_key,
                            sender_chain_id,
                            *height,
                        )
                    });
                    if remote_heights.is_empty() {
                        // It makes no sense to return `Ok(_)` if we aren't going to try downloading
                        // anything from the validator - let the function try the other validators
                        return Err(());
                    }
                    let certificates = remote_node
                        .download_certificates_by_heights(sender_chain_id, remote_heights)
                        .await
                        .map_err(|_| ())?;
                    // Pair each certificate with whether its epoch was acceptable (`New`).
                    let mut certificates_with_check_results = vec![];
                    for cert in certificates {
                        if let Ok(check_result) =
                            Self::check_certificate(max_epoch, committees_ref, &cert)
                        {
                            certificates_with_check_results
                                .push((cert, check_result.into_result().is_ok()));
                        } else {
                            // Invalid signature - the validator is faulty
                            return Err(());
                        }
                    }
                    Ok(certificates_with_check_results)
                },
                // Only called when every node failed; collects the keys of all of them.
                // NOTE(review): that set therefore contains every node in `nodes` (including
                // validators that merely lacked the requested heights), so the `retain` below
                // empties `nodes` and the function returns. The `continue` is effectively
                // unreachable - confirm whether that is intended.
                |errors| {
                    errors
                        .into_iter()
                        .map(|(validator, _error)| validator)
                        .collect::<BTreeSet<_>>()
                },
                self.options.certificate_batch_download_timeout,
            )
            .await
            {
                Ok(certificates_with_check_results) => certificates_with_check_results,
                Err(faulty_validators) => {
                    // filter out faulty validators and retry if any are left
                    nodes.retain(|node| !faulty_validators.contains(&node.public_key));
                    if nodes.is_empty() {
                        info!(
                            chain_id = %sender_chain_id,
                            "could not download certificates for chain - no more correct validators left"
                        );
                        return;
                    }
                    continue;
                }
            };

            trace!(
                chain_id = %sender_chain_id,
                num_certificates = %certificates.len(),
                "received certificates",
            );

            // Heights that need no further download attempts after this round.
            let mut to_remove_from_queue = BTreeSet::new();

            for (certificate, check_result) in certificates {
                let hash = certificate.hash();
                let chain_id = certificate.block().header.chain_id;
                // NOTE(review): this is the certificate's own height. It is assumed to be one of
                // the requested heights - confirm `download_certificates_by_heights` guarantees it.
                let height = certificate.block().header.height;
                if !check_result {
                    // The certificate's epoch is either newer than any we know or belongs to a
                    // committee we no longer have, so `check_certificate` could not verify its
                    // signatures - do not receive it, but also do not attempt to re-download it.
                    to_remove_from_queue.insert(height);
                    continue;
                }
                // We checked the certificates right after downloading them.
                let mode = ReceiveCertificateMode::AlreadyChecked;
                if let Err(error) = self
                    .receive_sender_certificate(certificate, mode, None)
                    .await
                {
                    // The height stays queued and is retried in the next round.
                    warn!(%error, %hash, "Received invalid certificate");
                } else {
                    to_remove_from_queue.insert(height);
                    if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
                        error!(
                            %chain_id,
                            %height,
                            %error,
                            "failed to send chain and height over the channel",
                        );
                    }
                }
            }

            remote_heights.retain(|height| !to_remove_from_queue.contains(height));
        }
        trace!(
            chain_id = %sender_chain_id,
            "find_received_certificates: finished processing chain",
        );
    }
1008
1009    /// Downloads the log of received messages for a chain from a validator.
1010    #[instrument(level = "trace", skip(self))]
1011    async fn get_received_log_from_validator(
1012        &self,
1013        chain_id: ChainId,
1014        remote_node: &RemoteNode<Env::ValidatorNode>,
1015        tracker: u64,
1016    ) -> Result<Vec<ChainAndHeight>, chain_client::Error> {
1017        let mut offset = tracker;
1018
1019        // Retrieve the list of newly received certificates from this validator.
1020        let mut remote_log = Vec::new();
1021        loop {
1022            trace!("get_received_log_from_validator: looping");
1023            let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(offset);
1024            let info = remote_node.handle_chain_info_query(query).await?;
1025            let received_entries = info.requested_received_log.len();
1026            offset += received_entries as u64;
1027            remote_log.extend(info.requested_received_log);
1028            trace!(
1029                ?remote_node,
1030                %received_entries,
1031                "get_received_log_from_validator: received log batch",
1032            );
1033            if received_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
1034                break;
1035            }
1036        }
1037
1038        trace!(
1039            ?remote_node,
1040            num_entries = %remote_log.len(),
1041            "get_received_log_from_validator: returning downloaded log",
1042        );
1043
1044        Ok(remote_log)
1045    }
1046
1047    /// Downloads a specific sender block and recursively downloads any earlier blocks
1048    /// that also sent a message to our chain, based on `previous_message_blocks`.
1049    ///
1050    /// This ensures that we have all the sender blocks needed to preprocess the target block
1051    /// and put the messages to our chain into the outbox.
1052    async fn download_sender_block_with_sending_ancestors(
1053        &self,
1054        receiver_chain_id: ChainId,
1055        sender_chain_id: ChainId,
1056        height: BlockHeight,
1057        remote_node: &RemoteNode<Env::ValidatorNode>,
1058    ) -> Result<(), chain_client::Error> {
1059        let next_outbox_height = self
1060            .local_node
1061            .next_outbox_heights(&[sender_chain_id], receiver_chain_id)
1062            .await?
1063            .get(&sender_chain_id)
1064            .copied()
1065            .unwrap_or(BlockHeight::ZERO);
1066        let (max_epoch, committees) = self.admin_committees().await?;
1067
1068        // Recursively collect all certificates we need, following
1069        // the chain of previous_message_blocks back to next_outbox_height.
1070        let mut certificates = BTreeMap::new();
1071        let mut current_height = height;
1072
1073        // Stop if we've reached the height we've already processed.
1074        while current_height >= next_outbox_height {
1075            // Download the certificate for this height.
1076            let downloaded = remote_node
1077                .download_certificates_by_heights(sender_chain_id, vec![current_height])
1078                .await?;
1079            let Some(certificate) = downloaded.into_iter().next() else {
1080                return Err(chain_client::Error::CannotDownloadMissingSenderBlock {
1081                    chain_id: sender_chain_id,
1082                    height: current_height,
1083                });
1084            };
1085
1086            // Validate the certificate.
1087            Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
1088                .into_result()?;
1089
1090            // Check if there's a previous message block to our chain.
1091            let block = certificate.block();
1092            let next_height = block
1093                .body
1094                .previous_message_blocks
1095                .get(&receiver_chain_id)
1096                .map(|(_prev_hash, prev_height)| *prev_height);
1097
1098            // Store this certificate.
1099            certificates.insert(current_height, certificate);
1100
1101            if let Some(prev_height) = next_height {
1102                // Continue with the previous block.
1103                current_height = prev_height;
1104            } else {
1105                // No more dependencies.
1106                break;
1107            }
1108        }
1109
1110        if certificates.is_empty() {
1111            self.local_node
1112                .retry_pending_cross_chain_requests(sender_chain_id)
1113                .await?;
1114        }
1115
1116        // Process certificates in ascending block height order (BTreeMap keeps them sorted).
1117        for certificate in certificates.into_values() {
1118            self.receive_sender_certificate(
1119                certificate,
1120                ReceiveCertificateMode::AlreadyChecked,
1121                Some(vec![remote_node.clone()]),
1122            )
1123            .await?;
1124        }
1125
1126        Ok(())
1127    }
1128
1129    #[instrument(
1130        level = "trace", skip_all,
1131        fields(certificate_hash = ?incoming_certificate.hash()),
1132    )]
1133    fn check_certificate(
1134        highest_known_epoch: Epoch,
1135        committees: &BTreeMap<Epoch, Committee>,
1136        incoming_certificate: &ConfirmedBlockCertificate,
1137    ) -> Result<CheckCertificateResult, NodeError> {
1138        let block = incoming_certificate.block();
1139        // Check that certificates are valid w.r.t one of our trusted committees.
1140        if block.header.epoch > highest_known_epoch {
1141            return Ok(CheckCertificateResult::FutureEpoch);
1142        }
1143        if let Some(known_committee) = committees.get(&block.header.epoch) {
1144            // This epoch is recognized by our chain. Let's verify the
1145            // certificate.
1146            incoming_certificate.check(known_committee)?;
1147            Ok(CheckCertificateResult::New)
1148        } else {
1149            // We don't accept a certificate from a committee that was retired.
1150            Ok(CheckCertificateResult::OldEpoch)
1151        }
1152    }
1153
1154    /// Downloads and processes any certificates we are missing for the given chain.
1155    #[instrument(level = "trace", skip_all)]
1156    async fn synchronize_chain_state(
1157        &self,
1158        chain_id: ChainId,
1159    ) -> Result<Box<ChainInfo>, chain_client::Error> {
1160        let (_, committee) = self.admin_committee().await?;
1161        self.synchronize_chain_state_from_committee(chain_id, committee)
1162            .await
1163    }
1164
1165    /// Downloads and processes any certificates we are missing for the given chain, from the given
1166    /// committee.
1167    #[instrument(level = "trace", skip_all)]
1168    pub async fn synchronize_chain_state_from_committee(
1169        &self,
1170        chain_id: ChainId,
1171        committee: Committee,
1172    ) -> Result<Box<ChainInfo>, chain_client::Error> {
1173        #[cfg(with_metrics)]
1174        let _latency = metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency();
1175
1176        let validators = self.make_nodes(&committee)?;
1177        Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
1178        communicate_with_quorum(
1179            &validators,
1180            &committee,
1181            |_: &()| (),
1182            |remote_node| async move {
1183                self.synchronize_chain_state_from(&remote_node, chain_id)
1184                    .await
1185            },
1186            self.options.grace_period,
1187        )
1188        .await?;
1189
1190        self.local_node
1191            .chain_info(chain_id)
1192            .await
1193            .map_err(Into::into)
1194    }
1195
1196    /// Downloads any certificates from the specified validator that we are missing for the given
1197    /// chain, and processes them.
1198    #[instrument(level = "trace", skip(self, remote_node, chain_id))]
1199    async fn synchronize_chain_state_from(
1200        &self,
1201        remote_node: &RemoteNode<Env::ValidatorNode>,
1202        chain_id: ChainId,
1203    ) -> Result<(), chain_client::Error> {
1204        let mut local_info = self.local_node.chain_info(chain_id).await?;
1205        let query = ChainInfoQuery::new(chain_id).with_manager_values();
1206        let remote_info = remote_node.handle_chain_info_query(query).await?;
1207        if let Some(new_info) = self
1208            .download_certificates_from(remote_node, chain_id, remote_info.next_block_height)
1209            .await?
1210        {
1211            local_info = new_info;
1212        };
1213
1214        // If we are at the same height as the remote node, we also update our chain manager.
1215        if local_info.next_block_height != remote_info.next_block_height {
1216            debug!(
1217                "Synced from validator {}; but remote height is {} and local height is {}",
1218                remote_node.public_key, remote_info.next_block_height, local_info.next_block_height
1219            );
1220            return Ok(());
1221        };
1222
1223        if let Some(timeout) = remote_info.manager.timeout {
1224            self.handle_certificate(*timeout).await?;
1225        }
1226        let mut proposals = Vec::new();
1227        if let Some(proposal) = remote_info.manager.requested_signed_proposal {
1228            proposals.push(*proposal);
1229        }
1230        if let Some(proposal) = remote_info.manager.requested_proposed {
1231            proposals.push(*proposal);
1232        }
1233        if let Some(locking) = remote_info.manager.requested_locking {
1234            match *locking {
1235                LockingBlock::Fast(proposal) => {
1236                    proposals.push(proposal);
1237                }
1238                LockingBlock::Regular(cert) => {
1239                    let hash = cert.hash();
1240                    if let Err(err) = self.try_process_locking_block_from(remote_node, cert).await {
1241                        debug!(
1242                            "Skipping locked block {hash} from validator {} at height {}: {err}",
1243                            remote_node.public_key, local_info.next_block_height,
1244                        );
1245                    }
1246                }
1247            }
1248        }
1249        'proposal_loop: for proposal in proposals {
1250            let owner: AccountOwner = proposal.owner();
1251            if let Err(mut err) = self
1252                .local_node
1253                .handle_block_proposal(proposal.clone())
1254                .await
1255            {
1256                if let LocalNodeError::BlobsNotFound(_) = &err {
1257                    let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
1258                    if !required_blob_ids.is_empty() {
1259                        let mut blobs = Vec::new();
1260                        for blob_id in required_blob_ids {
1261                            let blob_content = match remote_node
1262                                .node
1263                                .download_pending_blob(chain_id, blob_id)
1264                                .await
1265                            {
1266                                Ok(content) => content,
1267                                Err(err) => {
1268                                    info!(
1269                                        "Skipping proposal from {owner} and validator {} at \
1270                                        height {}; failed to download {blob_id}: {err}",
1271                                        remote_node.public_key, local_info.next_block_height
1272                                    );
1273                                    continue 'proposal_loop;
1274                                }
1275                            };
1276                            blobs.push(Blob::new(blob_content));
1277                        }
1278                        self.local_node
1279                            .handle_pending_blobs(chain_id, blobs)
1280                            .await?;
1281                        // We found the missing blobs: retry.
1282                        if let Err(new_err) = self
1283                            .local_node
1284                            .handle_block_proposal(proposal.clone())
1285                            .await
1286                        {
1287                            err = new_err;
1288                        } else {
1289                            continue;
1290                        }
1291                    }
1292                    if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
1293                        self.update_local_node_with_blobs_from(
1294                            blob_ids.clone(),
1295                            &[remote_node.clone()],
1296                        )
1297                        .await?;
1298                        // We found the missing blobs: retry.
1299                        if let Err(new_err) = self
1300                            .local_node
1301                            .handle_block_proposal(proposal.clone())
1302                            .await
1303                        {
1304                            err = new_err;
1305                        } else {
1306                            continue;
1307                        }
1308                    }
1309                }
1310                while let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
1311                    if let ChainError::MissingCrossChainUpdate {
1312                        chain_id,
1313                        origin,
1314                        height,
1315                    } = &**chain_err
1316                    {
1317                        self.download_sender_block_with_sending_ancestors(
1318                            *chain_id,
1319                            *origin,
1320                            *height,
1321                            remote_node,
1322                        )
1323                        .await?;
1324                        // Retry
1325                        if let Err(new_err) = self
1326                            .local_node
1327                            .handle_block_proposal(proposal.clone())
1328                            .await
1329                        {
1330                            err = new_err;
1331                        } else {
1332                            continue 'proposal_loop;
1333                        }
1334                    } else {
1335                        break;
1336                    }
1337                }
1338
1339                debug!(
1340                    "Skipping proposal from {owner} and validator {} at height {}: {err}",
1341                    remote_node.public_key, local_info.next_block_height
1342                );
1343            }
1344        }
1345        Ok(())
1346    }
1347
1348    async fn try_process_locking_block_from(
1349        &self,
1350        remote_node: &RemoteNode<Env::ValidatorNode>,
1351        certificate: GenericCertificate<ValidatedBlock>,
1352    ) -> Result<(), chain_client::Error> {
1353        let chain_id = certificate.inner().chain_id();
1354        let certificate = Box::new(certificate);
1355        match self.process_certificate(certificate.clone()).await {
1356            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1357                let mut blobs = Vec::new();
1358                for blob_id in blob_ids {
1359                    let blob_content = remote_node
1360                        .node
1361                        .download_pending_blob(chain_id, blob_id)
1362                        .await?;
1363                    blobs.push(Blob::new(blob_content));
1364                }
1365                self.local_node
1366                    .handle_pending_blobs(chain_id, blobs)
1367                    .await?;
1368                self.process_certificate(certificate).await?;
1369                Ok(())
1370            }
1371            Err(err) => Err(err.into()),
1372            Ok(()) => Ok(()),
1373        }
1374    }
1375
1376    /// Downloads and processes from the specified validators a confirmed block certificates that
1377    /// use the given blobs. If this succeeds, the blob will be in our storage.
1378    async fn update_local_node_with_blobs_from(
1379        &self,
1380        blob_ids: Vec<BlobId>,
1381        remote_nodes: &[RemoteNode<Env::ValidatorNode>],
1382    ) -> Result<Vec<Blob>, chain_client::Error> {
1383        let timeout = self.options.blob_download_timeout;
1384        // Deduplicate IDs.
1385        let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
1386        stream::iter(blob_ids.into_iter().map(|blob_id| {
1387            communicate_concurrently(
1388                remote_nodes,
1389                async move |remote_node| {
1390                    let certificate = remote_node.download_certificate_for_blob(blob_id).await?;
1391                    self.receive_sender_certificate(
1392                        certificate,
1393                        ReceiveCertificateMode::NeedsCheck,
1394                        Some(vec![remote_node.clone()]),
1395                    )
1396                    .await?;
1397                    let blob = self
1398                        .local_node
1399                        .storage_client()
1400                        .read_blob(blob_id)
1401                        .await?
1402                        .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
1403                    Result::<_, chain_client::Error>::Ok(blob)
1404                },
1405                move |_| chain_client::Error::from(NodeError::BlobsNotFound(vec![blob_id])),
1406                timeout,
1407            )
1408        }))
1409        .buffer_unordered(self.options.max_joined_tasks)
1410        .collect::<Vec<_>>()
1411        .await
1412        .into_iter()
1413        .collect()
1414    }
1415
1416    /// Attempts to execute the block locally. If any incoming message execution fails, that
1417    /// message is rejected and execution is retried, until the block accepts only messages
1418    /// that succeed.
1419    // TODO(#2806): Measure how failing messages affect the execution times.
1420    #[tracing::instrument(level = "trace", skip(self, block))]
1421    async fn stage_block_execution_and_discard_failing_messages(
1422        &self,
1423        mut block: ProposedBlock,
1424        round: Option<u32>,
1425        published_blobs: Vec<Blob>,
1426    ) -> Result<(Block, ChainInfoResponse), chain_client::Error> {
1427        loop {
1428            let result = self
1429                .stage_block_execution(block.clone(), round, published_blobs.clone())
1430                .await;
1431            if let Err(chain_client::Error::LocalNodeError(LocalNodeError::WorkerError(
1432                WorkerError::ChainError(chain_error),
1433            ))) = &result
1434            {
1435                if let ChainError::ExecutionError(
1436                    error,
1437                    ChainExecutionContext::IncomingBundle(index),
1438                ) = &**chain_error
1439                {
1440                    let transaction = block
1441                        .transactions
1442                        .get_mut(*index as usize)
1443                        .expect("Transaction at given index should exist");
1444                    let Transaction::ReceiveMessages(message) = transaction else {
1445                        panic!(
1446                            "Expected incoming bundle at transaction index {}, found operation",
1447                            index
1448                        );
1449                    };
1450                    ensure!(
1451                        !message.bundle.is_protected(),
1452                        chain_client::Error::BlockProposalError(
1453                            "Protected incoming message failed to execute locally"
1454                        )
1455                    );
1456                    // Reject the faulty message from the block and continue.
1457                    // TODO(#1420): This is potentially a bit heavy-handed for
1458                    // retryable errors.
1459                    info!(
1460                        %error, origin = ?message.origin,
1461                        "Message failed to execute locally and will be rejected."
1462                    );
1463                    message.action = MessageAction::Reject;
1464                    continue;
1465                }
1466            }
1467            return result;
1468        }
1469    }
1470
1471    /// Attempts to execute the block locally. If any attempt to read a blob fails, the blob is
1472    /// downloaded and execution is retried.
1473    #[instrument(level = "trace", skip(self, block))]
1474    async fn stage_block_execution(
1475        &self,
1476        block: ProposedBlock,
1477        round: Option<u32>,
1478        published_blobs: Vec<Blob>,
1479    ) -> Result<(Block, ChainInfoResponse), chain_client::Error> {
1480        loop {
1481            let result = self
1482                .local_node
1483                .stage_block_execution(block.clone(), round, published_blobs.clone())
1484                .await;
1485            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1486                let validators = self.validator_nodes().await?;
1487                self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1488                    .await?;
1489                continue; // We found the missing blob: retry.
1490            }
1491            return Ok(result?);
1492        }
1493    }
1494}
1495
1496/// Performs `f` in parallel on multiple nodes, starting with a quadratically increasing delay on
1497/// each subsequent node. Returns error `err` is all of the nodes fail.
1498async fn communicate_concurrently<'a, A, E1, E2, F, G, R, V>(
1499    nodes: &[RemoteNode<A>],
1500    f: F,
1501    err: G,
1502    timeout: Duration,
1503) -> Result<V, E2>
1504where
1505    F: Clone + FnOnce(RemoteNode<A>) -> R,
1506    RemoteNode<A>: Clone,
1507    G: FnOnce(Vec<(ValidatorPublicKey, E1)>) -> E2,
1508    R: Future<Output = Result<V, E1>> + 'a,
1509{
1510    let mut stream = nodes
1511        .iter()
1512        .zip(0..)
1513        .map(|(remote_node, i)| {
1514            let fun = f.clone();
1515            let node = remote_node.clone();
1516            async move {
1517                linera_base::time::timer::sleep(timeout * i * i).await;
1518                fun(node).await.map_err(|err| (remote_node.public_key, err))
1519            }
1520        })
1521        .collect::<FuturesUnordered<_>>();
1522    let mut errors = vec![];
1523    while let Some(maybe_result) = stream.next().await {
1524        match maybe_result {
1525            Ok(result) => return Ok(result),
1526            Err(error) => errors.push(error),
1527        };
1528    }
1529    Err(err(errors))
1530}
1531
1532/// The outcome of trying to commit a list of incoming messages and operations to the chain.
1533#[derive(Debug)]
1534enum ExecuteBlockOutcome {
1535    /// A block with the messages and operations was committed.
1536    Executed(ConfirmedBlockCertificate),
1537    /// A different block was already proposed and got committed. Check whether the messages and
1538    /// operations are still suitable, and try again at the next block height.
1539    Conflict(ConfirmedBlockCertificate),
1540    /// We are not the round leader and cannot do anything. Try again at the specified time or
1541    /// or whenever the round or block height changes.
1542    WaitForTimeout(RoundTimeout),
1543}
1544
1545/// Wrapper for `AbortHandle` that aborts when its dropped.
1546#[must_use]
1547pub struct AbortOnDrop(pub AbortHandle);
1548
1549impl Drop for AbortOnDrop {
1550    #[instrument(level = "trace", skip(self))]
1551    fn drop(&mut self) {
1552        self.0.abort();
1553    }
1554}
1555
1556/// A pending proposed block, together with its published blobs.
1557#[derive(Clone, Serialize, Deserialize)]
1558pub struct PendingProposal {
1559    pub block: ProposedBlock,
1560    pub blobs: Vec<Blob>,
1561}
1562
1563enum ReceiveCertificateMode {
1564    NeedsCheck,
1565    AlreadyChecked,
1566}
1567
1568enum CheckCertificateResult {
1569    OldEpoch,
1570    New,
1571    FutureEpoch,
1572}
1573
1574impl CheckCertificateResult {
1575    fn into_result(self) -> Result<(), chain_client::Error> {
1576        match self {
1577            Self::OldEpoch => Err(chain_client::Error::CommitteeDeprecationError),
1578            Self::New => Ok(()),
1579            Self::FutureEpoch => Err(chain_client::Error::CommitteeSynchronizationError),
1580        }
1581    }
1582}
1583
1584/// Creates a compressed Contract, Service and bytecode.
1585#[cfg(not(target_arch = "wasm32"))]
1586pub async fn create_bytecode_blobs(
1587    contract: Bytecode,
1588    service: Bytecode,
1589    vm_runtime: VmRuntime,
1590) -> (Vec<Blob>, ModuleId) {
1591    match vm_runtime {
1592        VmRuntime::Wasm => {
1593            let (compressed_contract, compressed_service) =
1594                tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
1595                    .await
1596                    .expect("Compression should not panic");
1597            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1598            let service_blob = Blob::new_service_bytecode(compressed_service);
1599            let module_id =
1600                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1601            (vec![contract_blob, service_blob], module_id)
1602        }
1603        VmRuntime::Evm => {
1604            let compressed_contract = contract.compress();
1605            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1606            let module_id = ModuleId::new(
1607                evm_contract_blob.id().hash,
1608                evm_contract_blob.id().hash,
1609                vm_runtime,
1610            );
1611            (vec![evm_contract_blob], module_id)
1612        }
1613    }
1614}