linera_core/client/
mod.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    cmp::Ordering,
7    collections::{BTreeMap, BTreeSet, HashSet},
8    sync::{Arc, RwLock},
9};
10
11use custom_debug_derive::Debug;
12use futures::{
13    future::Future,
14    stream::{self, AbortHandle, FuturesUnordered, StreamExt},
15};
16#[cfg(with_metrics)]
17use linera_base::prometheus_util::MeasureLatency as _;
18use linera_base::{
19    crypto::{CryptoHash, ValidatorPublicKey},
20    data_types::{ArithmeticError, Blob, BlockHeight, ChainDescription, Epoch, TimeDelta},
21    ensure,
22    identifiers::{AccountOwner, BlobId, BlobType, ChainId, GenericApplicationId, StreamId},
23    time::Duration,
24};
25#[cfg(not(target_arch = "wasm32"))]
26use linera_base::{data_types::Bytecode, identifiers::ModuleId, vm::VmRuntime};
27use linera_chain::{
28    data_types::{
29        BlockProposal, ChainAndHeight, IncomingBundle, LiteVote, MessageAction, ProposedBlock,
30        Transaction,
31    },
32    manager::LockingBlock,
33    types::{
34        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
35        LiteCertificate, ValidatedBlock, ValidatedBlockCertificate,
36    },
37    ChainError, ChainExecutionContext,
38};
39use linera_execution::committee::Committee;
40use linera_storage::{ResultReadCertificates, Storage as _};
41use rand::{
42    distributions::{Distribution, WeightedIndex},
43    seq::SliceRandom,
44};
45use received_log::ReceivedLogs;
46use serde::{Deserialize, Serialize};
47use tokio::sync::mpsc;
48use tracing::{debug, error, info, instrument, trace, warn};
49
50use crate::{
51    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, RoundTimeout},
52    environment::Environment,
53    local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
54    node::{CrossChainMessageDelivery, NodeError, ValidatorNodeProvider as _},
55    notifier::{ChannelNotifier, Notifier as _},
56    remote_node::RemoteNode,
57    updater::{communicate_with_quorum, CommunicateAction, ValidatorUpdater},
58    worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState},
59    CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES,
60};
61
62pub mod chain_client;
63pub use chain_client::ChainClient;
64
65pub use crate::data_types::ClientOutcome;
66
67#[cfg(test)]
68#[path = "../unit_tests/client_tests.rs"]
69mod client_tests;
70pub mod requests_scheduler;
71
72pub use requests_scheduler::{RequestsScheduler, RequestsSchedulerConfig, ScoringWeights};
73mod received_log;
74mod validator_trackers;
75
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a histogram without labels, using the exponential latency buckets built from
    /// the `500.0` parameter that every metric in this module shares.
    fn latency_histogram(name: &str, description: &str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(500.0))
    }

    /// Histogram registered under the name `process_inbox_latency`.
    pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("process_inbox_latency", "process_inbox latency"));

    /// Histogram registered under the name `prepare_chain_latency`.
    pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("prepare_chain_latency", "prepare_chain latency"));

    /// Histogram registered under the name `synchronize_chain_state_latency`.
    pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "synchronize_chain_state_latency",
            "synchronize_chain_state latency",
        )
    });

    /// Histogram registered under the name `execute_block_latency`.
    pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("execute_block_latency", "execute_block latency"));

    /// Histogram registered under the name `find_received_certificates_latency`.
    pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "find_received_certificates_latency",
            "find_received_certificates latency",
        )
    });
}
129
/// Default batch size for certificate downloads.
// NOTE(review): presumably the default for `Options::certificate_download_batch_size`, which
// caps each request in `download_certificates_from` — confirm where the options are built.
pub static DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE: u64 = 500;
/// Default batch size for sender certificate downloads.
// NOTE(review): not used in the visible part of this file — confirm against the caller.
pub static DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE: usize = 20_000;
132
133/// Policies for automatically handling incoming messages.
134#[derive(Clone, Debug)]
135pub struct MessagePolicy {
136    /// The blanket policy applied to all messages.
137    blanket: BlanketMessagePolicy,
138    /// A collection of chains which restrict the origin of messages to be
139    /// accepted. `Option::None` means that messages from all chains are accepted. An empty
140    /// `HashSet` denotes that messages from no chains are accepted.
141    restrict_chain_ids_to: Option<HashSet<ChainId>>,
142    /// A collection of applications: If `Some`, only bundles with at least one message by any
143    /// of these applications will be accepted.
144    reject_message_bundles_without_application_ids: Option<HashSet<GenericApplicationId>>,
145    /// A collection of applications: If `Some`, only bundles all of whose messages are by these
146    /// applications will be accepted.
147    reject_message_bundles_with_other_application_ids: Option<HashSet<GenericApplicationId>>,
148}
149
150#[derive(Default, Copy, Clone, Debug, clap::ValueEnum, serde::Deserialize, tsify_next::Tsify)]
151pub enum BlanketMessagePolicy {
152    /// Automatically accept all incoming messages. Reject them only if execution fails.
153    #[default]
154    Accept,
155    /// Automatically reject tracked messages, ignore or skip untracked messages, but accept
156    /// protected ones.
157    Reject,
158    /// Don't include any messages in blocks, and don't make any decision whether to accept or
159    /// reject.
160    Ignore,
161}
162
163impl MessagePolicy {
164    pub fn new(
165        blanket: BlanketMessagePolicy,
166        restrict_chain_ids_to: Option<HashSet<ChainId>>,
167        reject_message_bundles_without_application_ids: Option<HashSet<GenericApplicationId>>,
168        reject_message_bundles_with_other_application_ids: Option<HashSet<GenericApplicationId>>,
169    ) -> Self {
170        Self {
171            blanket,
172            restrict_chain_ids_to,
173            reject_message_bundles_without_application_ids,
174            reject_message_bundles_with_other_application_ids,
175        }
176    }
177
178    #[cfg(with_testing)]
179    pub fn new_accept_all() -> Self {
180        Self {
181            blanket: BlanketMessagePolicy::Accept,
182            restrict_chain_ids_to: None,
183            reject_message_bundles_without_application_ids: None,
184            reject_message_bundles_with_other_application_ids: None,
185        }
186    }
187
188    #[instrument(level = "trace", skip(self))]
189    fn apply(&self, mut bundle: IncomingBundle) -> Option<IncomingBundle> {
190        if let Some(chain_ids) = &self.restrict_chain_ids_to {
191            if !chain_ids.contains(&bundle.origin) {
192                return None;
193            }
194        }
195        if let Some(app_ids) = &self.reject_message_bundles_without_application_ids {
196            if !bundle
197                .messages()
198                .any(|posted_msg| app_ids.contains(&posted_msg.message.application_id()))
199            {
200                return None;
201            }
202        }
203        if let Some(app_ids) = &self.reject_message_bundles_with_other_application_ids {
204            if !bundle
205                .messages()
206                .all(|posted_msg| app_ids.contains(&posted_msg.message.application_id()))
207            {
208                return None;
209            }
210        }
211        if self.is_reject() {
212            if bundle.bundle.is_skippable() {
213                return None;
214            } else if !bundle.bundle.is_protected() {
215                bundle.action = MessageAction::Reject;
216            }
217        }
218        Some(bundle)
219    }
220
221    #[instrument(level = "trace", skip(self))]
222    fn is_ignore(&self) -> bool {
223        matches!(self.blanket, BlanketMessagePolicy::Ignore)
224    }
225
226    #[instrument(level = "trace", skip(self))]
227    fn is_reject(&self) -> bool {
228        matches!(self.blanket, BlanketMessagePolicy::Reject)
229    }
230}
231
/// Tags a timing measurement with the step it refers to.
///
/// Values of this type travel together with a `u64` over the optional timing channel handed
/// to `create_chain_client`.
// NOTE(review): the unit of the accompanying `u64` is not visible here — confirm at the sender.
#[derive(Debug, Clone, Copy)]
pub enum TimingType {
    ExecuteOperations,
    ExecuteBlock,
    SubmitBlockProposal,
    UpdateValidators,
}
239
240/// Defines how we listen to a chain:
241/// - do we care about every block notification?
242/// - or do we only care about blocks containing events from some particular streams?
243#[derive(Debug, Clone, PartialEq, Eq)]
244pub enum ListeningMode {
245    /// Listen to everything: all blocks for the chain and all blocks from sender chains,
246    /// and participate in rounds.
247    FullChain,
248    /// Listen to all blocks for the chain, but don't download sender chain blocks or participate
249    /// in rounds. Use this when interested in the chain's state but not intending to propose
250    /// blocks (e.g., because we're not a chain owner).
251    FollowChain,
252    /// Only listen to blocks which contain events from those streams.
253    EventsOnly(BTreeSet<StreamId>),
254}
255
256impl PartialOrd for ListeningMode {
257    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
258        match (self, other) {
259            (ListeningMode::FullChain, ListeningMode::FullChain) => Some(Ordering::Equal),
260            (ListeningMode::FullChain, _) => Some(Ordering::Greater),
261            (_, ListeningMode::FullChain) => Some(Ordering::Less),
262            (ListeningMode::FollowChain, ListeningMode::FollowChain) => Some(Ordering::Equal),
263            (ListeningMode::FollowChain, ListeningMode::EventsOnly(_)) => Some(Ordering::Greater),
264            (ListeningMode::EventsOnly(_), ListeningMode::FollowChain) => Some(Ordering::Less),
265            (ListeningMode::EventsOnly(events_a), ListeningMode::EventsOnly(events_b)) => {
266                if events_a.is_superset(events_b) {
267                    Some(Ordering::Greater)
268                } else if events_b.is_superset(events_a) {
269                    Some(Ordering::Less)
270                } else {
271                    None
272                }
273            }
274        }
275    }
276}
277
278impl ListeningMode {
279    /// Returns whether a notification with this reason should be processed under this listening
280    /// mode.
281    pub fn is_relevant(&self, reason: &Reason) -> bool {
282        match (reason, self) {
283            // FullChain processes everything.
284            (_, ListeningMode::FullChain) => true,
285            // FollowChain processes new blocks on the chain itself, including blocks that
286            // produced events.
287            (Reason::NewBlock { .. }, ListeningMode::FollowChain) => true,
288            (Reason::NewEvents { .. }, ListeningMode::FollowChain) => true,
289            (_, ListeningMode::FollowChain) => false,
290            // EventsOnly only processes events from relevant streams.
291            (Reason::NewEvents { event_streams, .. }, ListeningMode::EventsOnly(relevant)) => {
292                relevant.intersection(event_streams).next().is_some()
293            }
294            (_, ListeningMode::EventsOnly(_)) => false,
295        }
296    }
297
298    pub fn extend(&mut self, other: Option<ListeningMode>) {
299        match (self, other) {
300            (_, None) => (),
301            (ListeningMode::FullChain, _) => (),
302            (mode, Some(ListeningMode::FullChain)) => {
303                *mode = ListeningMode::FullChain;
304            }
305            (ListeningMode::FollowChain, _) => (),
306            (mode, Some(ListeningMode::FollowChain)) => {
307                *mode = ListeningMode::FollowChain;
308            }
309            (
310                ListeningMode::EventsOnly(self_events),
311                Some(ListeningMode::EventsOnly(other_events)),
312            ) => {
313                self_events.extend(other_events);
314            }
315        }
316    }
317}
318
319/// A builder that creates [`ChainClient`]s which share the cache and notifiers.
320pub struct Client<Env: Environment> {
321    environment: Env,
322    /// Local node to manage the execution state and the local storage of the chains that we are
323    /// tracking.
324    pub local_node: LocalNodeClient<Env::Storage>,
325    /// Manages the requests sent to validator nodes.
326    requests_scheduler: RequestsScheduler<Env>,
327    /// The admin chain ID.
328    admin_id: ChainId,
329    /// Chains that should be tracked by the client.
330    // TODO(#2412): Merge with set of chains the client is receiving notifications from validators
331    tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
332    /// References to clients waiting for chain notifications.
333    notifier: Arc<ChannelNotifier<Notification>>,
334    /// Chain state for the managed chains.
335    chains: papaya::HashMap<ChainId, chain_client::State>,
336    /// Configuration options.
337    options: chain_client::Options,
338}
339
340impl<Env: Environment> Client<Env> {
341    /// Creates a new `Client` with a new cache and notifiers.
342    #[instrument(level = "trace", skip_all)]
343    #[allow(clippy::too_many_arguments)]
344    pub fn new(
345        environment: Env,
346        admin_id: ChainId,
347        long_lived_services: bool,
348        tracked_chains: impl IntoIterator<Item = ChainId>,
349        name: impl Into<String>,
350        chain_worker_ttl: Duration,
351        sender_chain_worker_ttl: Duration,
352        options: chain_client::Options,
353        block_cache_size: usize,
354        execution_state_cache_size: usize,
355        requests_scheduler_config: requests_scheduler::RequestsSchedulerConfig,
356    ) -> Self {
357        let tracked_chains = Arc::new(RwLock::new(tracked_chains.into_iter().collect()));
358        let state = WorkerState::new_for_client(
359            name.into(),
360            environment.storage().clone(),
361            tracked_chains.clone(),
362            block_cache_size,
363            execution_state_cache_size,
364        )
365        .with_long_lived_services(long_lived_services)
366        .with_allow_inactive_chains(true)
367        .with_allow_messages_from_deprecated_epochs(true)
368        .with_chain_worker_ttl(chain_worker_ttl)
369        .with_sender_chain_worker_ttl(sender_chain_worker_ttl);
370        let local_node = LocalNodeClient::new(state);
371        let requests_scheduler = RequestsScheduler::new(vec![], requests_scheduler_config);
372
373        Self {
374            environment,
375            local_node,
376            requests_scheduler,
377            chains: papaya::HashMap::new(),
378            admin_id,
379            tracked_chains,
380            notifier: Arc::new(ChannelNotifier::default()),
381            options,
382        }
383    }
384
385    /// Returns the chain ID of the admin chain.
386    pub fn admin_chain(&self) -> ChainId {
387        self.admin_id
388    }
389
390    /// Returns the storage client used by this client's local node.
391    pub fn storage_client(&self) -> &Env::Storage {
392        self.environment.storage()
393    }
394
395    pub fn validator_node_provider(&self) -> &Env::Network {
396        self.environment.network()
397    }
398
399    /// Returns a reference to the client's [`Signer`][crate::environment::Signer].
400    #[instrument(level = "trace", skip(self))]
401    pub fn signer(&self) -> &Env::Signer {
402        self.environment.signer()
403    }
404
405    /// Returns a reference to the client's [`Wallet`][crate::environment::Wallet].
406    pub fn wallet(&self) -> &Env::Wallet {
407        self.environment.wallet()
408    }
409
410    /// Adds a chain to the set of chains tracked by the local node.
411    #[instrument(level = "trace", skip(self))]
412    pub fn track_chain(&self, chain_id: ChainId) {
413        self.tracked_chains
414            .write()
415            .expect("Panics should not happen while holding a lock to `tracked_chains`")
416            .insert(chain_id);
417    }
418
419    /// Creates a new `ChainClient`.
420    #[expect(clippy::too_many_arguments)]
421    #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
422    pub fn create_chain_client(
423        self: &Arc<Self>,
424        chain_id: ChainId,
425        block_hash: Option<CryptoHash>,
426        next_block_height: BlockHeight,
427        pending_proposal: Option<PendingProposal>,
428        preferred_owner: Option<AccountOwner>,
429        timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
430        follow_only: bool,
431    ) -> ChainClient<Env> {
432        // If the entry already exists we assume that the entry is more up to date than
433        // the arguments: If they were read from the wallet file, they might be stale.
434        self.chains.pin().get_or_insert_with(chain_id, || {
435            chain_client::State::new(pending_proposal.clone(), follow_only)
436        });
437
438        ChainClient::new(
439            self.clone(),
440            chain_id,
441            self.options.clone(),
442            block_hash,
443            next_block_height,
444            preferred_owner,
445            timing_sender,
446        )
447    }
448
449    /// Returns whether the given chain is in follow-only mode.
450    fn is_chain_follow_only(&self, chain_id: ChainId) -> bool {
451        self.chains
452            .pin()
453            .get(&chain_id)
454            .is_some_and(|state| state.is_follow_only())
455    }
456
457    /// Sets whether the given chain is in follow-only mode.
458    pub fn set_chain_follow_only(&self, chain_id: ChainId, follow_only: bool) {
459        self.chains.pin().update(chain_id, |state| {
460            let mut state = state.clone_for_update_unchecked();
461            state.set_follow_only(follow_only);
462            state
463        });
464    }
465
466    /// Fetches the chain description blob if needed, and returns the chain info.
467    async fn fetch_chain_info(
468        &self,
469        chain_id: ChainId,
470        validators: &[RemoteNode<Env::ValidatorNode>],
471    ) -> Result<Box<ChainInfo>, chain_client::Error> {
472        match self.local_node.chain_info(chain_id).await {
473            Ok(info) => Ok(info),
474            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
475                // Make sure the admin chain is up to date.
476                self.synchronize_chain_state(self.admin_id).await?;
477                // If the chain is missing then the error is a WorkerError
478                // and so a BlobsNotFound
479                self.update_local_node_with_blobs_from(blob_ids, validators)
480                    .await?;
481                Ok(self.local_node.chain_info(chain_id).await?)
482            }
483            Err(err) => Err(err.into()),
484        }
485    }
486
487    fn weighted_select(
488        remaining_validators: &mut Vec<RemoteNode<Env::ValidatorNode>>,
489        remaining_weights: &mut Vec<u64>,
490    ) -> Option<RemoteNode<Env::ValidatorNode>> {
491        if remaining_weights.is_empty() {
492            return None;
493        }
494        let dist = WeightedIndex::new(remaining_weights.clone()).unwrap();
495        let idx = dist.sample(&mut rand::thread_rng());
496        remaining_weights.remove(idx);
497        Some(remaining_validators.remove(idx))
498    }
499
500    /// Downloads and processes all certificates up to (excluding) the specified height.
501    #[instrument(level = "trace", skip(self))]
502    async fn download_certificates(
503        &self,
504        chain_id: ChainId,
505        target_next_block_height: BlockHeight,
506    ) -> Result<Box<ChainInfo>, chain_client::Error> {
507        let (_, committee) = self.admin_committee().await?;
508        let mut remaining_validators = self.make_nodes(&committee)?;
509        let mut info = self
510            .fetch_chain_info(chain_id, &remaining_validators)
511            .await?;
512        // Determining the weights of the validators
513        let mut remaining_weights = remaining_validators
514            .iter()
515            .map(|validator| {
516                let validator_state = committee.validators.get(&validator.public_key).unwrap();
517                validator_state.votes
518            })
519            .collect::<Vec<_>>();
520
521        while let Some(remote_node) =
522            Self::weighted_select(&mut remaining_validators, &mut remaining_weights)
523        {
524            if target_next_block_height <= info.next_block_height {
525                return Ok(info);
526            }
527            match self
528                .download_certificates_from(&remote_node, chain_id, target_next_block_height)
529                .await
530            {
531                Err(error) => info!(
532                    remote_node = remote_node.address(),
533                    %error,
534                    "failed to download certificates from validator",
535                ),
536                Ok(Some(new_info)) => info = new_info,
537                Ok(None) => {}
538            }
539        }
540        ensure!(
541            target_next_block_height <= info.next_block_height,
542            chain_client::Error::CannotDownloadCertificates {
543                chain_id,
544                target_next_block_height,
545            }
546        );
547        Ok(info)
548    }
549
550    /// Downloads and processes all certificates up to (excluding) the specified height from the
551    /// given validator.
552    #[instrument(level = "trace", skip_all)]
553    async fn download_certificates_from(
554        &self,
555        remote_node: &RemoteNode<Env::ValidatorNode>,
556        chain_id: ChainId,
557        stop: BlockHeight,
558    ) -> Result<Option<Box<ChainInfo>>, chain_client::Error> {
559        let mut last_info = None;
560        // First load any blocks from local storage, if available.
561        let chain_info = self.local_node.chain_info(chain_id).await?;
562        let mut next_height = chain_info.next_block_height;
563        let hashes = self
564            .local_node
565            .get_preprocessed_block_hashes(chain_id, next_height, stop)
566            .await?;
567        let certificates = self
568            .storage_client()
569            .read_certificates(hashes.clone())
570            .await?;
571        let certificates = match ResultReadCertificates::new(certificates, hashes) {
572            ResultReadCertificates::Certificates(certificates) => certificates,
573            ResultReadCertificates::InvalidHashes(hashes) => {
574                return Err(chain_client::Error::ReadCertificatesError(hashes))
575            }
576        };
577        for certificate in certificates {
578            last_info = Some(self.handle_certificate(certificate).await?.info);
579        }
580        // Now download the rest in batches from the remote node.
581        while next_height < stop {
582            // TODO(#2045): Analyze network errors instead of using a fixed batch size.
583            let limit = u64::from(stop)
584                .checked_sub(u64::from(next_height))
585                .ok_or(ArithmeticError::Overflow)?
586                .min(self.options.certificate_download_batch_size);
587
588            let certificates = self
589                .requests_scheduler
590                .download_certificates(remote_node, chain_id, next_height, limit)
591                .await?;
592            let Some(info) = self.process_certificates(remote_node, certificates).await? else {
593                break;
594            };
595            assert!(info.next_block_height > next_height);
596            next_height = info.next_block_height;
597            last_info = Some(info);
598        }
599        Ok(last_info)
600    }
601
602    async fn download_blobs(
603        &self,
604        remote_nodes: &[RemoteNode<Env::ValidatorNode>],
605        blob_ids: &[BlobId],
606    ) -> Result<(), chain_client::Error> {
607        let blobs = &self
608            .requests_scheduler
609            .download_blobs(remote_nodes, blob_ids, self.options.blob_download_timeout)
610            .await?
611            .ok_or_else(|| {
612                chain_client::Error::RemoteNodeError(NodeError::BlobsNotFound(blob_ids.to_vec()))
613            })?;
614        self.local_node.store_blobs(blobs).await.map_err(Into::into)
615    }
616
617    /// Tries to process all the certificates, requesting any missing blobs from the given node.
618    /// Returns the chain info of the last successfully processed certificate.
619    #[instrument(level = "trace", skip_all)]
620    async fn process_certificates(
621        &self,
622        remote_node: &RemoteNode<Env::ValidatorNode>,
623        certificates: Vec<ConfirmedBlockCertificate>,
624    ) -> Result<Option<Box<ChainInfo>>, chain_client::Error> {
625        let mut info = None;
626        let required_blob_ids: Vec<_> = certificates
627            .iter()
628            .flat_map(|certificate| certificate.value().required_blob_ids())
629            .collect();
630
631        match self
632            .local_node
633            .read_blob_states_from_storage(&required_blob_ids)
634            .await
635        {
636            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
637                self.download_blobs(&[remote_node.clone()], &blob_ids)
638                    .await?;
639            }
640            x => {
641                x?;
642            }
643        }
644
645        for certificate in certificates {
646            info = Some(
647                match self.handle_certificate(certificate.clone()).await {
648                    Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
649                        self.download_blobs(&[remote_node.clone()], &blob_ids)
650                            .await?;
651                        self.handle_certificate(certificate).await?
652                    }
653                    x => x?,
654                }
655                .info,
656            );
657        }
658
659        // Done with all certificates.
660        Ok(info)
661    }
662
663    async fn handle_certificate<T: ProcessableCertificate>(
664        &self,
665        certificate: GenericCertificate<T>,
666    ) -> Result<ChainInfoResponse, LocalNodeError> {
667        self.local_node
668            .handle_certificate(certificate, &self.notifier)
669            .await
670    }
671
672    async fn chain_info_with_committees(
673        &self,
674        chain_id: ChainId,
675    ) -> Result<Box<ChainInfo>, LocalNodeError> {
676        let query = ChainInfoQuery::new(chain_id).with_committees();
677        let info = self.local_node.handle_chain_info_query(query).await?.info;
678        Ok(info)
679    }
680
681    /// Obtains all the committees trusted by any of the given chains. Also returns the highest
682    /// of their epochs.
683    #[instrument(level = "trace", skip_all)]
684    async fn admin_committees(
685        &self,
686    ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
687        let info = self.chain_info_with_committees(self.admin_id).await?;
688        Ok((info.epoch, info.into_committees()?))
689    }
690
691    /// Obtains the committee for the latest epoch on the admin chain.
692    pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
693        let info = self.chain_info_with_committees(self.admin_id).await?;
694        Ok((info.epoch, info.into_current_committee()?))
695    }
696
697    /// Obtains the validators for the latest epoch.
698    async fn validator_nodes(
699        &self,
700    ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, chain_client::Error> {
701        let (_, committee) = self.admin_committee().await?;
702        Ok(self.make_nodes(&committee)?)
703    }
704
705    /// Creates a [`RemoteNode`] for each validator in the committee.
706    fn make_nodes(
707        &self,
708        committee: &Committee,
709    ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
710        Ok(self
711            .validator_node_provider()
712            .make_nodes(committee)?
713            .map(|(public_key, node)| RemoteNode { public_key, node })
714            .collect())
715    }
716
717    /// Ensures that the client has the `ChainDescription` blob corresponding to this
718    /// client's `ChainId`.
719    pub async fn get_chain_description(
720        &self,
721        chain_id: ChainId,
722    ) -> Result<ChainDescription, chain_client::Error> {
723        let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
724        let blob = self
725            .local_node
726            .storage_client()
727            .read_blob(chain_desc_id)
728            .await?;
729        if let Some(blob) = blob {
730            // We have the blob - return it.
731            return Ok(bcs::from_bytes(blob.bytes())?);
732        };
733        // Recover history from the current validators, according to the admin chain.
734        self.synchronize_chain_state(self.admin_id).await?;
735        let nodes = self.validator_nodes().await?;
736        let blob = self
737            .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
738            .await?
739            .pop()
740            .unwrap(); // Returns exactly as many blobs as passed-in IDs.
741        Ok(bcs::from_bytes(blob.bytes())?)
742    }
743
744    /// Updates the latest block and next block height and round information from the chain info.
745    #[instrument(level = "trace", skip_all, fields(chain_id = format!("{:.8}", info.chain_id)))]
746    fn update_from_info(&self, info: &ChainInfo) {
747        self.chains.pin().update(info.chain_id, |state| {
748            let mut state = state.clone_for_update_unchecked();
749            state.update_from_info(info);
750            state
751        });
752    }
753
754    /// Handles the certificate in the local node and the resulting notifications.
755    #[instrument(level = "trace", skip_all)]
756    async fn process_certificate<T: ProcessableCertificate>(
757        &self,
758        certificate: Box<GenericCertificate<T>>,
759    ) -> Result<(), LocalNodeError> {
760        let info = self.handle_certificate(*certificate).await?.info;
761        self.update_from_info(&info);
762        Ok(())
763    }
764
765    /// Submits a validated block for finalization and returns the confirmed block certificate.
766    #[instrument(level = "trace", skip_all)]
767    pub(crate) async fn finalize_block(
768        self: &Arc<Self>,
769        committee: &Committee,
770        certificate: ValidatedBlockCertificate,
771    ) -> Result<ConfirmedBlockCertificate, chain_client::Error> {
772        debug!(round = %certificate.round, "Submitting block for confirmation");
773        let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
774        let finalize_action = CommunicateAction::FinalizeBlock {
775            certificate: Box::new(certificate),
776            delivery: self.options.cross_chain_message_delivery,
777        };
778        let certificate = self
779            .communicate_chain_action(committee, finalize_action, hashed_value)
780            .await?;
781        self.receive_certificate_with_checked_signatures(certificate.clone())
782            .await?;
783        Ok(certificate)
784    }
785
786    /// Submits a block proposal to the validators.
787    #[instrument(level = "trace", skip_all)]
788    pub(crate) async fn submit_block_proposal<T: ProcessableCertificate>(
789        self: &Arc<Self>,
790        committee: &Committee,
791        proposal: Box<BlockProposal>,
792        value: T,
793    ) -> Result<GenericCertificate<T>, chain_client::Error> {
794        use linera_storage::Clock as _;
795
796        debug!(
797            round = %proposal.content.round,
798            "Submitting block proposal to validators"
799        );
800
801        // Check if the block timestamp is in the future and log INFO.
802        let block_timestamp = proposal.content.block.timestamp;
803        let local_time = self.local_node.storage_client().clock().current_time();
804        if block_timestamp > local_time {
805            info!(
806                chain_id = %proposal.content.block.chain_id,
807                %block_timestamp,
808                %local_time,
809                "Block timestamp is in the future; waiting for validators",
810            );
811        }
812
813        // Create channel for clock skew reports from validators.
814        let (clock_skew_sender, mut clock_skew_receiver) = mpsc::unbounded_channel();
815        let submit_action = CommunicateAction::SubmitBlock {
816            proposal,
817            blob_ids: value.required_blob_ids().into_iter().collect(),
818            clock_skew_sender,
819        };
820
821        // Spawn a task to monitor clock skew reports and warn if threshold is reached.
822        let validity_threshold = committee.validity_threshold();
823        let committee_clone = committee.clone();
824        let clock_skew_check_handle = linera_base::task::spawn(async move {
825            let mut skew_weight = 0u64;
826            let mut min_skew = TimeDelta::MAX;
827            let mut max_skew = TimeDelta::ZERO;
828            while let Some((public_key, clock_skew)) = clock_skew_receiver.recv().await {
829                if clock_skew.as_micros() > 0 {
830                    skew_weight += committee_clone.weight(&public_key);
831                    min_skew = min_skew.min(clock_skew);
832                    max_skew = max_skew.max(clock_skew);
833                    if skew_weight >= validity_threshold {
834                        warn!(
835                            skew_weight,
836                            validity_threshold,
837                            min_skew_ms = min_skew.as_micros() / 1000,
838                            max_skew_ms = max_skew.as_micros() / 1000,
839                            "A validity threshold of validators reported clock skew; \
840                             consider checking your system clock",
841                        );
842                        return;
843                    }
844                }
845            }
846        });
847
848        let certificate = self
849            .communicate_chain_action(committee, submit_action, value)
850            .await?;
851
852        clock_skew_check_handle.await;
853
854        self.process_certificate(Box::new(certificate.clone()))
855            .await?;
856        Ok(certificate)
857    }
858
859    /// Broadcasts certified blocks to validators.
860    #[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
861    async fn communicate_chain_updates(
862        self: &Arc<Self>,
863        committee: &Committee,
864        chain_id: ChainId,
865        height: BlockHeight,
866        delivery: CrossChainMessageDelivery,
867        latest_certificate: Option<GenericCertificate<ConfirmedBlock>>,
868    ) -> Result<(), chain_client::Error> {
869        let nodes = self.make_nodes(committee)?;
870        communicate_with_quorum(
871            &nodes,
872            committee,
873            |_: &()| (),
874            |remote_node| {
875                let mut updater = ValidatorUpdater {
876                    remote_node,
877                    client: self.clone(),
878                    admin_id: self.admin_id,
879                };
880                let certificate = latest_certificate.clone();
881                Box::pin(async move {
882                    updater
883                        .send_chain_information(chain_id, height, delivery, certificate)
884                        .await
885                })
886            },
887            self.options.quorum_grace_period,
888        )
889        .await?;
890        Ok(())
891    }
892
893    /// Broadcasts certified blocks and optionally a block proposal, certificate or
894    /// leader timeout request.
895    ///
896    /// In that case, it verifies that the validator votes are for the provided value,
897    /// and returns a certificate.
898    #[instrument(level = "trace", skip_all)]
899    async fn communicate_chain_action<T: CertificateValue>(
900        self: &Arc<Self>,
901        committee: &Committee,
902        action: CommunicateAction,
903        value: T,
904    ) -> Result<GenericCertificate<T>, chain_client::Error> {
905        let nodes = self.make_nodes(committee)?;
906        let ((votes_hash, votes_round), votes) = communicate_with_quorum(
907            &nodes,
908            committee,
909            |vote: &LiteVote| (vote.value.value_hash, vote.round),
910            |remote_node| {
911                let mut updater = ValidatorUpdater {
912                    remote_node,
913                    client: self.clone(),
914                    admin_id: self.admin_id,
915                };
916                let action = action.clone();
917                Box::pin(async move { updater.send_chain_update(action).await })
918            },
919            self.options.quorum_grace_period,
920        )
921        .await?;
922        ensure!(
923            (votes_hash, votes_round) == (value.hash(), action.round()),
924            chain_client::Error::UnexpectedQuorum {
925                hash: votes_hash,
926                round: votes_round,
927                expected_hash: value.hash(),
928                expected_round: action.round(),
929            }
930        );
931        // Certificate is valid because
932        // * `communicate_with_quorum` ensured a sufficient "weight" of
933        // (non-error) answers were returned by validators.
934        // * each answer is a vote signed by the expected validator.
935        let certificate = LiteCertificate::try_from_votes(votes)
936            .ok_or_else(|| {
937                chain_client::Error::InternalError(
938                    "Vote values or rounds don't match; this is a bug",
939                )
940            })?
941            .with_value(value)
942            .ok_or_else(|| {
943                chain_client::Error::ProtocolError("A quorum voted for an unexpected value")
944            })?;
945        Ok(certificate)
946    }
947
948    /// Processes the confirmed block certificate in the local node without checking signatures.
949    /// Also downloads and processes all ancestors that are still missing.
950    #[instrument(level = "trace", skip_all)]
951    async fn receive_certificate_with_checked_signatures(
952        &self,
953        certificate: ConfirmedBlockCertificate,
954    ) -> Result<(), chain_client::Error> {
955        let certificate = Box::new(certificate);
956        let block = certificate.block();
957        // Recover history from the network.
958        self.download_certificates(block.header.chain_id, block.header.height)
959            .await?;
960        // Process the received operations. Download required hashed certificate values if
961        // necessary.
962        if let Err(err) = self.process_certificate(certificate.clone()).await {
963            match &err {
964                LocalNodeError::BlobsNotFound(blob_ids) => {
965                    self.download_blobs(&self.validator_nodes().await?, blob_ids)
966                        .await
967                        .map_err(|_| err)?;
968                    self.process_certificate(certificate).await?;
969                }
970                _ => {
971                    // The certificate is not as expected. Give up.
972                    warn!("Failed to process network hashed certificate value");
973                    return Err(err.into());
974                }
975            }
976        }
977
978        Ok(())
979    }
980
981    /// Processes the confirmed block in the local node, possibly without executing it.
982    #[instrument(level = "trace", skip_all)]
983    #[allow(dead_code)] // Otherwise CI fails when built for docker.
984    async fn receive_sender_certificate(
985        &self,
986        certificate: ConfirmedBlockCertificate,
987        mode: ReceiveCertificateMode,
988        nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
989    ) -> Result<(), chain_client::Error> {
990        // Verify the certificate before doing any expensive networking.
991        let (max_epoch, committees) = self.admin_committees().await?;
992        if let ReceiveCertificateMode::NeedsCheck = mode {
993            Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
994        }
995        // Recover history from the network.
996        let nodes = if let Some(nodes) = nodes {
997            nodes
998        } else {
999            self.validator_nodes().await?
1000        };
1001        if let Err(err) = self.handle_certificate(certificate.clone()).await {
1002            match &err {
1003                LocalNodeError::BlobsNotFound(blob_ids) => {
1004                    self.download_blobs(&nodes, blob_ids).await?;
1005                    self.handle_certificate(certificate.clone()).await?;
1006                }
1007                _ => {
1008                    // The certificate is not as expected. Give up.
1009                    warn!("Failed to process network hashed certificate value");
1010                    return Err(err.into());
1011                }
1012            }
1013        }
1014
1015        Ok(())
1016    }
1017
    /// Downloads and processes certificates for sender chain blocks.
    ///
    /// Repeatedly asks `nodes` for the certificates of `sender_chain_id` at `remote_heights`
    /// until that list is empty or no usable validator is left. Each validator is only asked
    /// for the heights that `received_log` says it has. Every certificate that is
    /// successfully received is announced on `sender` as a `ChainAndHeight`.
    ///
    /// Failures are logged instead of being returned to the caller.
    #[instrument(level = "trace", skip_all)]
    async fn download_and_process_sender_chain(
        &self,
        sender_chain_id: ChainId,
        nodes: &[RemoteNode<Env::ValidatorNode>],
        received_log: &ReceivedLogs,
        mut remote_heights: Vec<BlockHeight>,
        sender: mpsc::UnboundedSender<ChainAndHeight>,
    ) {
        // Without the committees we cannot check any certificate, so give up early.
        let (max_epoch, committees) = match self.admin_committees().await {
            Ok(result) => result,
            Err(error) => {
                error!(%error, %sender_chain_id, "could not read admin committees");
                return;
            }
        };
        let committees_ref = &committees;
        // Local copy of the node list: it is shuffled and pruned below.
        let mut nodes = nodes.to_vec();
        while !remote_heights.is_empty() {
            let remote_heights_ref = &remote_heights;
            nodes.shuffle(&mut rand::thread_rng());
            let certificates = match communicate_concurrently(
                &nodes,
                async move |remote_node| {
                    let mut remote_heights = remote_heights_ref.clone();
                    // No need trying to download certificates the validator didn't have in their
                    // log - we'll retry downloading the remaining ones next time we loop.
                    remote_heights.retain(|height| {
                        received_log.validator_has_block(
                            &remote_node.public_key,
                            sender_chain_id,
                            *height,
                        )
                    });
                    if remote_heights.is_empty() {
                        // It makes no sense to return `Ok(_)` if we aren't going to try downloading
                        // anything from the validator - let the function try the other validators
                        return Err(());
                    }
                    let certificates = self
                        .requests_scheduler
                        .download_certificates_by_heights(
                            &remote_node,
                            sender_chain_id,
                            remote_heights,
                        )
                        .await
                        .map_err(|_| ())?;
                    // Pair every certificate with whether `into_result()` accepted the outcome
                    // of the check.
                    let mut certificates_with_check_results = vec![];
                    for cert in certificates {
                        if let Ok(check_result) =
                            Self::check_certificate(max_epoch, committees_ref, &cert)
                        {
                            certificates_with_check_results
                                .push((cert, check_result.into_result().is_ok()));
                        } else {
                            // Invalid signature - the validator is faulty
                            return Err(());
                        }
                    }
                    Ok(certificates_with_check_results)
                },
                // Collect the public keys of all validators that returned an error.
                |errors| {
                    errors
                        .into_iter()
                        .map(|(validator, _error)| validator)
                        .collect::<BTreeSet<_>>()
                },
                self.options.certificate_batch_download_timeout,
            )
            .await
            {
                Ok(certificates_with_check_results) => certificates_with_check_results,
                Err(faulty_validators) => {
                    // filter out faulty validators and retry if any are left
                    nodes.retain(|node| !faulty_validators.contains(&node.public_key));
                    if nodes.is_empty() {
                        info!(
                            chain_id = %sender_chain_id,
                            "could not download certificates for chain - no more correct validators left"
                        );
                        return;
                    }
                    continue;
                }
            };

            trace!(
                chain_id = %sender_chain_id,
                num_certificates = %certificates.len(),
                "received certificates",
            );

            // Heights that are done (received or deliberately skipped) in this iteration.
            let mut to_remove_from_queue = BTreeSet::new();

            for (certificate, check_result) in certificates {
                let hash = certificate.hash();
                let chain_id = certificate.block().header.chain_id;
                let height = certificate.block().header.height;
                if !check_result {
                    // The certificate was correctly signed, but we were missing a committee to
                    // validate it properly - do not receive it, but also do not attempt to
                    // re-download it.
                    to_remove_from_queue.insert(height);
                    continue;
                }
                // We checked the certificates right after downloading them.
                let mode = ReceiveCertificateMode::AlreadyChecked;
                // NOTE(review): a height whose certificate fails here stays in
                // `remote_heights` and is requested again on the next iteration.
                if let Err(error) = self
                    .receive_sender_certificate(certificate, mode, None)
                    .await
                {
                    warn!(%error, %hash, "Received invalid certificate");
                } else {
                    to_remove_from_queue.insert(height);
                    if let Err(error) = sender.send(ChainAndHeight { chain_id, height }) {
                        error!(
                            %chain_id,
                            %height,
                            %error,
                            "failed to send chain and height over the channel",
                        );
                    }
                }
            }

            remote_heights.retain(|height| !to_remove_from_queue.contains(height));
        }
        trace!(
            chain_id = %sender_chain_id,
            "find_received_certificates: finished processing chain",
        );
    }
1152
1153    /// Downloads the log of received messages for a chain from a validator.
1154    #[instrument(level = "trace", skip(self))]
1155    async fn get_received_log_from_validator(
1156        &self,
1157        chain_id: ChainId,
1158        remote_node: &RemoteNode<Env::ValidatorNode>,
1159        tracker: u64,
1160    ) -> Result<Vec<ChainAndHeight>, chain_client::Error> {
1161        let mut offset = tracker;
1162
1163        // Retrieve the list of newly received certificates from this validator.
1164        let mut remote_log = Vec::new();
1165        loop {
1166            trace!("get_received_log_from_validator: looping");
1167            let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(offset);
1168            let info = remote_node.handle_chain_info_query(query).await?;
1169            let received_entries = info.requested_received_log.len();
1170            offset += received_entries as u64;
1171            remote_log.extend(info.requested_received_log);
1172            trace!(
1173                remote_node = remote_node.address(),
1174                %received_entries,
1175                "get_received_log_from_validator: received log batch",
1176            );
1177            if received_entries < CHAIN_INFO_MAX_RECEIVED_LOG_ENTRIES {
1178                break;
1179            }
1180        }
1181
1182        trace!(
1183            remote_node = remote_node.address(),
1184            num_entries = remote_log.len(),
1185            "get_received_log_from_validator: returning downloaded log",
1186        );
1187
1188        Ok(remote_log)
1189    }
1190
1191    /// Downloads a specific sender block and recursively downloads any earlier blocks
1192    /// that also sent a message to our chain, based on `previous_message_blocks`.
1193    ///
1194    /// This ensures that we have all the sender blocks needed to preprocess the target block
1195    /// and put the messages to our chain into the outbox.
1196    async fn download_sender_block_with_sending_ancestors(
1197        &self,
1198        receiver_chain_id: ChainId,
1199        sender_chain_id: ChainId,
1200        height: BlockHeight,
1201        remote_node: &RemoteNode<Env::ValidatorNode>,
1202    ) -> Result<(), chain_client::Error> {
1203        let next_outbox_height = self
1204            .local_node
1205            .next_outbox_heights(&[sender_chain_id], receiver_chain_id)
1206            .await?
1207            .get(&sender_chain_id)
1208            .copied()
1209            .unwrap_or(BlockHeight::ZERO);
1210        let (max_epoch, committees) = self.admin_committees().await?;
1211
1212        // Recursively collect all certificates we need, following
1213        // the chain of previous_message_blocks back to next_outbox_height.
1214        let mut certificates = BTreeMap::new();
1215        let mut current_height = height;
1216
1217        // Stop if we've reached the height we've already processed.
1218        while current_height >= next_outbox_height {
1219            // Download the certificate for this height.
1220            let downloaded = self
1221                .requests_scheduler
1222                .download_certificates_by_heights(
1223                    remote_node,
1224                    sender_chain_id,
1225                    vec![current_height],
1226                )
1227                .await?;
1228            let Some(certificate) = downloaded.into_iter().next() else {
1229                return Err(chain_client::Error::CannotDownloadMissingSenderBlock {
1230                    chain_id: sender_chain_id,
1231                    height: current_height,
1232                });
1233            };
1234
1235            // Validate the certificate.
1236            Client::<Env>::check_certificate(max_epoch, &committees, &certificate)?
1237                .into_result()?;
1238
1239            // Check if there's a previous message block to our chain.
1240            let block = certificate.block();
1241            let next_height = block
1242                .body
1243                .previous_message_blocks
1244                .get(&receiver_chain_id)
1245                .map(|(_prev_hash, prev_height)| *prev_height);
1246
1247            // Store this certificate.
1248            certificates.insert(current_height, certificate);
1249
1250            if let Some(prev_height) = next_height {
1251                // Continue with the previous block.
1252                current_height = prev_height;
1253            } else {
1254                // No more dependencies.
1255                break;
1256            }
1257        }
1258
1259        if certificates.is_empty() {
1260            self.local_node
1261                .retry_pending_cross_chain_requests(sender_chain_id)
1262                .await?;
1263        }
1264
1265        // Process certificates in ascending block height order (BTreeMap keeps them sorted).
1266        for certificate in certificates.into_values() {
1267            self.receive_sender_certificate(
1268                certificate,
1269                ReceiveCertificateMode::AlreadyChecked,
1270                Some(vec![remote_node.clone()]),
1271            )
1272            .await?;
1273        }
1274
1275        Ok(())
1276    }
1277
1278    #[instrument(
1279        level = "trace", skip_all,
1280        fields(certificate_hash = ?incoming_certificate.hash()),
1281    )]
1282    fn check_certificate(
1283        highest_known_epoch: Epoch,
1284        committees: &BTreeMap<Epoch, Committee>,
1285        incoming_certificate: &ConfirmedBlockCertificate,
1286    ) -> Result<CheckCertificateResult, NodeError> {
1287        let block = incoming_certificate.block();
1288        // Check that certificates are valid w.r.t one of our trusted committees.
1289        if block.header.epoch > highest_known_epoch {
1290            return Ok(CheckCertificateResult::FutureEpoch);
1291        }
1292        if let Some(known_committee) = committees.get(&block.header.epoch) {
1293            // This epoch is recognized by our chain. Let's verify the
1294            // certificate.
1295            incoming_certificate.check(known_committee)?;
1296            Ok(CheckCertificateResult::New)
1297        } else {
1298            // We don't accept a certificate from a committee that was retired.
1299            Ok(CheckCertificateResult::OldEpoch)
1300        }
1301    }
1302
1303    /// Downloads and processes any certificates we are missing for the given chain.
1304    ///
1305    /// Whether manager values are fetched depends on the chain's follow-only state.
1306    #[instrument(level = "trace", skip_all)]
1307    async fn synchronize_chain_state(
1308        &self,
1309        chain_id: ChainId,
1310    ) -> Result<Box<ChainInfo>, chain_client::Error> {
1311        let (_, committee) = self.admin_committee().await?;
1312        self.synchronize_chain_from_committee(chain_id, committee)
1313            .await
1314    }
1315
1316    /// Downloads certificates for the given chain from the given committee.
1317    ///
1318    /// If the chain is not in follow-only mode, also fetches and processes manager values
1319    /// (timeout certificates, proposals, locking blocks) for consensus participation.
1320    #[instrument(level = "trace", skip_all)]
1321    pub(crate) async fn synchronize_chain_from_committee(
1322        &self,
1323        chain_id: ChainId,
1324        committee: Committee,
1325    ) -> Result<Box<ChainInfo>, chain_client::Error> {
1326        #[cfg(with_metrics)]
1327        let _latency = if !self.is_chain_follow_only(chain_id) {
1328            Some(metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency())
1329        } else {
1330            None
1331        };
1332
1333        let validators = self.make_nodes(&committee)?;
1334        Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
1335        communicate_with_quorum(
1336            &validators,
1337            &committee,
1338            |_: &()| (),
1339            |remote_node| async move {
1340                self.synchronize_chain_state_from(&remote_node, chain_id)
1341                    .await
1342            },
1343            self.options.quorum_grace_period,
1344        )
1345        .await?;
1346
1347        self.local_node
1348            .chain_info(chain_id)
1349            .await
1350            .map_err(Into::into)
1351    }
1352
    /// Downloads any certificates from the specified validator that we are missing for the given
    /// chain.
    ///
    /// If the chain is not in follow-only mode, also fetches and processes manager values
    /// (timeout certificates, proposals, locking blocks) for consensus participation.
    /// Manager values are only processed if, after downloading, the local chain is at the
    /// same height as the remote one. Proposals and locking blocks that cannot be processed
    /// are logged and skipped rather than reported as errors.
    #[instrument(level = "trace", skip(self, remote_node, chain_id))]
    pub(crate) async fn synchronize_chain_state_from(
        &self,
        remote_node: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
    ) -> Result<(), chain_client::Error> {
        let with_manager_values = !self.is_chain_follow_only(chain_id);
        let query = if with_manager_values {
            ChainInfoQuery::new(chain_id).with_manager_values()
        } else {
            ChainInfoQuery::new(chain_id)
        };
        let remote_info = remote_node.handle_chain_info_query(query).await?;
        let local_info = self
            .download_certificates_from(remote_node, chain_id, remote_info.next_block_height)
            .await?;

        // Follow-only chains are done once the certificates have been downloaded.
        if !with_manager_values {
            return Ok(());
        }

        // If we are at the same height as the remote node, we also update our chain manager.
        let local_height = match local_info {
            Some(info) => info.next_block_height,
            // The download returned no info: ask the local node for the current height.
            None => {
                self.local_node
                    .chain_info(chain_id)
                    .await?
                    .next_block_height
            }
        };
        if local_height != remote_info.next_block_height {
            debug!(
                remote_node = remote_node.address(),
                remote_height = %remote_info.next_block_height,
                local_height = %local_height,
                "synced from validator, but remote height and local height are different",
            );
            return Ok(());
        };

        if let Some(timeout) = remote_info.manager.timeout {
            self.handle_certificate(*timeout).await?;
        }
        // Collect all proposals reported by the validator; they are handled in the loop below.
        let mut proposals = Vec::new();
        if let Some(proposal) = remote_info.manager.requested_signed_proposal {
            proposals.push(*proposal);
        }
        if let Some(proposal) = remote_info.manager.requested_proposed {
            proposals.push(*proposal);
        }
        if let Some(locking) = remote_info.manager.requested_locking {
            match *locking {
                LockingBlock::Fast(proposal) => {
                    proposals.push(proposal);
                }
                LockingBlock::Regular(cert) => {
                    let hash = cert.hash();
                    // A locking certificate that cannot be processed is logged and skipped.
                    if let Err(error) = self.try_process_locking_block_from(remote_node, cert).await
                    {
                        debug!(
                            remote_node = remote_node.address(),
                            %hash,
                            height = %local_height,
                            %error,
                            "skipping locked block from validator",
                        );
                    }
                }
            }
        }
        'proposal_loop: for proposal in proposals {
            let owner: AccountOwner = proposal.owner();
            if let Err(mut err) = self
                .local_node
                .handle_block_proposal(proposal.clone())
                .await
            {
                if let LocalNodeError::BlobsNotFound(_) = &err {
                    // First attempt: download the blobs required by the proposal as pending
                    // blobs from this validator.
                    let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
                    if !required_blob_ids.is_empty() {
                        let mut blobs = Vec::new();
                        for blob_id in required_blob_ids {
                            let blob_content = match self
                                .requests_scheduler
                                .download_pending_blob(remote_node, chain_id, blob_id)
                                .await
                            {
                                Ok(content) => content,
                                Err(error) => {
                                    info!(
                                        remote_node = remote_node.address(),
                                        height = %local_height,
                                        proposer = %owner,
                                        %blob_id,
                                        %error,
                                        "skipping proposal from validator; failed to download blob",
                                    );
                                    continue 'proposal_loop;
                                }
                            };
                            blobs.push(Blob::new(blob_content));
                        }
                        self.local_node
                            .handle_pending_blobs(chain_id, blobs)
                            .await?;
                        // We found the missing blobs: retry.
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            continue;
                        }
                    }
                    // Second attempt: blobs are still reported missing, so update the local
                    // node with them from this validator.
                    if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
                        self.update_local_node_with_blobs_from(
                            blob_ids.clone(),
                            &[remote_node.clone()],
                        )
                        .await?;
                        // We found the missing blobs: retry.
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            continue;
                        }
                    }
                }
                // As long as the proposal fails because of a missing cross-chain update,
                // download the required sender blocks and try again.
                while let LocalNodeError::WorkerError(WorkerError::ChainError(chain_err)) = &err {
                    if let ChainError::MissingCrossChainUpdate {
                        chain_id,
                        origin,
                        height,
                    } = &**chain_err
                    {
                        self.download_sender_block_with_sending_ancestors(
                            *chain_id,
                            *origin,
                            *height,
                            remote_node,
                        )
                        .await?;
                        // Retry
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            continue 'proposal_loop;
                        }
                    } else {
                        break;
                    }
                }

                // None of the recovery attempts helped: log the last error and move on.
                debug!(
                    remote_node = remote_node.address(),
                    proposer = %owner,
                    height = %local_height,
                    error = %err,
                    "skipping proposal from validator",
                );
            }
        }
        Ok(())
    }
1533
1534    async fn try_process_locking_block_from(
1535        &self,
1536        remote_node: &RemoteNode<Env::ValidatorNode>,
1537        certificate: GenericCertificate<ValidatedBlock>,
1538    ) -> Result<(), chain_client::Error> {
1539        let chain_id = certificate.inner().chain_id();
1540        let certificate = Box::new(certificate);
1541        match self.process_certificate(certificate.clone()).await {
1542            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1543                let mut blobs = Vec::new();
1544                for blob_id in blob_ids {
1545                    let blob_content = self
1546                        .requests_scheduler
1547                        .download_pending_blob(remote_node, chain_id, blob_id)
1548                        .await?;
1549                    blobs.push(Blob::new(blob_content));
1550                }
1551                self.local_node
1552                    .handle_pending_blobs(chain_id, blobs)
1553                    .await?;
1554                self.process_certificate(certificate).await?;
1555                Ok(())
1556            }
1557            Err(err) => Err(err.into()),
1558            Ok(()) => Ok(()),
1559        }
1560    }
1561
1562    /// Downloads and processes from the specified validators a confirmed block certificates that
1563    /// use the given blobs. If this succeeds, the blob will be in our storage.
1564    async fn update_local_node_with_blobs_from(
1565        &self,
1566        blob_ids: Vec<BlobId>,
1567        remote_nodes: &[RemoteNode<Env::ValidatorNode>],
1568    ) -> Result<Vec<Blob>, chain_client::Error> {
1569        let timeout = self.options.blob_download_timeout;
1570        // Deduplicate IDs.
1571        let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
1572        stream::iter(blob_ids.into_iter().map(|blob_id| {
1573            communicate_concurrently(
1574                remote_nodes,
1575                async move |remote_node| {
1576                    let certificate = self
1577                        .requests_scheduler
1578                        .download_certificate_for_blob(&remote_node, blob_id)
1579                        .await?;
1580                    self.receive_sender_certificate(
1581                        certificate,
1582                        ReceiveCertificateMode::NeedsCheck,
1583                        Some(vec![remote_node.clone()]),
1584                    )
1585                    .await?;
1586                    let blob = self
1587                        .local_node
1588                        .storage_client()
1589                        .read_blob(blob_id)
1590                        .await?
1591                        .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
1592                    Result::<_, chain_client::Error>::Ok(blob)
1593                },
1594                move |_| chain_client::Error::from(NodeError::BlobsNotFound(vec![blob_id])),
1595                timeout,
1596            )
1597        }))
1598        .buffer_unordered(self.options.max_joined_tasks)
1599        .collect::<Vec<_>>()
1600        .await
1601        .into_iter()
1602        .collect()
1603    }
1604
1605    /// Attempts to execute the block locally. If any incoming message execution fails, that
1606    /// message is rejected and execution is retried, until the block accepts only messages
1607    /// that succeed.
1608    // TODO(#2806): Measure how failing messages affect the execution times.
1609    #[tracing::instrument(level = "trace", skip(self, block))]
1610    async fn stage_block_execution_and_discard_failing_messages(
1611        &self,
1612        mut block: ProposedBlock,
1613        round: Option<u32>,
1614        published_blobs: Vec<Blob>,
1615    ) -> Result<(Block, ChainInfoResponse), chain_client::Error> {
1616        loop {
1617            let result = self
1618                .stage_block_execution(block.clone(), round, published_blobs.clone())
1619                .await;
1620            if let Err(chain_client::Error::LocalNodeError(LocalNodeError::WorkerError(
1621                WorkerError::ChainError(chain_error),
1622            ))) = &result
1623            {
1624                if let ChainError::ExecutionError(
1625                    error,
1626                    ChainExecutionContext::IncomingBundle(index),
1627                ) = &**chain_error
1628                {
1629                    let transaction = block
1630                        .transactions
1631                        .get_mut(*index as usize)
1632                        .expect("Transaction at given index should exist");
1633                    let Transaction::ReceiveMessages(incoming_bundle) = transaction else {
1634                        panic!(
1635                            "Expected incoming bundle at transaction index {}, found operation",
1636                            index
1637                        );
1638                    };
1639                    ensure!(
1640                        !incoming_bundle.bundle.is_protected(),
1641                        chain_client::Error::BlockProposalError(
1642                            "Protected incoming message failed to execute locally"
1643                        )
1644                    );
1645                    if incoming_bundle.action == MessageAction::Reject {
1646                        return result;
1647                    }
1648                    // Reject the faulty message from the block and continue.
1649                    // TODO(#1420): This is potentially a bit heavy-handed for
1650                    // retryable errors.
1651                    info!(
1652                        %error, %index, origin = ?incoming_bundle.origin,
1653                        "Message bundle failed to execute locally and will be rejected."
1654                    );
1655                    incoming_bundle.action = MessageAction::Reject;
1656                    continue;
1657                }
1658            }
1659            return result;
1660        }
1661    }
1662
1663    /// Attempts to execute the block locally. If any attempt to read a blob fails, the blob is
1664    /// downloaded and execution is retried.
1665    #[instrument(level = "trace", skip(self, block))]
1666    async fn stage_block_execution(
1667        &self,
1668        block: ProposedBlock,
1669        round: Option<u32>,
1670        published_blobs: Vec<Blob>,
1671    ) -> Result<(Block, ChainInfoResponse), chain_client::Error> {
1672        loop {
1673            let result = self
1674                .local_node
1675                .stage_block_execution(block.clone(), round, published_blobs.clone())
1676                .await;
1677            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1678                let validators = self.validator_nodes().await?;
1679                self.update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1680                    .await?;
1681                continue; // We found the missing blob: retry.
1682            }
1683            if let Ok((block, _)) = &result {
1684                let hash = CryptoHash::new(block);
1685                let notification = Notification {
1686                    chain_id: block.header.chain_id,
1687                    reason: Reason::BlockExecuted {
1688                        height: block.header.height,
1689                        hash,
1690                    },
1691                };
1692                self.notifier.notify(&[notification]);
1693            }
1694            return Ok(result?);
1695        }
1696    }
1697}
1698
1699/// Performs `f` in parallel on multiple nodes, starting with a quadratically increasing delay on
1700/// each subsequent node. Returns error `err` if all of the nodes fail.
1701async fn communicate_concurrently<'a, A, E1, E2, F, G, R, V>(
1702    nodes: &[RemoteNode<A>],
1703    f: F,
1704    err: G,
1705    timeout: Duration,
1706) -> Result<V, E2>
1707where
1708    F: Clone + FnOnce(RemoteNode<A>) -> R,
1709    RemoteNode<A>: Clone,
1710    G: FnOnce(Vec<(ValidatorPublicKey, E1)>) -> E2,
1711    R: Future<Output = Result<V, E1>> + 'a,
1712{
1713    let mut stream = nodes
1714        .iter()
1715        .zip(0..)
1716        .map(|(remote_node, i)| {
1717            let fun = f.clone();
1718            let node = remote_node.clone();
1719            async move {
1720                linera_base::time::timer::sleep(timeout * i * i).await;
1721                fun(node).await.map_err(|err| (remote_node.public_key, err))
1722            }
1723        })
1724        .collect::<FuturesUnordered<_>>();
1725    let mut errors = vec![];
1726    while let Some(maybe_result) = stream.next().await {
1727        match maybe_result {
1728            Ok(result) => return Ok(result),
1729            Err(error) => errors.push(error),
1730        };
1731    }
1732    Err(err(errors))
1733}
1734
1735/// The outcome of trying to commit a list of incoming messages and operations to the chain.
1736#[derive(Debug)]
1737enum ExecuteBlockOutcome {
1738    /// A block with the messages and operations was committed.
1739    Executed(ConfirmedBlockCertificate),
1740    /// A different block was already proposed and got committed. Check whether the messages and
1741    /// operations are still suitable, and try again at the next block height.
1742    Conflict(ConfirmedBlockCertificate),
1743    /// We are not the round leader and cannot do anything. Try again at the specified time or
1744    /// or whenever the round or block height changes.
1745    WaitForTimeout(RoundTimeout),
1746}
1747
1748/// Wrapper for `AbortHandle` that aborts when its dropped.
1749#[must_use]
1750pub struct AbortOnDrop(pub AbortHandle);
1751
1752impl Drop for AbortOnDrop {
1753    #[instrument(level = "trace", skip(self))]
1754    fn drop(&mut self) {
1755        self.0.abort();
1756    }
1757}
1758
1759/// A pending proposed block, together with its published blobs.
1760#[derive(Clone, Serialize, Deserialize)]
1761pub struct PendingProposal {
1762    pub block: ProposedBlock,
1763    pub blobs: Vec<Blob>,
1764}
1765
/// Mode passed along when a certificate is received, e.g. to `receive_sender_certificate`.
///
/// NOTE(review): presumably tells the receiver whether the certificate still has to be
/// checked before it is used — confirm against the code that consumes this enum.
enum ReceiveCertificateMode {
    /// NOTE(review): presumably the certificate has not been checked yet — confirm.
    NeedsCheck,
    /// NOTE(review): presumably the certificate was already checked by the caller — confirm.
    AlreadyChecked,
}
1770
/// The possible outcomes of checking a certificate; see `into_result` for how each one is
/// turned into a `Result`.
///
/// NOTE(review): the variant names suggest the certificate's epoch is compared with the
/// locally known committees — confirm against the code that produces this enum.
enum CheckCertificateResult {
    /// Reported by `into_result` as `CommitteeDeprecationError`.
    OldEpoch,
    /// The only outcome that `into_result` treats as success.
    New,
    /// Reported by `into_result` as `CommitteeSynchronizationError`.
    FutureEpoch,
}
1776
1777impl CheckCertificateResult {
1778    fn into_result(self) -> Result<(), chain_client::Error> {
1779        match self {
1780            Self::OldEpoch => Err(chain_client::Error::CommitteeDeprecationError),
1781            Self::New => Ok(()),
1782            Self::FutureEpoch => Err(chain_client::Error::CommitteeSynchronizationError),
1783        }
1784    }
1785}
1786
1787/// Creates a compressed Contract, Service and bytecode.
1788#[cfg(not(target_arch = "wasm32"))]
1789pub async fn create_bytecode_blobs(
1790    contract: Bytecode,
1791    service: Bytecode,
1792    vm_runtime: VmRuntime,
1793) -> (Vec<Blob>, ModuleId) {
1794    match vm_runtime {
1795        VmRuntime::Wasm => {
1796            let (compressed_contract, compressed_service) =
1797                tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
1798                    .await
1799                    .expect("Compression should not panic");
1800            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
1801            let service_blob = Blob::new_service_bytecode(compressed_service);
1802            let module_id =
1803                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
1804            (vec![contract_blob, service_blob], module_id)
1805        }
1806        VmRuntime::Evm => {
1807            let compressed_contract = contract.compress();
1808            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
1809            let module_id = ModuleId::new(
1810                evm_contract_blob.id().hash,
1811                evm_contract_blob.id().hash,
1812                vm_runtime,
1813            );
1814            (vec![evm_contract_blob], module_id)
1815        }
1816    }
1817}