linera_core/client/
mod.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    cmp::Ordering,
7    collections::{BTreeMap, BTreeSet},
8    sync::{Arc, RwLock},
9};
10
11use custom_debug_derive::Debug;
12use futures::{
13    future::Future,
14    stream::{self, AbortHandle, FuturesUnordered, StreamExt},
15};
16#[cfg(with_metrics)]
17use linera_base::prometheus_util::MeasureLatency as _;
18use linera_base::{
19    crypto::{CryptoHash, Signer as _, ValidatorPublicKey},
20    data_types::{ArithmeticError, Blob, BlockHeight, ChainDescription, Epoch, TimeDelta},
21    ensure,
22    identifiers::{AccountOwner, BlobId, BlobType, ChainId, StreamId},
23    time::Duration,
24};
25#[cfg(not(target_arch = "wasm32"))]
26use linera_base::{data_types::Bytecode, identifiers::ModuleId, vm::VmRuntime};
27use linera_chain::{
28    data_types::{BlockProposal, BundleExecutionPolicy, ChainAndHeight, LiteVote, ProposedBlock},
29    manager::LockingBlock,
30    types::{
31        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
32        LiteCertificate, ValidatedBlock, ValidatedBlockCertificate,
33    },
34    ChainError,
35};
36use linera_execution::committee::Committee;
37use linera_storage::{ResultReadCertificates, Storage as _};
38use rand::{
39    distributions::{Distribution, WeightedIndex},
40    seq::SliceRandom,
41};
42use received_log::ReceivedLogs;
43use serde::{Deserialize, Serialize};
44use tokio::sync::mpsc;
45use tracing::{debug, error, info, instrument, trace, warn};
46
47use crate::{
48    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
49    environment::Environment,
50    local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
51    node::{CrossChainMessageDelivery, NodeError, ValidatorNodeProvider as _},
52    notifier::{ChannelNotifier, Notifier as _},
53    remote_node::RemoteNode,
54    updater::{communicate_with_quorum, CommunicateAction, ValidatorUpdater},
55    worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState},
56    CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
57};
58
59pub mod chain_client;
60pub use chain_client::ChainClient;
61
62pub use crate::data_types::ClientOutcome;
63
64#[cfg(test)]
65#[path = "../unit_tests/client_tests.rs"]
66mod client_tests;
67pub mod requests_scheduler;
68
69pub use requests_scheduler::{RequestsScheduler, RequestsSchedulerConfig, ScoringWeights};
70mod received_log;
71mod validator_trackers;
72
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a latency histogram with no labels and the standard exponential
    /// buckets (up to 500s). All client latency metrics below share this shape.
    fn register_latency_histogram(name: &str, description: &str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(500.0))
    }

    /// Latency of `process_inbox` (metric name kept as `process_inbox_latency`
    /// for dashboard compatibility).
    pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| register_latency_histogram("process_inbox_latency", "process_inbox latency"));

    /// Latency of `prepare_chain`.
    pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| register_latency_histogram("prepare_chain_latency", "prepare_chain latency"));

    /// Latency of `synchronize_chain_state`.
    pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_latency_histogram(
            "synchronize_chain_state_latency",
            "synchronize_chain_state latency",
        )
    });

    /// Latency of `execute_block`.
    pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| register_latency_histogram("execute_block_latency", "execute_block latency"));

    /// Latency of `find_received_certificates`.
    pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_latency_histogram(
            "find_received_certificates_latency",
            "find_received_certificates latency",
        )
    });
}
126
/// Default maximum number of certificates fetched in one request when catching up a chain.
pub const DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE: u64 = 500;
/// Default batch size used when downloading certificates from sender chains.
pub const DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE: usize = 20_000;
129
/// Identifies which client operation a timing measurement refers to.
///
/// A `TimingType` is paired with a `u64` measurement when sent through the
/// optional `timing_sender` channel passed to `create_chain_client`.
#[derive(Debug, Clone, Copy)]
pub enum TimingType {
    ExecuteOperations,
    ExecuteBlock,
    SubmitBlockProposal,
    UpdateValidators,
}
137
/// Defines how we listen to a chain:
/// - do we care about every block notification?
/// - or do we only care about blocks containing events from some particular streams?
///
/// Modes form a partial order (see the `PartialOrd` impl below):
/// `FullChain` > `FollowChain` > `EventsOnly`, with two `EventsOnly` modes
/// compared by inclusion of their stream sets.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ListeningMode {
    /// Listen to everything: all blocks for the chain and all blocks from sender chains,
    /// and participate in rounds.
    FullChain,
    /// Listen to all blocks for the chain, but don't download sender chain blocks or participate
    /// in rounds. Use this when interested in the chain's state but not intending to propose
    /// blocks (e.g., because we're not a chain owner).
    FollowChain,
    /// Only listen to blocks which contain events from those streams.
    EventsOnly(BTreeSet<StreamId>),
}
153
154impl PartialOrd for ListeningMode {
155    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
156        match (self, other) {
157            (ListeningMode::FullChain, ListeningMode::FullChain) => Some(Ordering::Equal),
158            (ListeningMode::FullChain, _) => Some(Ordering::Greater),
159            (_, ListeningMode::FullChain) => Some(Ordering::Less),
160            (ListeningMode::FollowChain, ListeningMode::FollowChain) => Some(Ordering::Equal),
161            (ListeningMode::FollowChain, ListeningMode::EventsOnly(_)) => Some(Ordering::Greater),
162            (ListeningMode::EventsOnly(_), ListeningMode::FollowChain) => Some(Ordering::Less),
163            (ListeningMode::EventsOnly(events_a), ListeningMode::EventsOnly(events_b)) => {
164                if events_a.is_superset(events_b) {
165                    Some(Ordering::Greater)
166                } else if events_b.is_superset(events_a) {
167                    Some(Ordering::Less)
168                } else {
169                    None
170                }
171            }
172        }
173    }
174}
175
176impl ListeningMode {
177    /// Returns whether a notification with this reason should be processed under this listening
178    /// mode.
179    pub fn is_relevant(&self, reason: &Reason) -> bool {
180        match (reason, self) {
181            // FullChain processes everything.
182            (_, ListeningMode::FullChain) => true,
183            // FollowChain processes new blocks on the chain itself, including blocks that
184            // produced events.
185            (Reason::NewBlock { .. }, ListeningMode::FollowChain) => true,
186            (Reason::NewEvents { .. }, ListeningMode::FollowChain) => true,
187            (_, ListeningMode::FollowChain) => false,
188            // EventsOnly only processes events from relevant streams.
189            (Reason::NewEvents { event_streams, .. }, ListeningMode::EventsOnly(relevant)) => {
190                relevant.intersection(event_streams).next().is_some()
191            }
192            (_, ListeningMode::EventsOnly(_)) => false,
193        }
194    }
195
196    pub fn extend(&mut self, other: Option<ListeningMode>) {
197        match (self, other) {
198            (_, None) => (),
199            (ListeningMode::FullChain, _) => (),
200            (mode, Some(ListeningMode::FullChain)) => {
201                *mode = ListeningMode::FullChain;
202            }
203            (ListeningMode::FollowChain, _) => (),
204            (mode, Some(ListeningMode::FollowChain)) => {
205                *mode = ListeningMode::FollowChain;
206            }
207            (
208                ListeningMode::EventsOnly(self_events),
209                Some(ListeningMode::EventsOnly(other_events)),
210            ) => {
211                self_events.extend(other_events);
212            }
213        }
214    }
215
216    /// Returns whether this mode implies follow-only behavior (i.e., not participating in
217    /// consensus rounds).
218    pub fn is_follow_only(&self) -> bool {
219        !matches!(self, ListeningMode::FullChain)
220    }
221
222    /// Returns whether this is a full chain mode (synchronizing sender chains and updating
223    /// inboxes).
224    pub fn is_full(&self) -> bool {
225        matches!(self, ListeningMode::FullChain)
226    }
227}
228
/// A builder that creates [`ChainClient`]s which share the cache and notifiers.
pub struct Client<Env: Environment> {
    /// The execution environment, providing storage, network, signer and wallet.
    environment: Env,
    /// Local node to manage the execution state and the local storage of the chains that we are
    /// tracking.
    pub local_node: LocalNodeClient<Env::Storage>,
    /// Manages the requests sent to validator nodes.
    requests_scheduler: RequestsScheduler<Env>,
    /// The admin chain ID.
    admin_chain_id: ChainId,
    /// Chains that should be tracked by the client, along with their listening mode.
    /// The presence of a chain in this map means it is tracked by the local node.
    chain_modes: Arc<RwLock<BTreeMap<ChainId, ListeningMode>>>,
    /// References to clients waiting for chain notifications.
    notifier: Arc<ChannelNotifier<Notification>>,
    /// Chain state for the managed chains.
    chains: papaya::HashMap<ChainId, chain_client::State>,
    /// Configuration options.
    options: chain_client::Options,
}
249
250impl<Env: Environment> Client<Env> {
251    /// Creates a new `Client` with a new cache and notifiers.
252    #[instrument(level = "trace", skip_all)]
253    #[allow(clippy::too_many_arguments)]
254    pub fn new(
255        environment: Env,
256        admin_chain_id: ChainId,
257        long_lived_services: bool,
258        chain_modes: impl IntoIterator<Item = (ChainId, ListeningMode)>,
259        name: impl Into<String>,
260        chain_worker_ttl: Duration,
261        sender_chain_worker_ttl: Duration,
262        options: chain_client::Options,
263        block_cache_size: usize,
264        execution_state_cache_size: usize,
265        requests_scheduler_config: requests_scheduler::RequestsSchedulerConfig,
266    ) -> Self {
267        let chain_modes = Arc::new(RwLock::new(chain_modes.into_iter().collect()));
268        let state = WorkerState::new_for_client(
269            name.into(),
270            environment.storage().clone(),
271            chain_modes.clone(),
272            block_cache_size,
273            execution_state_cache_size,
274        )
275        .with_long_lived_services(long_lived_services)
276        .with_allow_inactive_chains(true)
277        .with_allow_messages_from_deprecated_epochs(true)
278        .with_chain_worker_ttl(chain_worker_ttl)
279        .with_sender_chain_worker_ttl(sender_chain_worker_ttl);
280        let local_node = LocalNodeClient::new(state);
281        let requests_scheduler = RequestsScheduler::new(vec![], requests_scheduler_config);
282
283        Self {
284            environment,
285            local_node,
286            requests_scheduler,
287            chains: papaya::HashMap::new(),
288            admin_chain_id,
289            chain_modes,
290            notifier: Arc::new(ChannelNotifier::default()),
291            options,
292        }
293    }
294
    /// Returns the chain ID of the admin chain this client was created with.
    pub fn admin_chain_id(&self) -> ChainId {
        self.admin_chain_id
    }
299
    /// Returns the storage client used by this client's local node (the
    /// environment's storage).
    pub fn storage_client(&self) -> &Env::Storage {
        self.environment.storage()
    }
304
    /// Returns the network provider used to create connections to validator nodes.
    pub fn validator_node_provider(&self) -> &Env::Network {
        self.environment.network()
    }
308
    /// Returns a reference to the client's [`Signer`][crate::environment::Signer],
    /// as provided by the environment.
    #[instrument(level = "trace", skip(self))]
    pub fn signer(&self) -> &Env::Signer {
        self.environment.signer()
    }
314
315    /// Returns whether the signer has a key for the given owner.
316    pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, chain_client::Error> {
317        self.signer()
318            .contains_key(owner)
319            .await
320            .map_err(chain_client::Error::signer_failure)
321    }
322
    /// Returns a reference to the client's [`Wallet`][crate::environment::Wallet],
    /// as provided by the environment.
    pub fn wallet(&self) -> &Env::Wallet {
        self.environment.wallet()
    }
327
328    /// Extends the listening mode for a chain, combining with the existing mode if present.
329    /// Returns the resulting mode.
330    #[instrument(level = "trace", skip(self))]
331    pub fn extend_chain_mode(&self, chain_id: ChainId, mode: ListeningMode) -> ListeningMode {
332        let mut chain_modes = self
333            .chain_modes
334            .write()
335            .expect("Panics should not happen while holding a lock to `chain_modes`");
336        let entry = chain_modes.entry(chain_id).or_insert(mode.clone());
337        entry.extend(Some(mode));
338        entry.clone()
339    }
340
341    /// Returns the listening mode for a chain, if it is tracked.
342    pub fn chain_mode(&self, chain_id: ChainId) -> Option<ListeningMode> {
343        self.chain_modes
344            .read()
345            .expect("Panics should not happen while holding a lock to `chain_modes`")
346            .get(&chain_id)
347            .cloned()
348    }
349
350    /// Returns whether a chain is fully tracked by the local node.
351    pub fn is_tracked(&self, chain_id: ChainId) -> bool {
352        self.chain_modes
353            .read()
354            .expect("Panics should not happen while holding a lock to `chain_modes`")
355            .get(&chain_id)
356            .is_some_and(ListeningMode::is_full)
357    }
358
359    /// Creates a new `ChainClient`.
360    #[expect(clippy::too_many_arguments)]
361    #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
362    pub fn create_chain_client(
363        self: &Arc<Self>,
364        chain_id: ChainId,
365        block_hash: Option<CryptoHash>,
366        next_block_height: BlockHeight,
367        pending_proposal: Option<PendingProposal>,
368        preferred_owner: Option<AccountOwner>,
369        timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
370        follow_only: bool,
371    ) -> ChainClient<Env> {
372        // If the entry already exists we assume that the entry is more up to date than
373        // the arguments: If they were read from the wallet file, they might be stale.
374        self.chains.pin().get_or_insert_with(chain_id, || {
375            chain_client::State::new(pending_proposal.clone(), follow_only)
376        });
377
378        ChainClient::new(
379            self.clone(),
380            chain_id,
381            self.options.clone(),
382            block_hash,
383            next_block_height,
384            preferred_owner,
385            timing_sender,
386        )
387    }
388
389    /// Returns whether the given chain is in follow-only mode.
390    fn is_chain_follow_only(&self, chain_id: ChainId) -> bool {
391        self.chains
392            .pin()
393            .get(&chain_id)
394            .is_some_and(|state| state.is_follow_only())
395    }
396
397    /// Sets whether the given chain is in follow-only mode.
398    pub fn set_chain_follow_only(&self, chain_id: ChainId, follow_only: bool) {
399        self.chains.pin().update(chain_id, |state| {
400            let mut state = state.clone_for_update_unchecked();
401            state.set_follow_only(follow_only);
402            state
403        });
404    }
405
    /// Fetches the chain description blob if needed, and returns the chain info.
    ///
    /// If the local node reports missing blobs, the admin chain is synchronized
    /// first, then the missing blobs are downloaded from `validators` before the
    /// query is retried once.
    async fn fetch_chain_info(
        &self,
        chain_id: ChainId,
        validators: &[RemoteNode<Env::ValidatorNode>],
    ) -> Result<Box<ChainInfo>, chain_client::Error> {
        match self.local_node.chain_info(chain_id).await {
            Ok(info) => Ok(info),
            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                // Make sure the admin chain is up to date.
                self.synchronize_chain_state(self.admin_chain_id).await?;
                // If the chain is missing then the error is a WorkerError
                // and so a BlobsNotFound, which propagates via `?` here.
                self.update_local_node_with_blobs_from(blob_ids, validators)
                    .await?;
                Ok(self.local_node.chain_info(chain_id).await?)
            }
            Err(err) => Err(err.into()),
        }
    }
426
427    fn weighted_select(
428        remaining_validators: &mut Vec<RemoteNode<Env::ValidatorNode>>,
429        remaining_weights: &mut Vec<u64>,
430    ) -> Option<RemoteNode<Env::ValidatorNode>> {
431        if remaining_weights.is_empty() {
432            return None;
433        }
434        let dist = WeightedIndex::new(remaining_weights.clone()).unwrap();
435        let idx = dist.sample(&mut rand::thread_rng());
436        remaining_weights.remove(idx);
437        Some(remaining_validators.remove(idx))
438    }
439
440    /// Downloads and processes all certificates up to (excluding) the specified height.
441    #[instrument(level = "trace", skip(self))]
442    async fn download_certificates(
443        &self,
444        chain_id: ChainId,
445        target_next_block_height: BlockHeight,
446    ) -> Result<Box<ChainInfo>, chain_client::Error> {
447        let (_, committee) = self.admin_committee().await?;
448        let mut remaining_validators = self.make_nodes(&committee)?;
449        let mut info = self
450            .fetch_chain_info(chain_id, &remaining_validators)
451            .await?;
452        // Determining the weights of the validators
453        let mut remaining_weights = remaining_validators
454            .iter()
455            .map(|validator| {
456                let validator_state = committee.validators.get(&validator.public_key).unwrap();
457                validator_state.votes
458            })
459            .collect::<Vec<_>>();
460
461        while let Some(remote_node) =
462            Self::weighted_select(&mut remaining_validators, &mut remaining_weights)
463        {
464            if target_next_block_height <= info.next_block_height {
465                return Ok(info);
466            }
467            match self
468                .download_certificates_from(&remote_node, chain_id, target_next_block_height)
469                .await
470            {
471                Err(error) => info!(
472                    remote_node = remote_node.address(),
473                    %error,
474                    "failed to download certificates from validator",
475                ),
476                Ok(Some(new_info)) => info = new_info,
477                Ok(None) => {}
478            }
479        }
480        ensure!(
481            target_next_block_height <= info.next_block_height,
482            chain_client::Error::CannotDownloadCertificates {
483                chain_id,
484                target_next_block_height,
485            }
486        );
487        Ok(info)
488    }
489
490    /// Downloads and processes all certificates up to (excluding) the specified height from the
491    /// given validator.
492    #[instrument(level = "trace", skip_all)]
493    async fn download_certificates_from(
494        &self,
495        remote_node: &RemoteNode<Env::ValidatorNode>,
496        chain_id: ChainId,
497        stop: BlockHeight,
498    ) -> Result<Option<Box<ChainInfo>>, chain_client::Error> {
499        let mut last_info = None;
500        // First load any blocks from local storage, if available.
501        let chain_info = self.local_node.chain_info(chain_id).await?;
502        let mut next_height = chain_info.next_block_height;
503        let hashes = self
504            .local_node
505            .get_preprocessed_block_hashes(chain_id, next_height, stop)
506            .await?;
507        let certificates = self.storage_client().read_certificates(&hashes).await?;
508        let certificates = match ResultReadCertificates::new(certificates, hashes) {
509            ResultReadCertificates::Certificates(certificates) => certificates,
510            ResultReadCertificates::InvalidHashes(hashes) => {
511                return Err(chain_client::Error::ReadCertificatesError(hashes))
512            }
513        };
514        for certificate in certificates {
515            last_info = Some(self.handle_certificate(certificate).await?.info);
516        }
517        // Now download the rest in batches from the remote node.
518        while next_height < stop {
519            // TODO(#2045): Analyze network errors instead of using a fixed batch size.
520            let limit = u64::from(stop)
521                .checked_sub(u64::from(next_height))
522                .ok_or(ArithmeticError::Overflow)?
523                .min(self.options.certificate_download_batch_size);
524
525            let certificates = self
526                .requests_scheduler
527                .download_certificates(remote_node, chain_id, next_height, limit)
528                .await?;
529            let Some(info) = self.process_certificates(remote_node, certificates).await? else {
530                break;
531            };
532            assert!(info.next_block_height > next_height);
533            next_height = info.next_block_height;
534            last_info = Some(info);
535        }
536        Ok(last_info)
537    }
538
539    async fn download_blobs(
540        &self,
541        remote_nodes: &[RemoteNode<Env::ValidatorNode>],
542        blob_ids: &[BlobId],
543    ) -> Result<(), chain_client::Error> {
544        let blobs = &self
545            .requests_scheduler
546            .download_blobs(remote_nodes, blob_ids, self.options.blob_download_timeout)
547            .await?
548            .ok_or_else(|| {
549                chain_client::Error::RemoteNodeError(NodeError::BlobsNotFound(blob_ids.to_vec()))
550            })?;
551        self.local_node.store_blobs(blobs).await.map_err(Into::into)
552    }
553
    /// Tries to process all the certificates, requesting any missing blobs from the given node.
    /// Returns the chain info of the last successfully processed certificate.
    ///
    /// Blobs are recovered in two stages: first every blob required by any of the
    /// certificates that storage doesn't have yet, then — should handling a
    /// certificate still report missing blobs — a per-certificate fetch and retry.
    #[instrument(level = "trace", skip_all)]
    async fn process_certificates(
        &self,
        remote_node: &RemoteNode<Env::ValidatorNode>,
        certificates: Vec<ConfirmedBlockCertificate>,
    ) -> Result<Option<Box<ChainInfo>>, chain_client::Error> {
        let mut info = None;
        // All blobs required by any of the certificates.
        let required_blob_ids: Vec<_> = certificates
            .iter()
            .flat_map(|certificate| certificate.value().required_blob_ids())
            .collect();

        // Stage 1: download whatever required blobs are not in storage yet.
        match self
            .local_node
            .read_blob_states_from_storage(&required_blob_ids)
            .await
        {
            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                self.download_blobs(std::slice::from_ref(remote_node), &blob_ids)
                    .await?;
            }
            // Any other error propagates; `Ok` means all blobs are present.
            x => {
                x?;
            }
        }

        for certificate in certificates {
            info = Some(
                match self.handle_certificate(certificate.clone()).await {
                    // Stage 2: blobs can still be reported missing here; fetch them
                    // and retry this certificate once.
                    Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                        self.download_blobs(std::slice::from_ref(remote_node), &blob_ids)
                            .await?;
                        self.handle_certificate(certificate).await?
                    }
                    x => x?,
                }
                .info,
            );
        }

        // Done with all certificates.
        Ok(info)
    }
599
    /// Handles a certificate in the local node, forwarding resulting notifications
    /// to this client's notifier.
    async fn handle_certificate<T: ProcessableCertificate>(
        &self,
        certificate: GenericCertificate<T>,
    ) -> Result<ChainInfoResponse, LocalNodeError> {
        self.local_node
            .handle_certificate(certificate, &self.notifier)
            .await
    }
608
609    async fn chain_info_with_committees(
610        &self,
611        chain_id: ChainId,
612    ) -> Result<Box<ChainInfo>, LocalNodeError> {
613        let query = ChainInfoQuery::new(chain_id).with_committees();
614        let info = self.local_node.handle_chain_info_query(query).await?.info;
615        Ok(info)
616    }
617
    /// Obtains all the committees trusted by the admin chain, together with the
    /// admin chain's current (highest) epoch.
    #[instrument(level = "trace", skip_all)]
    async fn admin_committees(
        &self,
    ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
        let info = self.chain_info_with_committees(self.admin_chain_id).await?;
        Ok((info.epoch, info.into_committees()?))
    }
627
    /// Obtains the committee for the latest epoch on the admin chain, together
    /// with that epoch.
    pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
        let info = self.chain_info_with_committees(self.admin_chain_id).await?;
        Ok((info.epoch, info.into_current_committee()?))
    }
633
634    /// Obtains the validators for the latest epoch.
635    async fn validator_nodes(
636        &self,
637    ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, chain_client::Error> {
638        let (_, committee) = self.admin_committee().await?;
639        Ok(self.make_nodes(&committee)?)
640    }
641
642    /// Creates a [`RemoteNode`] for each validator in the committee.
643    fn make_nodes(
644        &self,
645        committee: &Committee,
646    ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
647        Ok(self
648            .validator_node_provider()
649            .make_nodes(committee)?
650            .map(|(public_key, node)| RemoteNode { public_key, node })
651            .collect())
652    }
653
654    /// Ensures that the client has the `ChainDescription` blob corresponding to this
655    /// client's `ChainId`, and returns the chain description blob.
656    pub async fn get_chain_description_blob(
657        &self,
658        chain_id: ChainId,
659    ) -> Result<Blob, chain_client::Error> {
660        let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
661        let blob = self
662            .local_node
663            .storage_client()
664            .read_blob(chain_desc_id)
665            .await?;
666        if let Some(blob) = blob {
667            // We have the blob - return it.
668            return Ok(blob);
669        }
670        // Recover history from the current validators, according to the admin chain.
671        self.synchronize_chain_state(self.admin_chain_id).await?;
672        let nodes = self.validator_nodes().await?;
673        Ok(self
674            .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
675            .await?
676            .pop()
677            .unwrap()) // Returns exactly as many blobs as passed-in IDs.
678    }
679
680    /// Ensures that the client has the `ChainDescription` blob corresponding to this
681    /// client's `ChainId`, and returns the chain description.
682    pub async fn get_chain_description(
683        &self,
684        chain_id: ChainId,
685    ) -> Result<ChainDescription, chain_client::Error> {
686        let blob = self.get_chain_description_blob(chain_id).await?;
687        Ok(bcs::from_bytes(blob.bytes())?)
688    }
689
690    /// Updates the latest block and next block height and round information from the chain info.
691    #[instrument(level = "trace", skip_all, fields(chain_id = format!("{:.8}", info.chain_id)))]
692    fn update_from_info(&self, info: &ChainInfo) {
693        self.chains.pin().update(info.chain_id, |state| {
694            let mut state = state.clone_for_update_unchecked();
695            state.update_from_info(info);
696            state
697        });
698    }
699
700    /// Handles the certificate in the local node and the resulting notifications.
701    #[instrument(level = "trace", skip_all)]
702    async fn process_certificate<T: ProcessableCertificate>(
703        &self,
704        certificate: Box<GenericCertificate<T>>,
705    ) -> Result<(), LocalNodeError> {
706        let info = self.handle_certificate(*certificate).await?.info;
707        self.update_from_info(&info);
708        Ok(())
709    }
710
    /// Submits a validated block for finalization and returns the confirmed block certificate.
    #[instrument(level = "trace", skip_all)]
    pub(crate) async fn finalize_block(
        self: &Arc<Self>,
        committee: &Committee,
        certificate: ValidatedBlockCertificate,
    ) -> Result<ConfirmedBlockCertificate, chain_client::Error> {
        debug!(round = %certificate.round, "Submitting block for confirmation");
        let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
        let finalize_action = CommunicateAction::FinalizeBlock {
            certificate: Box::new(certificate),
            delivery: self.options.cross_chain_message_delivery,
        };
        // Communicate with a quorum of validators to obtain the confirmed certificate.
        let certificate = self
            .communicate_chain_action(committee, finalize_action, hashed_value)
            .await?;
        // Process the confirmed certificate locally; its signatures were already
        // checked while collecting the quorum.
        self.receive_certificate_with_checked_signatures(certificate.clone())
            .await?;
        Ok(certificate)
    }
731
    /// Submits a block proposal to the validators.
    ///
    /// Communicates the proposal to a quorum, obtaining a certificate for `value`,
    /// then processes that certificate locally. While communicating, validators may
    /// report clock skew; if a validity threshold of them does, a warning is logged
    /// suggesting the local clock is the outlier.
    #[instrument(level = "trace", skip_all)]
    pub(crate) async fn submit_block_proposal<T: ProcessableCertificate>(
        self: &Arc<Self>,
        committee: &Committee,
        proposal: Box<BlockProposal>,
        value: T,
    ) -> Result<GenericCertificate<T>, chain_client::Error> {
        use linera_storage::Clock as _;

        debug!(
            round = %proposal.content.round,
            "Submitting block proposal to validators"
        );

        // Check if the block timestamp is in the future and log INFO.
        let block_timestamp = proposal.content.block.timestamp;
        let local_time = self.local_node.storage_client().clock().current_time();
        if block_timestamp > local_time {
            info!(
                chain_id = %proposal.content.block.chain_id,
                %block_timestamp,
                %local_time,
                "Block timestamp is in the future; waiting for validators",
            );
        }

        // Create channel for clock skew reports from validators.
        let (clock_skew_sender, mut clock_skew_receiver) = mpsc::unbounded_channel();
        let submit_action = CommunicateAction::SubmitBlock {
            proposal,
            blob_ids: value.required_blob_ids().into_iter().collect(),
            clock_skew_sender,
        };

        // Spawn a task to monitor clock skew reports and warn if threshold is reached.
        let validity_threshold = committee.validity_threshold();
        let committee_clone = committee.clone();
        let clock_skew_check_handle = linera_base::Task::spawn(async move {
            // Accumulated voting weight of validators that reported a positive skew.
            let mut skew_weight = 0u64;
            let mut min_skew = TimeDelta::MAX;
            let mut max_skew = TimeDelta::ZERO;
            while let Some((public_key, clock_skew)) = clock_skew_receiver.recv().await {
                if clock_skew.as_micros() > 0 {
                    skew_weight += committee_clone.weight(&public_key);
                    min_skew = min_skew.min(clock_skew);
                    max_skew = max_skew.max(clock_skew);
                    // Warn once the reporting validators reach a validity threshold,
                    // then stop monitoring.
                    if skew_weight >= validity_threshold {
                        warn!(
                            skew_weight,
                            validity_threshold,
                            min_skew_ms = min_skew.as_micros() / 1000,
                            max_skew_ms = max_skew.as_micros() / 1000,
                            "A validity threshold of validators reported clock skew; \
                             consider checking your system clock",
                        );
                        return;
                    }
                }
            }
        });

        let certificate = self
            .communicate_chain_action(committee, submit_action, value)
            .await?;

        // Communication is over, so no further skew reports will arrive; wait for
        // the monitoring task to finish.
        clock_skew_check_handle.await;

        self.process_certificate(Box::new(certificate.clone()))
            .await?;
        Ok(certificate)
    }
804
805    /// Broadcasts certified blocks to validators.
806    #[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
807    async fn communicate_chain_updates(
808        self: &Arc<Self>,
809        committee: &Committee,
810        chain_id: ChainId,
811        height: BlockHeight,
812        delivery: CrossChainMessageDelivery,
813        latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
814    ) -> Result<(), chain_client::Error> {
815        let nodes = self.make_nodes(committee)?;
816        communicate_with_quorum(
817            &nodes,
818            committee,
819            |_: &()| (),
820            |remote_node| {
821                let mut updater = ValidatorUpdater {
822                    remote_node,
823                    client: self.clone(),
824                    admin_chain_id: self.admin_chain_id,
825                };
826                let certificate = latest_certificate.clone();
827                Box::pin(async move {
828                    updater
829                        .send_chain_information(chain_id, height, delivery, certificate)
830                        .await
831                })
832            },
833            self.options.quorum_grace_period,
834        )
835        .await?;
836        Ok(())
837    }
838
839    /// Broadcasts certified blocks and optionally a block proposal, certificate or
840    /// leader timeout request.
841    ///
842    /// In that case, it verifies that the validator votes are for the provided value,
843    /// and returns a certificate.
844    #[instrument(level = "trace", skip_all)]
845    async fn communicate_chain_action<T: CertificateValue>(
846        self: &Arc<Self>,
847        committee: &Committee,
848        action: CommunicateAction,
849        value: T,
850    ) -> Result<GenericCertificate<T>, chain_client::Error> {
851        let nodes = self.make_nodes(committee)?;
852        let ((votes_hash, votes_round), votes) = communicate_with_quorum(
853            &nodes,
854            committee,
855            |vote: &LiteVote| (vote.value.value_hash, vote.round),
856            |remote_node| {
857                let mut updater = ValidatorUpdater {
858                    remote_node,
859                    client: self.clone(),
860                    admin_chain_id: self.admin_chain_id,
861                };
862                let action = action.clone();
863                Box::pin(async move { updater.send_chain_update(action).await })
864            },
865            self.options.quorum_grace_period,
866        )
867        .await?;
868        ensure!(
869            (votes_hash, votes_round) == (value.hash(), action.round()),
870            chain_client::Error::UnexpectedQuorum {
871                hash: votes_hash,
872                round: votes_round,
873                expected_hash: value.hash(),
874                expected_round: action.round(),
875            }
876        );
877        // Certificate is valid because
878        // * `communicate_with_quorum` ensured a sufficient "weight" of
879        // (non-error) answers were returned by validators.
880        // * each answer is a vote signed by the expected validator.
881        let certificate = LiteCertificate::try_from_votes(votes)
882            .ok_or_else(|| {
883                chain_client::Error::InternalError(
884                    "Vote values or rounds don't match; this is a bug",
885                )
886            })?
887            .with_value(value)
888            .ok_or_else(|| {
889                chain_client::Error::ProtocolError("A quorum voted for an unexpected value")
890            })?;
891        Ok(certificate)
892    }
893
894    /// Processes the confirmed block certificate in the local node without checking signatures.
895    /// Also downloads and processes all ancestors that are still missing.
896    #[instrument(level = "trace", skip_all)]
897    async fn receive_certificate_with_checked_signatures(
898        &self,
899        certificate: ConfirmedBlockCertificate,
900    ) -> Result<(), chain_client::Error> {
901        let certificate = Box::new(certificate);
902        let block = certificate.block();
903        // Recover history from the network.
904        self.download_certificates(block.header.chain_id, block.header.height)
905            .await?;
906        // Process the received operations. Download required hashed certificate values if
907        // necessary.
908        if let Err(err) = self.process_certificate(certificate.clone()).await {
909            match &err {
910                LocalNodeError::BlobsNotFound(blob_ids) => {
911                    self.download_blobs(&self.validator_nodes().await?, blob_ids)
912                        .await
913                        .map_err(|_| err)?;
914                    self.process_certificate(certificate).await?;
915                }
916                _ => {
917                    // The certificate is not as expected. Give up.
918                    warn!("Failed to process network hashed certificate value");
919                    return Err(err.into());
920                }
921            }
922        }
923
924        Ok(())
925    }
926
927    /// Processes the confirmed block in the local node, possibly without executing it.
928    #[instrument(level = "trace", skip_all)]
929    #[allow(dead_code)] // Otherwise CI fails when built for docker.
930    async fn receive_sender_certificate(
931        &self,
932        certificate: ConfirmedBlockCertificate,
933        mode: ReceiveCertificateMode,
934        nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
935    ) -> Result<(), chain_client::Error> {
936        // Verify the certificate before doing any expensive networking.
937        let (max_epoch, committees) = self.admin_committees().await?;
938        if let ReceiveCertificateMode::NeedsCheck = mode {
939            Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
940        }
941        // Recover history from the network.
942        let nodes = if let Some(nodes) = nodes {
943            nodes
944        } else {
945            self.validator_nodes().await?
946        };
947        if let Err(err) = self.handle_certificate(certificate.clone()).await {
948            match &err {
949                LocalNodeError::BlobsNotFound(blob_ids) => {
950                    self.download_blobs(&nodes, blob_ids).await?;
951                    self.handle_certificate(certificate.clone()).await?;
952                }
953                _ => {
954                    // The certificate is not as expected. Give up.
955                    warn!("Failed to process network hashed certificate value");
956                    return Err(err.into());
957                }
958            }
959        }
960
961        Ok(())
962    }
963
    /// Downloads and processes certificates for sender chain blocks.
    ///
    /// Loops until every height in `remote_heights` is either processed or permanently
    /// skipped, or until no non-faulty validators remain. Each successfully received
    /// block is announced on `sender` as a `ChainAndHeight`. Errors are logged rather
    /// than returned, since this runs as a background download.
    #[instrument(level = "trace", skip_all)]
    async fn download_and_process_sender_chain(
        &self,
        sender_chain_id: ChainId,
        nodes: &[RemoteNode<Env::ValidatorNode>],
        received_log: &ReceivedLogs,
        mut remote_heights: Vec<BlockHeight>,
        sender: mpsc::UnboundedSender<ChainAndHeight>,
    ) {
        // The trusted committees are needed below to verify certificate signatures.
        let (max_epoch, committees) = match self.admin_committees().await {
            Ok(result) => result,
            Err(error) => {
                error!(%error, %sender_chain_id, "could not read admin committees");
                return;
            }
        };
        let committees_ref = &committees;
        let mut nodes = nodes.to_vec();
        while !remote_heights.is_empty() {
            let remote_heights_ref = &remote_heights;
            // Randomize the validator order so the download load is spread out.
            nodes.shuffle(&mut rand::thread_rng());
            let certificates = match communicate_concurrently(
                &nodes,
                async move |remote_node| {
                    let mut remote_heights = remote_heights_ref.clone();
                    // No need trying to download certificates the validator didn't have in their
                    // log - we'll retry downloading the remaining ones next time we loop.
                    remote_heights.retain(|height| {
                        received_log.validator_has_block(
                            &remote_node.public_key,
                            sender_chain_id,
                            *height,
                        )
                    });
                    if remote_heights.is_empty() {
                        // It makes no sense to return `Ok(_)` if we aren't going to try downloading
                        // anything from the validator - let the function try the other validators
                        return Err(());
                    }
                    let certificates = self
                        .requests_scheduler
                        .download_certificates_by_heights(
                            &remote_node,
                            sender_chain_id,
                            remote_heights,
                        )
                        .await
                        .map_err(|_| ())?;
                    // Check signatures now, while we still know which validator served
                    // each certificate; pair every certificate with whether it is usable.
                    let mut certificates_with_check_results = vec![];
                    for cert in certificates {
                        if let Ok(check_result) =
                            Self::check_certificate(max_epoch, committees_ref, &cert)
                        {
                            certificates_with_check_results
                                .push((cert, check_result.into_result().is_ok()));
                        } else {
                            // Invalid signature - the validator is faulty
                            return Err(());
                        }
                    }
                    Ok(certificates_with_check_results)
                },
                |errors| {
                    // Reduce the per-validator failures to the set of faulty validators.
                    errors
                        .into_iter()
                        .map(|(validator, _error)| validator)
                        .collect::<BTreeSet<_>>()
                },
                self.options.certificate_batch_download_timeout,
            )
            .await
            {
                Ok(certificates_with_check_results) => certificates_with_check_results,
                Err(faulty_validators) => {
                    // filter out faulty validators and retry if any are left
                    nodes.retain(|node| !faulty_validators.contains(&node.public_key));
                    if nodes.is_empty() {
                        info!(
                            chain_id = %sender_chain_id,
                            "could not download certificates for chain - no more correct validators left"
                        );
                        return;
                    }
                    continue;
                }
            };

            trace!(
                chain_id = %sender_chain_id,
                num_certificates = %certificates.len(),
                "received certificates",
            );

            // Heights that no longer need downloading, whether processed or skipped.
            let mut to_remove_from_queue = BTreeSet::new();

            for (certificate, check_result) in certificates {
                let hash = certificate.hash();
                let chain_id = certificate.block().header.chain_id;
                let height = certificate.block().header.height;
                if !check_result {
                    // The certificate was correctly signed, but we were missing a committee to
                    // validate it properly - do not receive it, but also do not attempt to
                    // re-download it.
                    to_remove_from_queue.insert(height);
                    continue;
                }
                // We checked the certificates right after downloading them.
                let mode = ReceiveCertificateMode::AlreadyChecked;
                if let Err(error) = self
                    .receive_sender_certificate(certificate, mode, None)
                    .await
                {
                    warn!(%error, %hash, "Received invalid certificate");
                } else {
                    to_remove_from_queue.insert(height);
                    if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
                        error!(
                            %chain_id,
                            %height,
                            %error,
                            "failed to send chain and height over the channel",
                        );
                    }
                }
            }

            // Keep only the heights we still have to fetch and loop again.
            remote_heights.retain(|height| !to_remove_from_queue.contains(height));
        }
        trace!(
            chain_id = %sender_chain_id,
            "find_received_certificates: finished processing chain",
        );
    }
1098
1099    /// Downloads the log of received messages for a chain from a validator.
1100    #[instrument(level = "trace", skip(self))]
1101    async fn get_received_log_from_validator(
1102        &self,
1103        chain_id: ChainId,
1104        remote_node: &RemoteNode<Env::ValidatorNode>,
1105        tracker: u64,
1106    ) -> Result<Vec<ChainAndHeight>, chain_client::Error> {
1107        let mut offset = tracker;
1108
1109        // Retrieve the list of newly received certificates from this validator.
1110        let mut remote_log = Vec::new();
1111        loop {
1112            trace!("get_received_log_from_validator: looping");
1113            let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(offset);
1114            let info = remote_node.handle_chain_info_query(query).await?;
1115            let received_entries = info.requested_received_log.len();
1116            offset += received_entries as u64;
1117            remote_log.extend(info.requested_received_log);
1118            trace!(
1119                remote_node = remote_node.address(),
1120                %received_entries,
1121                "get_received_log_from_validator: received log batch",
1122            );
1123            if received_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
1124                break;
1125            }
1126        }
1127
1128        trace!(
1129            remote_node = remote_node.address(),
1130            num_entries = remote_log.len(),
1131            "get_received_log_from_validator: returning downloaded log",
1132        );
1133
1134        Ok(remote_log)
1135    }
1136
    /// Downloads a specific sender block and recursively downloads any earlier blocks
    /// that also sent a message to our chain, based on `previous_message_blocks`.
    ///
    /// This ensures that we have all the sender blocks needed to preprocess the target block
    /// and put the messages to our chain into the outbox.
    async fn download_sender_block_with_sending_ancestors(
        &self,
        receiver_chain_id: ChainId,
        sender_chain_id: ChainId,
        height: BlockHeight,
        remote_node: &RemoteNode<Env::ValidatorNode>,
    ) -> Result<(), chain_client::Error> {
        // First height of this sender not yet in our outbox; anything below it is
        // already handled. Defaults to zero if we have no outbox entry for it.
        let next_outbox_height = self
            .local_node
            .next_outbox_heights(&[sender_chain_id], receiver_chain_id)
            .await?
            .get(&sender_chain_id)
            .copied()
            .unwrap_or(BlockHeight::ZERO);
        let (max_epoch, committees) = self.admin_committees().await?;

        // Recursively collect all certificates we need, following
        // the chain of previous_message_blocks back to next_outbox_height.
        let mut certificates = BTreeMap::new();
        let mut current_height = height;

        // Stop if we've reached the height we've already processed.
        while current_height >= next_outbox_height {
            // Download the certificate for this height.
            let downloaded = self
                .requests_scheduler
                .download_certificates_by_heights(
                    remote_node,
                    sender_chain_id,
                    vec![current_height],
                )
                .await?;
            let Some(certificate) = downloaded.into_iter().next() else {
                return Err(chain_client::Error::CannotDownloadMissingSenderBlock {
                    chain_id: sender_chain_id,
                    height: current_height,
                });
            };

            // Validate the certificate.
            Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
                .into_result()?;

            // Check if there's a previous message block to our chain.
            let block = certificate.block();
            let next_height = block
                .body
                .previous_message_blocks
                .get(&receiver_chain_id)
                .map(|(_prev_hash, prev_height)| *prev_height);

            // Store this certificate.
            certificates.insert(current_height, certificate);

            if let Some(prev_height) = next_height {
                // Continue with the previous block.
                current_height = prev_height;
            } else {
                // No more dependencies.
                break;
            }
        }

        // Nothing was downloaded, i.e. the requested height was already below our
        // outbox frontier: just retry the pending cross-chain requests instead.
        if certificates.is_empty() {
            self.local_node
                .retry_pending_cross_chain_requests(sender_chain_id)
                .await?;
        }

        // Process certificates in ascending block height order (BTreeMap keeps them sorted).
        for certificate in certificates.into_values() {
            self.receive_sender_certificate(
                certificate,
                ReceiveCertificateMode::AlreadyChecked,
                Some(vec![remote_node.clone()]),
            )
            .await?;
        }

        Ok(())
    }
1223
1224    #[instrument(
1225        level = "trace", skip_all,
1226        fields(certificate_hash = ?incoming_certificate.hash()),
1227    )]
1228    fn check_certificate(
1229        highest_known_epoch: Epoch,
1230        committees: &BTreeMap<Epoch, Committee>,
1231        incoming_certificate: &ConfirmedBlockCertificate,
1232    ) -> Result<CheckCertificateResult, NodeError> {
1233        let block = incoming_certificate.block();
1234        // Check that certificates are valid w.r.t one of our trusted committees.
1235        if block.header.epoch > highest_known_epoch {
1236            return Ok(CheckCertificateResult::FutureEpoch);
1237        }
1238        if let Some(known_committee) = committees.get(&block.header.epoch) {
1239            // This epoch is recognized by our chain. Let's verify the
1240            // certificate.
1241            incoming_certificate.check(known_committee)?;
1242            Ok(CheckCertificateResult::New)
1243        } else {
1244            // We don't accept a certificate from a committee that was retired.
1245            Ok(CheckCertificateResult::OldEpoch)
1246        }
1247    }
1248
1249    /// Downloads and processes any certificates we are missing for the given chain.
1250    ///
1251    /// Whether manager values are fetched depends on the chain's follow-only state.
1252    #[instrument(level = "trace", skip_all)]
1253    async fn synchronize_chain_state(
1254        &self,
1255        chain_id: ChainId,
1256    ) -> Result<Box<ChainInfo>, chain_client::Error> {
1257        let (_, committee) = self.admin_committee().await?;
1258        self.synchronize_chain_from_committee(chain_id, committee)
1259            .await
1260    }
1261
1262    /// Downloads certificates for the given chain from the given committee.
1263    ///
1264    /// If the chain is not in follow-only mode, also fetches and processes manager values
1265    /// (timeout certificates, proposals, locking blocks) for consensus participation.
1266    #[instrument(level = "trace", skip_all)]
1267    pub(crate) async fn synchronize_chain_from_committee(
1268        &self,
1269        chain_id: ChainId,
1270        committee: Committee,
1271    ) -> Result<Box<ChainInfo>, chain_client::Error> {
1272        #[cfg(with_metrics)]
1273        let _latency = if !self.is_chain_follow_only(chain_id) {
1274            Some(metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency())
1275        } else {
1276            None
1277        };
1278
1279        let validators = self.make_nodes(&committee)?;
1280        Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
1281        communicate_with_quorum(
1282            &validators,
1283            &committee,
1284            |_: &()| (),
1285            |remote_node| async move {
1286                self.synchronize_chain_state_from(&remote_node, chain_id)
1287                    .await
1288            },
1289            self.options.quorum_grace_period,
1290        )
1291        .await?;
1292
1293        self.local_node
1294            .chain_info(chain_id)
1295            .await
1296            .map_err(Into::into)
1297    }
1298
    /// Downloads any certificates from the specified validator that we are missing for the given
    /// chain.
    ///
    /// If the chain is not in follow-only mode, also fetches and processes manager values
    /// (timeout certificates, proposals, locking blocks) for consensus participation.
    #[instrument(level = "trace", skip(self, remote_node, chain_id))]
    pub(crate) async fn synchronize_chain_state_from(
        &self,
        remote_node: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
    ) -> Result<(), chain_client::Error> {
        // Only request manager values if we may participate in consensus on this chain.
        let with_manager_values = !self.is_chain_follow_only(chain_id);
        let query = if with_manager_values {
            ChainInfoQuery::new(chain_id).with_manager_values()
        } else {
            ChainInfoQuery::new(chain_id)
        };
        let remote_info = remote_node.handle_chain_info_query(query).await?;
        // Catch up on all missing blocks up to the validator's next block height.
        let local_info = self
            .download_certificates_from(remote_node, chain_id, remote_info.next_block_height)
            .await?;

        if !with_manager_values {
            return Ok(());
        }

        // If we are at the same height as the remote node, we also update our chain manager.
        let local_height = match local_info {
            Some(info) => info.next_block_height,
            None => {
                // No info returned from the download; ask the local node directly.
                self.local_node
                    .chain_info(chain_id)
                    .await?
                    .next_block_height
            }
        };
        if local_height != remote_info.next_block_height {
            debug!(
                remote_node = remote_node.address(),
                remote_height = %remote_info.next_block_height,
                local_height = %local_height,
                "synced from validator, but remote height and local height are different",
            );
            return Ok(());
        };

        // Process the validator's round-timeout certificate, if it has one.
        if let Some(timeout) = remote_info.manager.timeout {
            self.handle_certificate(*timeout).await?;
        }
        // Gather the proposals reported by the validator's chain manager.
        let mut proposals = Vec::new();
        if let Some(proposal) = remote_info.manager.requested_signed_proposal {
            proposals.push(*proposal);
        }
        if let Some(proposal) = remote_info.manager.requested_proposed {
            proposals.push(*proposal);
        }
        if let Some(locking) = remote_info.manager.requested_locking {
            match *locking {
                LockingBlock::Fast(proposal) => {
                    // A fast-round locking block is handled like any other proposal.
                    proposals.push(proposal);
                }
                LockingBlock::Regular(cert) => {
                    let hash = cert.hash();
                    // Failure to process the locked block is not fatal; just skip it.
                    if let Err(error) = self.try_process_locking_block_from(remote_node, cert).await
                    {
                        debug!(
                            remote_node = remote_node.address(),
                            %hash,
                            height = %local_height,
                            %error,
                            "skipping locked block from validator",
                        );
                    }
                }
            }
        }
        // Feed each proposal to the local node, downloading missing blobs or sender
        // blocks and retrying as needed; proposals that still fail are skipped.
        'proposal_loop: for proposal in proposals {
            let owner: AccountOwner = proposal.owner();
            if let Err(mut err) = self
                .local_node
                .handle_block_proposal(proposal.clone())
                .await
            {
                if let LocalNodeError::BlobsNotFound(_) = &err {
                    let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
                    if !required_blob_ids.is_empty() {
                        // First attempt: download each blob the proposal itself declares
                        // via the validator's pending-blob endpoint.
                        let mut blobs = Vec::new();
                        for blob_id in required_blob_ids {
                            let blob_content = match self
                                .requests_scheduler
                                .download_pending_blob(remote_node, chain_id, blob_id)
                                .await
                            {
                                Ok(content) => content,
                                Err(error) => {
                                    info!(
                                        remote_node = remote_node.address(),
                                        height = %local_height,
                                        proposer = %owner,
                                        %blob_id,
                                        %error,
                                        "skipping proposal from validator; failed to download blob",
                                    );
                                    continue 'proposal_loop;
                                }
                            };
                            blobs.push(Blob::new(blob_content));
                        }
                        self.local_node
                            .handle_pending_blobs(chain_id, blobs)
                            .await?;
                        // We found the missing blobs: retry.
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            continue;
                        }
                    }
                    if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
                        // Second attempt: fetch the certificates that created the
                        // still-missing blobs from the validator.
                        self.update_local_node_with_blobs_from(
                            blob_ids.clone(),
                            std::slice::from_ref(remote_node),
                        )
                        .await?;
                        // We found the missing blobs: retry.
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            continue;
                        }
                    }
                }
                // As long as the error is a missing cross-chain update, download the
                // missing sender block (and its sending ancestors) and retry.
                while let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
                    if let ChainError::MissingCrossChainUpdate {
                        chain_id,
                        origin,
                        height,
                    } = &**chain_err
                    {
                        self.download_sender_block_with_sending_ancestors(
                            *chain_id,
                            *origin,
                            *height,
                            remote_node,
                        )
                        .await?;
                        // Retry
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            continue 'proposal_loop;
                        }
                    } else {
                        break;
                    }
                }

                debug!(
                    remote_node = remote_node.address(),
                    proposer = %owner,
                    height = %local_height,
                    error = %err,
                    "skipping proposal from validator",
                );
            }
        }
        Ok(())
    }
1479
1480    async fn try_process_locking_block_from(
1481        &self,
1482        remote_node: &RemoteNode<Env::ValidatorNode>,
1483        certificate: GenericCertificate<ValidatedBlock>,
1484    ) -> Result<(), chain_client::Error> {
1485        let chain_id = certificate.inner().chain_id();
1486        let certificate = Box::new(certificate);
1487        match self.process_certificate(certificate.clone()).await {
1488            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1489                let mut blobs = Vec::new();
1490                for blob_id in blob_ids {
1491                    let blob_content = self
1492                        .requests_scheduler
1493                        .download_pending_blob(remote_node, chain_id, blob_id)
1494                        .await?;
1495                    blobs.push(Blob::new(blob_content));
1496                }
1497                self.local_node
1498                    .handle_pending_blobs(chain_id, blobs)
1499                    .await?;
1500                self.process_certificate(certificate).await?;
1501                Ok(())
1502            }
1503            Err(err) => Err(err.into()),
1504            Ok(()) => Ok(()),
1505        }
1506    }
1507
    /// Downloads and processes, from the specified validators, confirmed block certificates
    /// that use the given blobs. If this succeeds, the blobs will be in our storage.
    async fn update_local_node_with_blobs_from(
        &self,
        blob_ids: Vec<BlobId>,
        remote_nodes: &[RemoteNode<Env::ValidatorNode>],
    ) -> Result<Vec<Blob>, chain_client::Error> {
        let timeout = self.options.blob_download_timeout;
        // Deduplicate IDs.
        let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
        stream::iter(blob_ids.into_iter().map(|blob_id| {
            // For each blob, race the validators (with staggered start delays — see
            // `communicate_concurrently`) until one yields a usable certificate.
            communicate_concurrently(
                remote_nodes,
                async move |remote_node| {
                    let certificate = self
                        .requests_scheduler
                        .download_certificate_for_blob(&remote_node, blob_id)
                        .await?;
                    // Receiving the certificate is expected to store its blobs locally;
                    // the read below verifies that.
                    self.receive_sender_certificate(
                        certificate,
                        ReceiveCertificateMode::NeedsCheck,
                        Some(vec![remote_node.clone()]),
                    )
                    .await?;
                    let blob = self
                        .local_node
                        .storage_client()
                        .read_blob(blob_id)
                        .await?
                        .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
                    Result::<_, chain_client::Error>::Ok(blob)
                },
                // If every validator failed, report the blob as not found.
                move |_| chain_client::Error::from(NodeError::BlobsNotFound(vec![blob_id])),
                timeout,
            )
        }))
        // Download several blobs concurrently, bounded by `max_joined_tasks`.
        .buffer_unordered(self.options.max_joined_tasks)
        .collect::<Vec<_>>()
        .await
        .into_iter()
        .collect()
    }
1550
    /// Attempts to execute the block locally, using the given policy for handling bundle
    /// failures: incoming message bundles whose execution fails may be rejected or removed
    /// from the block, until it only accepts messages that succeed.
    ///
    /// If any attempt to read a blob fails, the blob is downloaded and execution is retried.
    ///
    /// Returns the executed block and the chain info response. Note that the modified
    /// proposed block (with bundles rejected/removed per the policy) is discarded here.
    #[instrument(level = "trace", skip(self, block))]
    async fn stage_block_execution_with_policy(
        &self,
        block: ProposedBlock,
        round: Option<u32>,
        published_blobs: Vec<Blob>,
        policy: BundleExecutionPolicy,
    ) -> Result<(Block, ChainInfoResponse), chain_client::Error> {
        loop {
            let result = self
                .local_node
                .stage_block_execution_with_policy(
                    block.clone(),
                    round,
                    published_blobs.clone(),
                    policy,
                )
                .await;
            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
                // A required blob is missing locally: fetch it from the validators.
                let validators = self.validator_nodes().await?;
                self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
                    .await?;
                continue; // We found the missing blob: retry.
            }
            if let Ok((_, executed_block, _, _)) = &result {
                // Notify subscribers that a block was executed locally.
                let hash = CryptoHash::new(executed_block);
                let notification = Notification {
                    chain_id: executed_block.header.chain_id,
                    reason: Reason::BlockExecuted {
                        height: executed_block.header.height,
                        hash,
                    },
                };
                self.notifier.notify(&[notification]);
            }
            let (_modified_block, executed_block, response, _resource_tracker) = result?;
            return Ok((executed_block, response));
        }
    }
1599
1600    /// Attempts to execute the block locally. If any attempt to read a blob fails, the blob is
1601    /// downloaded and execution is retried.
1602    #[instrument(level = "trace", skip(self, block))]
1603    async fn stage_block_execution(
1604        &self,
1605        block: ProposedBlock,
1606        round: Option<u32>,
1607        published_blobs: Vec<Blob>,
1608    ) -> Result<(Block, ChainInfoResponse), chain_client::Error> {
1609        loop {
1610            let result = self
1611                .local_node
1612                .stage_block_execution(block.clone(), round, published_blobs.clone())
1613                .await;
1614            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1615                let validators = self.validator_nodes().await?;
1616                self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1617                    .await?;
1618                continue; // We found the missing blob: retry.
1619            }
1620            if let Ok((block, _, _)) = &result {
1621                let hash = CryptoHash::new(block);
1622                let notification = Notification {
1623                    chain_id: block.header.chain_id,
1624                    reason: Reason::BlockExecuted {
1625                        height: block.header.height,
1626                        hash,
1627                    },
1628                };
1629                self.notifier.notify(&[notification]);
1630            }
1631            let (block, response, _resource_tracker) = result?;
1632            return Ok((block, response));
1633        }
1634    }
1635}
1636
1637/// Performs `f` in parallel on multiple nodes, starting with a quadratically increasing delay on
1638/// each subsequent node. Returns error `err` if all of the nodes fail.
1639async fn communicate_concurrently<'a, A, E1, E2, F, G, R, V>(
1640    nodes: &[RemoteNode<A>],
1641    f: F,
1642    err: G,
1643    timeout: Duration,
1644) -> Result<V, E2>
1645where
1646    F: Clone + FnOnce(RemoteNode<A>) -> R,
1647    RemoteNode<A>: Clone,
1648    G: FnOnce(Vec<(ValidatorPublicKey, E1)>) -> E2,
1649    R: Future<Output = Result<V, E1>> + 'a,
1650{
1651    let mut stream = nodes
1652        .iter()
1653        .zip(0..)
1654        .map(|(remote_node, i)| {
1655            let fun = f.clone();
1656            let node = remote_node.clone();
1657            async move {
1658                linera_base::time::timer::sleep(timeout * i * i).await;
1659                fun(node).await.map_err(|err| (remote_node.public_key, err))
1660            }
1661        })
1662        .collect::<FuturesUnordered<_>>();
1663    let mut errors = vec![];
1664    while let Some(maybe_result) = stream.next().await {
1665        match maybe_result {
1666            Ok(result) => return Ok(result),
1667            Err(error) => errors.push(error),
1668        };
1669    }
1670    Err(err(errors))
1671}
1672
/// Wrapper for `AbortHandle` that aborts when it's dropped.
#[must_use]
pub struct AbortOnDrop(pub AbortHandle);

impl Drop for AbortOnDrop {
    #[instrument(level = "trace", skip(self))]
    fn drop(&mut self) {
        // Abort the associated task as soon as this guard goes out of scope.
        self.0.abort();
    }
}
1683
/// A pending proposed block, together with its published blobs.
#[derive(Clone, Serialize, Deserialize)]
pub struct PendingProposal {
    /// The proposed block itself.
    pub block: ProposedBlock,
    /// The blobs published by the proposed block.
    pub blobs: Vec<Blob>,
}
1690
/// Whether a received certificate still needs to be verified before processing.
enum ReceiveCertificateMode {
    /// The certificate has not been verified yet and must be checked.
    NeedsCheck,
    /// The certificate has already been verified; skip the check.
    AlreadyChecked,
}
1695
/// Outcome of checking a certificate's epoch against the locally known committees.
enum CheckCertificateResult {
    /// The certificate belongs to an epoch older than the current one
    /// (maps to `CommitteeDeprecationError`).
    OldEpoch,
    /// The certificate can be processed.
    New,
    /// The certificate belongs to an epoch we don't know yet
    /// (maps to `CommitteeSynchronizationError`).
    FutureEpoch,
}
1701
1702impl CheckCertificateResult {
1703    fn into_result(self) -> Result<(), chain_client::Error> {
1704        match self {
1705            Self::OldEpoch => Err(chain_client::Error::CommitteeDeprecationError),
1706            Self::New => Ok(()),
1707            Self::FutureEpoch => Err(chain_client::Error::CommitteeSynchronizationError),
1708        }
1709    }
1710}
1711
1712/// Creates a compressed Contract, Service and bytecode.
1713#[cfg(not(target_arch = "wasm32"))]
1714pub async fn create_bytecode_blobs(
1715    contract: Bytecode,
1716    service: Bytecode,
1717    vm_runtime: VmRuntime,
1718) -> (Vec<Blob>, ModuleId) {
1719    match vm_runtime {
1720        VmRuntime::Wasm => {
1721            let (compressed_contract, compressed_service) =
1722                tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
1723                    .await
1724                    .expect("Compression should not panic");
1725            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1726            let service_blob = Blob::new_service_bytecode(compressed_service);
1727            let module_id =
1728                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1729            (vec![contract_blob, service_blob], module_id)
1730        }
1731        VmRuntime::Evm => {
1732            let compressed_contract = contract.compress();
1733            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1734            let module_id = ModuleId::new(
1735                evm_contract_blob.id().hash,
1736                evm_contract_blob.id().hash,
1737                vm_runtime,
1738            );
1739            (vec![evm_contract_blob], module_id)
1740        }
1741    }
1742}