linera_core/client/
mod.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    cmp::{Ordering, PartialOrd},
7    collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
8    convert::Infallible,
9    iter,
10    sync::{Arc, RwLock},
11};
12
13use chain_client_state::ChainClientState;
14use custom_debug_derive::Debug;
15use futures::{
16    future::{self, Either, FusedFuture, Future},
17    stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt},
18};
19#[cfg(with_metrics)]
20use linera_base::prometheus_util::MeasureLatency as _;
21use linera_base::{
22    abi::Abi,
23    crypto::{signer, AccountPublicKey, CryptoHash, Signer, ValidatorPublicKey},
24    data_types::{
25        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
26        ChainDescription, Epoch, Round, Timestamp,
27    },
28    ensure,
29    identifiers::{
30        Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
31        ModuleId, StreamId,
32    },
33    ownership::{ChainOwnership, TimeoutConfig},
34    time::{Duration, Instant},
35};
36#[cfg(not(target_arch = "wasm32"))]
37use linera_base::{data_types::Bytecode, vm::VmRuntime};
38use linera_chain::{
39    data_types::{
40        BlockProposal, ChainAndHeight, IncomingBundle, LiteVote, MessageAction, ProposedBlock,
41        Transaction,
42    },
43    manager::LockingBlock,
44    types::{
45        Block, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
46        LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
47    },
48    ChainError, ChainExecutionContext, ChainStateView,
49};
50use linera_execution::{
51    committee::Committee,
52    system::{
53        AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
54        REMOVED_EPOCH_STREAM_NAME,
55    },
56    ExecutionError, Operation, Query, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
57};
58use linera_storage::{Clock as _, ResultReadCertificates, Storage as _};
59use linera_views::ViewError;
60use rand::{
61    distributions::{Distribution, WeightedIndex},
62    rngs::StdRng,
63    SeedableRng,
64};
65use serde::{Deserialize, Serialize};
66use thiserror::Error;
67use tokio::sync::{mpsc, OwnedRwLockReadGuard};
68use tokio_stream::wrappers::UnboundedReceiverStream;
69use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};
70
71use crate::{
72    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, ClientOutcome, RoundTimeout},
73    environment::Environment,
74    local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
75    node::{
76        CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
77        ValidatorNodeProvider as _,
78    },
79    notifier::ChannelNotifier,
80    remote_node::RemoteNode,
81    updater::{communicate_with_quorum, CommunicateAction, CommunicationError, ValidatorUpdater},
82    worker::{Notification, ProcessableCertificate, Reason, WorkerError, WorkerState},
83};
84
85mod chain_client_state;
86#[cfg(test)]
87#[path = "../unit_tests/client_tests.rs"]
88mod client_tests;
89
#[cfg(with_metrics)]
mod metrics {
    //! Latency histograms recorded by the client. Only compiled when `with_metrics` is set.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Registers a histogram without labels, using the bucket layout returned by
    /// `exponential_bucket_latencies(500.0)`.
    fn latency_histogram(name: &'static str, description: &'static str) -> HistogramVec {
        register_histogram_vec(name, description, &[], exponential_bucket_latencies(500.0))
    }

    /// Histogram registered as `process_inbox_latency`.
    pub static PROCESS_INBOX_WITHOUT_PREPARE_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("process_inbox_latency", "process_inbox latency"));

    /// Histogram registered as `prepare_chain_latency`.
    pub static PREPARE_CHAIN_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("prepare_chain_latency", "prepare_chain latency"));

    /// Histogram registered as `synchronize_chain_state_latency`.
    pub static SYNCHRONIZE_CHAIN_STATE_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "synchronize_chain_state_latency",
            "synchronize_chain_state latency",
        )
    });

    /// Histogram registered as `execute_block_latency`.
    pub static EXECUTE_BLOCK_LATENCY: LazyLock<HistogramVec> =
        LazyLock::new(|| latency_histogram("execute_block_latency", "execute_block latency"));

    /// Histogram registered as `find_received_certificates_latency`.
    pub static FIND_RECEIVED_CERTIFICATES_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        latency_histogram(
            "find_received_certificates_latency",
            "find_received_certificates latency",
        )
    });
}
143
/// Defines how we listen to a chain:
/// - do we care about every block notification?
/// - or do we only care about blocks containing events from some particular streams?
///
/// Modes are partially ordered (see the `PartialOrd` implementation): `FullChain` ranks above
/// every `EventsOnly` mode, and two `EventsOnly` modes are ordered by inclusion of their
/// stream sets.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ListeningMode {
    /// Listen to everything.
    FullChain,
    /// Only listen to blocks which contain events from those streams.
    EventsOnly(BTreeSet<StreamId>),
}
154
155impl PartialOrd for ListeningMode {
156    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
157        match (self, other) {
158            (ListeningMode::FullChain, ListeningMode::FullChain) => Some(Ordering::Equal),
159            (ListeningMode::FullChain, _) => Some(Ordering::Greater),
160            (_, ListeningMode::FullChain) => Some(Ordering::Less),
161            (ListeningMode::EventsOnly(events_a), ListeningMode::EventsOnly(events_b)) => {
162                if events_a.is_superset(events_b) {
163                    Some(Ordering::Greater)
164                } else if events_b.is_superset(events_a) {
165                    Some(Ordering::Less)
166                } else {
167                    None
168                }
169            }
170        }
171    }
172}
173
174impl ListeningMode {
175    pub fn extend(&mut self, other: Option<ListeningMode>) {
176        match (self, other) {
177            (_, None) => (),
178            (ListeningMode::FullChain, _) => (),
179            (mode, Some(ListeningMode::FullChain)) => {
180                *mode = ListeningMode::FullChain;
181            }
182            (
183                ListeningMode::EventsOnly(self_events),
184                Some(ListeningMode::EventsOnly(other_events)),
185            ) => {
186                self_events.extend(other_events);
187            }
188        }
189    }
190}
191
/// A builder that creates [`ChainClient`]s which share the cache and notifiers.
///
/// Each `ChainClient` created by `create_chain_client` keeps a shared handle to this value,
/// so the local node, the per-chain state map and the notifier are common to all of them.
pub struct Client<Env: Environment> {
    /// The environment, giving access to storage, the network and the signer.
    environment: Env,
    /// Local node to manage the execution state and the local storage of the chains that we are
    /// tracking.
    local_node: LocalNodeClient<Env::Storage>,
    /// The admin chain ID.
    admin_id: ChainId,
    /// Chains that should be tracked by the client.
    // TODO(#2412): Merge with set of chains the client is receiving notifications from validators
    tracked_chains: Arc<RwLock<HashSet<ChainId>>>,
    /// References to clients waiting for chain notifications.
    notifier: Arc<ChannelNotifier<Notification>>,
    /// Chain state for the managed chains.
    chains: papaya::HashMap<ChainId, ChainClientState>,
    /// Configuration options.
    options: ChainClientOptions,
}
210
211impl<Env: Environment> Client<Env> {
212    /// Creates a new `Client` with a new cache and notifiers.
213    #[instrument(level = "trace", skip_all)]
214    pub fn new(
215        environment: Env,
216        admin_id: ChainId,
217        long_lived_services: bool,
218        tracked_chains: impl IntoIterator<Item = ChainId>,
219        name: impl Into<String>,
220        chain_worker_ttl: Duration,
221        options: ChainClientOptions,
222    ) -> Self {
223        let tracked_chains = Arc::new(RwLock::new(tracked_chains.into_iter().collect()));
224        let state = WorkerState::new_for_client(
225            name.into(),
226            environment.storage().clone(),
227            tracked_chains.clone(),
228        )
229        .with_long_lived_services(long_lived_services)
230        .with_allow_inactive_chains(true)
231        .with_allow_messages_from_deprecated_epochs(true)
232        .with_chain_worker_ttl(chain_worker_ttl);
233        let local_node = LocalNodeClient::new(state);
234
235        Self {
236            environment,
237            local_node,
238            chains: papaya::HashMap::new(),
239            admin_id,
240            tracked_chains,
241            notifier: Arc::new(ChannelNotifier::default()),
242            options,
243        }
244    }
245
    /// Returns the storage client used by this client's local node.
    ///
    /// This is the storage handle exposed by the environment.
    pub fn storage_client(&self) -> &Env::Storage {
        self.environment.storage()
    }
250
    /// Returns the environment's network handle, which is used to create validator nodes.
    pub fn validator_node_provider(&self) -> &Env::Network {
        self.environment.network()
    }
254
    /// Returns a reference to the [`Signer`] of the client.
    ///
    /// The signer is the one provided by the environment.
    #[instrument(level = "trace", skip(self))]
    pub fn signer(&self) -> &impl Signer {
        self.environment.signer()
    }
260
261    /// Adds a chain to the set of chains tracked by the local node.
262    #[instrument(level = "trace", skip(self))]
263    pub fn track_chain(&self, chain_id: ChainId) {
264        self.tracked_chains
265            .write()
266            .expect("Panics should not happen while holding a lock to `tracked_chains`")
267            .insert(chain_id);
268    }
269
270    /// Creates a new `ChainClient`.
271    #[instrument(level = "trace", skip_all, fields(chain_id, next_block_height))]
272    pub fn create_chain_client(
273        self: &Arc<Self>,
274        chain_id: ChainId,
275        block_hash: Option<CryptoHash>,
276        next_block_height: BlockHeight,
277        pending_proposal: Option<PendingProposal>,
278        preferred_owner: Option<AccountOwner>,
279        timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
280    ) -> ChainClient<Env> {
281        // If the entry already exists we assume that the entry is more up to date than
282        // the arguments: If they were read from the wallet file, they might be stale.
283        self.chains
284            .pin()
285            .get_or_insert_with(chain_id, || ChainClientState::new(pending_proposal.clone()));
286
287        ChainClient {
288            client: self.clone(),
289            chain_id,
290            options: self.options.clone(),
291            preferred_owner,
292            initial_block_hash: block_hash,
293            initial_next_block_height: next_block_height,
294            timing_sender,
295        }
296    }
297
298    /// Fetches the chain description blob if needed, and returns the chain info.
299    async fn fetch_chain_info(
300        &self,
301        chain_id: ChainId,
302        validators: &[RemoteNode<Env::ValidatorNode>],
303    ) -> Result<Box<ChainInfo>, ChainClientError> {
304        match self.local_node.chain_info(chain_id).await {
305            Ok(info) => Ok(info),
306            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
307                // Make sure the admin chain is up to date.
308                self.synchronize_chain_state(self.admin_id).await?;
309                // If the chain is missing then the error is a WorkerError
310                // and so a BlobsNotFound
311                self.update_local_node_with_blobs_from(blob_ids, validators)
312                    .await?;
313                Ok(self.local_node.chain_info(chain_id).await?)
314            }
315            Err(err) => Err(err.into()),
316        }
317    }
318
319    fn weighted_select(
320        remaining_validators: &mut Vec<RemoteNode<Env::ValidatorNode>>,
321        remaining_weights: &mut Vec<u64>,
322        rng: &mut StdRng,
323    ) -> Option<RemoteNode<Env::ValidatorNode>> {
324        if remaining_weights.is_empty() {
325            return None;
326        }
327        let dist = WeightedIndex::new(remaining_weights.clone()).unwrap();
328        let idx = dist.sample(rng);
329        remaining_weights.remove(idx);
330        Some(remaining_validators.remove(idx))
331    }
332
333    /// Downloads and processes all certificates up to (excluding) the specified height.
334    #[instrument(level = "trace", skip(self))]
335    async fn download_certificates(
336        &self,
337        chain_id: ChainId,
338        target_next_block_height: BlockHeight,
339    ) -> Result<Box<ChainInfo>, ChainClientError> {
340        let (_, committee) = self.admin_committee().await?;
341        let mut remaining_validators = self.make_nodes(&committee)?;
342        let mut info = self
343            .fetch_chain_info(chain_id, &remaining_validators)
344            .await?;
345        // Determining the weights of the validators
346        let mut remaining_weights = remaining_validators
347            .iter()
348            .map(|validator| {
349                let validator_state = committee.validators.get(&validator.public_key).unwrap();
350                validator_state.votes
351            })
352            .collect::<Vec<_>>();
353        let mut rng: StdRng = StdRng::from_entropy();
354
355        while let Some(remote_node) =
356            Self::weighted_select(&mut remaining_validators, &mut remaining_weights, &mut rng)
357        {
358            if target_next_block_height <= info.next_block_height {
359                return Ok(info);
360            }
361            match self
362                .download_certificates_from(&remote_node, chain_id, target_next_block_height)
363                .await
364            {
365                Err(err) => warn!(
366                    "Failed to download certificates from validator {:?}: {err}",
367                    remote_node.public_key
368                ),
369                Ok(Some(new_info)) => info = new_info,
370                Ok(None) => {}
371            }
372        }
373        ensure!(
374            target_next_block_height <= info.next_block_height,
375            ChainClientError::CannotDownloadCertificates {
376                chain_id,
377                target_next_block_height,
378            }
379        );
380        Ok(info)
381    }
382
383    /// Downloads and processes all certificates up to (excluding) the specified height from the
384    /// given validator.
385    #[instrument(level = "trace", skip_all)]
386    async fn download_certificates_from(
387        &self,
388        remote_node: &RemoteNode<Env::ValidatorNode>,
389        chain_id: ChainId,
390        stop: BlockHeight,
391    ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
392        let mut last_info = None;
393        // First load any blocks from local storage, if available.
394        let mut hashes = Vec::new();
395        let mut next_height = BlockHeight::ZERO;
396        {
397            let chain = self.local_node.chain_state_view(chain_id).await?;
398            next_height = next_height.max(chain.tip_state.get().next_block_height);
399            while next_height < stop {
400                let Some(hash) = chain.preprocessed_blocks.get(&next_height).await? else {
401                    break;
402                };
403                hashes.push(hash);
404                next_height = next_height.try_add_one()?;
405            }
406        }
407        let certificates = self
408            .storage_client()
409            .read_certificates(hashes.clone())
410            .await?;
411        let certificates = match ResultReadCertificates::new(certificates, hashes) {
412            ResultReadCertificates::Certificates(certificates) => certificates,
413            ResultReadCertificates::InvalidHashes(hashes) => {
414                return Err(ChainClientError::ReadCertificatesError(hashes))
415            }
416        };
417        for certificate in certificates {
418            last_info = Some(self.handle_certificate(Box::new(certificate)).await?.info);
419        }
420        // Now download the rest in batches from the remote node.
421        while next_height < stop {
422            // TODO(#2045): Analyze network errors instead of guessing the batch size.
423            let limit = u64::from(stop)
424                .checked_sub(u64::from(next_height))
425                .ok_or(ArithmeticError::Overflow)?
426                .min(1000);
427            let certificates = remote_node
428                .query_certificates_from(chain_id, next_height, limit)
429                .await?;
430            let Some(info) = self.process_certificates(remote_node, certificates).await? else {
431                break;
432            };
433            assert!(info.next_block_height > next_height);
434            next_height = info.next_block_height;
435            last_info = Some(info);
436        }
437        Ok(last_info)
438    }
439
440    /// Tries to process all the certificates, requesting any missing blobs from the given node.
441    /// Returns the chain info of the last successfully processed certificate.
442    #[instrument(level = "trace", skip_all)]
443    async fn process_certificates(
444        &self,
445        remote_node: &RemoteNode<impl ValidatorNode>,
446        certificates: Vec<ConfirmedBlockCertificate>,
447    ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
448        let mut info = None;
449        for certificate in certificates {
450            let certificate = Box::new(certificate);
451            let mut result = self.handle_certificate(certificate.clone()).await;
452
453            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
454                let blobs = future::join_all(blob_ids.iter().map(|blob_id| async move {
455                    remote_node.try_download_blob(*blob_id).await.unwrap()
456                }))
457                .await;
458                self.local_node.store_blobs(&blobs).await?;
459                result = self.handle_certificate(certificate.clone()).await;
460            }
461
462            info = Some(result?.info);
463        }
464        // Done with all certificates.
465        Ok(info)
466    }
467
    /// Passes the certificate to the local node for processing, together with this client's
    /// notifier, and returns the local node's response.
    ///
    /// Unlike `process_certificate`, this does not update the per-chain client state.
    async fn handle_certificate<T: ProcessableCertificate>(
        &self,
        certificate: Box<GenericCertificate<T>>,
    ) -> Result<ChainInfoResponse, LocalNodeError> {
        self.local_node
            .handle_certificate(*certificate, &self.notifier)
            .await
    }
476
477    async fn chain_info_with_committees(
478        &self,
479        chain_id: ChainId,
480    ) -> Result<Box<ChainInfo>, LocalNodeError> {
481        let query = ChainInfoQuery::new(chain_id).with_committees();
482        let info = self.local_node.handle_chain_info_query(query).await?.info;
483        Ok(info)
484    }
485
486    /// Obtains all the committees trusted by any of the given chains. Also returns the highest
487    /// of their epochs.
488    #[instrument(level = "trace", skip_all)]
489    async fn admin_committees(
490        &self,
491    ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
492        let info = self.chain_info_with_committees(self.admin_id).await?;
493        Ok((info.epoch, info.into_committees()?))
494    }
495
496    /// Obtains the committee for the latest epoch on the admin chain.
497    pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
498        let info = self.chain_info_with_committees(self.admin_id).await?;
499        Ok((info.epoch, info.into_current_committee()?))
500    }
501
502    /// Obtains the validators for the latest epoch.
503    async fn validator_nodes(
504        &self,
505    ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, ChainClientError> {
506        let (_, committee) = self.admin_committee().await?;
507        Ok(self.make_nodes(&committee)?)
508    }
509
510    /// Creates a [`RemoteNode`] for each validator in the committee.
511    fn make_nodes(
512        &self,
513        committee: &Committee,
514    ) -> Result<Vec<RemoteNode<Env::ValidatorNode>>, NodeError> {
515        Ok(self
516            .validator_node_provider()
517            .make_nodes(committee)?
518            .map(|(public_key, node)| RemoteNode { public_key, node })
519            .collect())
520    }
521
522    /// Ensures that the client has the `ChainDescription` blob corresponding to this
523    /// client's `ChainId`.
524    pub async fn get_chain_description(
525        &self,
526        chain_id: ChainId,
527    ) -> Result<ChainDescription, ChainClientError> {
528        let chain_desc_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
529        let blob = self
530            .local_node
531            .storage_client()
532            .read_blob(chain_desc_id)
533            .await?;
534        if let Some(blob) = blob {
535            // We have the blob - return it.
536            return Ok(bcs::from_bytes(blob.bytes())?);
537        };
538        // Recover history from the current validators, according to the admin chain.
539        self.synchronize_chain_state(self.admin_id).await?;
540        let nodes = self.validator_nodes().await?;
541        let blob = self
542            .update_local_node_with_blobs_from(vec![chain_desc_id], &nodes)
543            .await?
544            .pop()
545            .unwrap(); // Returns exactly as many blobs as passed-in IDs.
546        Ok(bcs::from_bytes(blob.bytes())?)
547    }
548
549    /// Updates the latest block and next block height and round information from the chain info.
550    #[instrument(level = "trace", skip_all, fields(chain_id = format!("{:.8}", info.chain_id)))]
551    fn update_from_info(&self, info: &ChainInfo) {
552        self.chains.pin().update(info.chain_id, |state| {
553            let mut state = state.clone_for_update_unchecked();
554            state.update_from_info(info);
555            state
556        });
557    }
558
559    /// Handles the certificate in the local node and the resulting notifications.
560    #[instrument(level = "trace", skip_all)]
561    async fn process_certificate<T: ProcessableCertificate>(
562        &self,
563        certificate: Box<GenericCertificate<T>>,
564    ) -> Result<(), LocalNodeError> {
565        let info = self.handle_certificate(certificate).await?.info;
566        self.update_from_info(&info);
567        Ok(())
568    }
569
570    /// Submits a validated block for finalization and returns the confirmed block certificate.
571    #[instrument(level = "trace", skip_all)]
572    async fn finalize_block(
573        &self,
574        committee: &Committee,
575        certificate: ValidatedBlockCertificate,
576    ) -> Result<ConfirmedBlockCertificate, ChainClientError> {
577        debug!(round = %certificate.round, "Submitting block for confirmation");
578        let hashed_value = ConfirmedBlock::new(certificate.inner().block().clone());
579        let finalize_action = CommunicateAction::FinalizeBlock {
580            certificate: Box::new(certificate),
581            delivery: self.options.cross_chain_message_delivery,
582        };
583        let certificate = self
584            .communicate_chain_action(committee, finalize_action, hashed_value)
585            .await?;
586        self.receive_certificate(certificate.clone(), ReceiveCertificateMode::AlreadyChecked)
587            .await?;
588        Ok(certificate)
589    }
590
591    /// Submits a block proposal to the validators.
592    #[instrument(level = "trace", skip_all)]
593    async fn submit_block_proposal<T: ProcessableCertificate>(
594        &self,
595        committee: &Committee,
596        proposal: Box<BlockProposal>,
597        value: T,
598    ) -> Result<GenericCertificate<T>, ChainClientError> {
599        debug!(
600            round = %proposal.content.round,
601            "Submitting block proposal to validators"
602        );
603        let submit_action = CommunicateAction::SubmitBlock {
604            proposal,
605            blob_ids: value.required_blob_ids().into_iter().collect(),
606        };
607        let certificate = self
608            .communicate_chain_action(committee, submit_action, value)
609            .await?;
610        self.process_certificate(Box::new(certificate.clone()))
611            .await?;
612        Ok(certificate)
613    }
614
615    /// Broadcasts certified blocks to validators.
616    #[instrument(level = "trace", skip_all, fields(chain_id, block_height, delivery))]
617    async fn communicate_chain_updates(
618        &self,
619        committee: &Committee,
620        chain_id: ChainId,
621        height: BlockHeight,
622        delivery: CrossChainMessageDelivery,
623    ) -> Result<(), ChainClientError> {
624        let nodes = self.make_nodes(committee)?;
625        communicate_with_quorum(
626            &nodes,
627            committee,
628            |_: &()| (),
629            |remote_node| {
630                let mut updater = ValidatorUpdater {
631                    remote_node,
632                    local_node: self.local_node.clone(),
633                    admin_id: self.admin_id,
634                };
635                Box::pin(async move {
636                    updater
637                        .send_chain_information(chain_id, height, delivery)
638                        .await
639                })
640            },
641            self.options.grace_period,
642        )
643        .await?;
644        Ok(())
645    }
646
647    /// Broadcasts certified blocks and optionally a block proposal, certificate or
648    /// leader timeout request.
649    ///
650    /// In that case, it verifies that the validator votes are for the provided value,
651    /// and returns a certificate.
652    #[instrument(level = "trace", skip_all)]
653    async fn communicate_chain_action<T: CertificateValue>(
654        &self,
655        committee: &Committee,
656        action: CommunicateAction,
657        value: T,
658    ) -> Result<GenericCertificate<T>, ChainClientError> {
659        let nodes = self.make_nodes(committee)?;
660        let ((votes_hash, votes_round), votes) = communicate_with_quorum(
661            &nodes,
662            committee,
663            |vote: &LiteVote| (vote.value.value_hash, vote.round),
664            |remote_node| {
665                let mut updater = ValidatorUpdater {
666                    remote_node,
667                    local_node: self.local_node.clone(),
668                    admin_id: self.admin_id,
669                };
670                let action = action.clone();
671                Box::pin(async move { updater.send_chain_update(action).await })
672            },
673            self.options.grace_period,
674        )
675        .await?;
676        ensure!(
677            (votes_hash, votes_round) == (value.hash(), action.round()),
678            ChainClientError::UnexpectedQuorum {
679                hash: votes_hash,
680                round: votes_round,
681                expected_hash: value.hash(),
682                expected_round: action.round(),
683            }
684        );
685        // Certificate is valid because
686        // * `communicate_with_quorum` ensured a sufficient "weight" of
687        // (non-error) answers were returned by validators.
688        // * each answer is a vote signed by the expected validator.
689        let certificate = LiteCertificate::try_from_votes(votes)
690            .ok_or_else(|| {
691                ChainClientError::InternalError("Vote values or rounds don't match; this is a bug")
692            })?
693            .with_value(value)
694            .ok_or_else(|| {
695                ChainClientError::ProtocolError("A quorum voted for an unexpected value")
696            })?;
697        Ok(certificate)
698    }
699
700    /// Processes the confirmed block certificate and its ancestors in the local node, then
701    /// updates the validators up to that certificate.
702    #[instrument(level = "trace", skip_all)]
703    async fn receive_certificate_and_update_validators(
704        &self,
705        certificate: ConfirmedBlockCertificate,
706        mode: ReceiveCertificateMode,
707    ) -> Result<(), ChainClientError> {
708        let block_chain_id = certificate.block().header.chain_id;
709        let block_height = certificate.block().header.height;
710
711        self.receive_certificate(certificate, mode).await?;
712
713        // Make sure a quorum of validators (according to the chain's new committee) are up-to-date
714        // for data availability.
715        let local_committee = self
716            .chain_info_with_committees(block_chain_id)
717            .await?
718            .into_current_committee()?;
719        self.communicate_chain_updates(
720            &local_committee,
721            block_chain_id,
722            block_height.try_add_one()?,
723            CrossChainMessageDelivery::Blocking,
724        )
725        .await?;
726        Ok(())
727    }
728
729    /// Processes the confirmed block certificate in the local node. Also downloads and processes
730    /// all ancestors that are still missing.
731    #[instrument(level = "trace", skip_all)]
732    async fn receive_certificate(
733        &self,
734        certificate: ConfirmedBlockCertificate,
735        mode: ReceiveCertificateMode,
736    ) -> Result<(), ChainClientError> {
737        let certificate = Box::new(certificate);
738        let block = certificate.block();
739
740        // Verify the certificate before doing any expensive networking.
741        let (max_epoch, committees) = self.admin_committees().await?;
742        if let ReceiveCertificateMode::NeedsCheck = mode {
743            Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
744        }
745        // Recover history from the network.
746        self.download_certificates(block.header.chain_id, block.header.height)
747            .await?;
748        // Process the received operations. Download required hashed certificate values if
749        // necessary.
750        if let Err(err) = self.process_certificate(certificate.clone()).await {
751            match &err {
752                LocalNodeError::BlobsNotFound(blob_ids) => {
753                    let blobs = RemoteNode::download_blobs(
754                        blob_ids,
755                        &self.validator_nodes().await?,
756                        self.options.blob_download_timeout,
757                    )
758                    .await
759                    .ok_or(err)?;
760                    self.local_node.store_blobs(&blobs).await?;
761                    self.process_certificate(certificate).await?;
762                }
763                _ => {
764                    // The certificate is not as expected. Give up.
765                    warn!("Failed to process network hashed certificate value");
766                    return Err(err.into());
767                }
768            }
769        }
770
771        Ok(())
772    }
773
774    /// Processes the confirmed block in the local node without executing it.
775    #[instrument(level = "trace", skip_all)]
776    #[allow(dead_code)] // Otherwise CI fails when built for docker.
777    async fn receive_sender_certificate(
778        &self,
779        certificate: ConfirmedBlockCertificate,
780        mode: ReceiveCertificateMode,
781        nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>,
782    ) -> Result<(), ChainClientError> {
783        let certificate = Box::new(certificate);
784
785        // Verify the certificate before doing any expensive networking.
786        let (max_epoch, committees) = self.admin_committees().await?;
787        if let ReceiveCertificateMode::NeedsCheck = mode {
788            Self::check_certificate(max_epoch, &committees, &certificate)?.into_result()?;
789        }
790        // Recover history from the network.
791        let nodes = if let Some(nodes) = nodes {
792            nodes
793        } else {
794            self.validator_nodes().await?
795        };
796        if let Err(err) = self.handle_certificate(certificate.clone()).await {
797            match &err {
798                LocalNodeError::BlobsNotFound(blob_ids) => {
799                    let blobs = RemoteNode::download_blobs(
800                        blob_ids,
801                        &nodes,
802                        self.options.blob_download_timeout,
803                    )
804                    .await
805                    .ok_or(err)?;
806                    self.local_node.store_blobs(&blobs).await?;
807                    self.handle_certificate(certificate.clone()).await?;
808                }
809                _ => {
810                    // The certificate is not as expected. Give up.
811                    warn!("Failed to process network hashed certificate value");
812                    return Err(err.into());
813                }
814            }
815        }
816
817        Ok(())
818    }
819
820    /// Downloads and preprocesses all confirmed block certificates that sent any message to this
821    /// chain.
822    #[instrument(level = "trace", skip(self))]
823    async fn synchronize_received_certificates_from_validator(
824        &self,
825        chain_id: ChainId,
826        remote_node: &RemoteNode<Env::ValidatorNode>,
827    ) -> Result<ReceivedCertificatesFromValidator, ChainClientError> {
828        let mut tracker = self
829            .local_node
830            .chain_state_view(chain_id)
831            .await?
832            .received_certificate_trackers
833            .get()
834            .get(&remote_node.public_key)
835            .copied()
836            .unwrap_or(0);
837        let (max_epoch, committees) = self.admin_committees().await?;
838
839        // Retrieve the list of newly received certificates from this validator.
840        let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(tracker);
841        let info = remote_node.handle_chain_info_query(query).await?;
842        let remote_log = info.requested_received_log;
843        let remote_heights = Self::heights_per_chain(&remote_log);
844
845        // Obtain the next block height we need in the local node, for each chain.
846        let local_next_heights = self
847            .local_node
848            .next_outbox_heights(remote_heights.keys(), chain_id)
849            .await?;
850
851        // We keep track of the height we've successfully downloaded and checked, per chain.
852        let mut downloaded_heights = BTreeMap::new();
853        // And we make a list of chains we already fully have locally. We need to make sure to
854        // put all their sent messages into the inbox.
855        let mut other_sender_chains = Vec::new();
856
857        let certificates = future::try_join_all(remote_heights.into_iter().filter_map(
858            |(sender_chain_id, remote_heights)| {
859                let local_next = *local_next_heights.get(&sender_chain_id)?;
860                if let Ok(height) = local_next.try_sub_one() {
861                    downloaded_heights.insert(sender_chain_id, height);
862                }
863                let remote_heights = remote_heights
864                    .into_iter()
865                    .filter(|h| *h >= local_next)
866                    .collect::<Vec<_>>();
867                if remote_heights.is_empty() {
868                    // Our highest, locally executed block is higher than any block height
869                    // from the current batch. Skip this batch, but remember to wait for
870                    // the messages to be delivered to the inboxes.
871                    other_sender_chains.push(sender_chain_id);
872                    return None;
873                };
874                Some(async move {
875                    let certificates = remote_node
876                        .download_certificates_by_heights(sender_chain_id, remote_heights)
877                        .await?;
878                    Ok::<Vec<_>, ChainClientError>(certificates)
879                })
880            },
881        ))
882        .await?
883        .into_iter()
884        .flatten()
885        .collect::<Vec<_>>();
886
887        let mut certificates_by_height_by_chain = BTreeMap::new();
888
889        // Check the signatures and keep only the ones that are valid.
890        for confirmed_block_certificate in certificates {
891            let block_header = &confirmed_block_certificate.inner().block().header;
892            let sender_chain_id = block_header.chain_id;
893            let height = block_header.height;
894            let epoch = block_header.epoch;
895            match Self::check_certificate(max_epoch, &committees, &confirmed_block_certificate)? {
896                CheckCertificateResult::FutureEpoch => {
897                    warn!(
898                        "Postponing received certificate from {sender_chain_id:.8} at height \
899                         {height} from future epoch {epoch}"
900                    );
901                    // Do not process this certificate now. It can still be
902                    // downloaded later, once our committee is updated.
903                }
904                CheckCertificateResult::OldEpoch => {
905                    // This epoch is not recognized any more. Let's skip the certificate.
906                    // If a higher block with a recognized epoch comes up later from the
907                    // same chain, the call to `receive_certificate` below will download
908                    // the skipped certificate again.
909                    warn!("Skipping received certificate from past epoch {epoch:?}");
910                }
911                CheckCertificateResult::New => {
912                    certificates_by_height_by_chain
913                        .entry(sender_chain_id)
914                        .or_insert_with(BTreeMap::new)
915                        .insert(height, confirmed_block_certificate);
916                }
917            }
918        }
919
920        // Increase the tracker up to the first position we haven't downloaded.
921        for entry in remote_log {
922            if certificates_by_height_by_chain
923                .get(&entry.chain_id)
924                .is_some_and(|certs| certs.contains_key(&entry.height))
925            {
926                tracker += 1;
927            } else {
928                break;
929            }
930        }
931
932        for (sender_chain_id, certs) in &mut certificates_by_height_by_chain {
933            if certs
934                .values()
935                .any(|cert| !cert.block().recipients().contains(&chain_id))
936            {
937                warn!(
938                    "Skipping received certificates from chain {sender_chain_id:.8}:
939                    No messages for {chain_id:.8}."
940                );
941                certs.clear();
942            }
943        }
944
945        Ok(ReceivedCertificatesFromValidator {
946            public_key: remote_node.public_key,
947            tracker,
948            certificates: certificates_by_height_by_chain
949                .into_values()
950                .flat_map(BTreeMap::into_values)
951                .collect(),
952            other_sender_chains,
953        })
954    }
955
956    #[instrument(
957        level = "trace", skip_all,
958        fields(certificate_hash = ?incoming_certificate.hash()),
959    )]
960    fn check_certificate(
961        highest_known_epoch: Epoch,
962        committees: &BTreeMap<Epoch, Committee>,
963        incoming_certificate: &ConfirmedBlockCertificate,
964    ) -> Result<CheckCertificateResult, NodeError> {
965        let block = incoming_certificate.block();
966        // Check that certificates are valid w.r.t one of our trusted committees.
967        if block.header.epoch > highest_known_epoch {
968            return Ok(CheckCertificateResult::FutureEpoch);
969        }
970        if let Some(known_committee) = committees.get(&block.header.epoch) {
971            // This epoch is recognized by our chain. Let's verify the
972            // certificate.
973            incoming_certificate.check(known_committee)?;
974            Ok(CheckCertificateResult::New)
975        } else {
976            // We don't accept a certificate from a committee that was retired.
977            Ok(CheckCertificateResult::OldEpoch)
978        }
979    }
980
981    /// Given a set of chain ID-block height pairs, returns a map that assigns to each chain ID
982    /// the set of heights. The returned map contains no empty values.
983    fn heights_per_chain(
984        remote_log: &[ChainAndHeight],
985    ) -> BTreeMap<ChainId, BTreeSet<BlockHeight>> {
986        remote_log.iter().fold(
987            BTreeMap::<ChainId, BTreeSet<_>>::new(),
988            |mut chain_to_info, entry| {
989                chain_to_info
990                    .entry(entry.chain_id)
991                    .or_default()
992                    .insert(entry.height);
993                chain_to_info
994            },
995        )
996    }
997
998    /// Downloads and processes any certificates we are missing for the given chain.
999    #[instrument(level = "trace", skip_all)]
1000    async fn synchronize_chain_state(
1001        &self,
1002        chain_id: ChainId,
1003    ) -> Result<Box<ChainInfo>, ChainClientError> {
1004        let (_, committee) = self.admin_committee().await?;
1005        self.synchronize_chain_state_from_committee(chain_id, committee)
1006            .await
1007    }
1008
1009    /// Downloads and processes any certificates we are missing for the given chain, from the given
1010    /// committee.
1011    #[instrument(level = "trace", skip_all)]
1012    pub async fn synchronize_chain_state_from_committee(
1013        &self,
1014        chain_id: ChainId,
1015        committee: Committee,
1016    ) -> Result<Box<ChainInfo>, ChainClientError> {
1017        #[cfg(with_metrics)]
1018        let _latency = metrics::SYNCHRONIZE_CHAIN_STATE_LATENCY.measure_latency();
1019
1020        let validators = self.make_nodes(&committee)?;
1021        Box::pin(self.fetch_chain_info(chain_id, &validators)).await?;
1022        communicate_with_quorum(
1023            &validators,
1024            &committee,
1025            |_: &()| (),
1026            |remote_node| async move {
1027                self.synchronize_chain_state_from(&remote_node, chain_id)
1028                    .await
1029            },
1030            self.options.grace_period,
1031        )
1032        .await?;
1033
1034        self.local_node
1035            .chain_info(chain_id)
1036            .await
1037            .map_err(Into::into)
1038    }
1039
    /// Downloads any certificates from the specified validator that we are missing for the given
    /// chain, and processes them.
    ///
    /// If the local node afterwards is at the same next block height as the validator, the
    /// validator's chain manager values are also applied locally: its timeout certificate (if
    /// any), its signed, proposed and locking proposals, and its locking validated block
    /// certificate.
    ///
    /// # Errors
    ///
    /// Failures of the chain info queries, of the certificate download, of handling the timeout
    /// certificate, of `handle_pending_blobs` and of `update_local_node_with_blobs_from` are
    /// returned. A locking block or proposal that cannot be processed is only logged and
    /// skipped.
    #[instrument(level = "trace", skip(self, remote_node, chain_id))]
    async fn synchronize_chain_state_from(
        &self,
        remote_node: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
    ) -> Result<(), ChainClientError> {
        let mut local_info = self.local_node.chain_info(chain_id).await?;
        let query = ChainInfoQuery::new(chain_id).with_manager_values();
        let remote_info = remote_node.handle_chain_info_query(query).await?;
        // Catch up to the validator's next block height; keep the previous local info if the
        // download returns nothing new.
        if let Some(new_info) = self
            .download_certificates_from(remote_node, chain_id, remote_info.next_block_height)
            .await?
        {
            local_info = new_info;
        };

        // If we are at the same height as the remote node, we also update our chain manager.
        // Otherwise the manager values do not apply to our height, and we stop here.
        if local_info.next_block_height != remote_info.next_block_height {
            debug!(
                "Synced from validator {}; but remote height is {} and local height is {}",
                remote_node.public_key, remote_info.next_block_height, local_info.next_block_height
            );
            return Ok(());
        };

        if let Some(timeout) = remote_info.manager.timeout {
            self.handle_certificate(Box::new(*timeout)).await?;
        }
        // Collect all proposals the validator told us about.
        let mut proposals = Vec::new();
        if let Some(proposal) = remote_info.manager.requested_signed_proposal {
            proposals.push(*proposal);
        }
        if let Some(proposal) = remote_info.manager.requested_proposed {
            proposals.push(*proposal);
        }
        if let Some(locking) = remote_info.manager.requested_locking {
            match *locking {
                LockingBlock::Fast(proposal) => {
                    proposals.push(proposal);
                }
                LockingBlock::Regular(cert) => {
                    // Best effort: a failure here is logged, not returned.
                    let hash = cert.hash();
                    if let Err(err) = self.try_process_locking_block_from(remote_node, cert).await {
                        debug!(
                            "Skipping locked block {hash} from validator {} at height {}: {err}",
                            remote_node.public_key, local_info.next_block_height,
                        );
                    }
                }
            }
        }
        // Hand each proposal to the local node. If blobs are missing, there are up to two
        // recovery attempts, each followed by a retry.
        'proposal_loop: for proposal in proposals {
            let owner: AccountOwner = proposal.owner();
            if let Err(mut err) = self
                .local_node
                .handle_block_proposal(proposal.clone())
                .await
            {
                if let LocalNodeError::BlobsNotFound(_) = &err {
                    // First attempt: download all blobs required by the proposal as pending
                    // blobs from this validator.
                    let required_blob_ids = proposal.required_blob_ids().collect::<Vec<_>>();
                    if !required_blob_ids.is_empty() {
                        let mut blobs = Vec::new();
                        for blob_id in required_blob_ids {
                            let blob_content = match remote_node
                                .node
                                .download_pending_blob(chain_id, blob_id)
                                .await
                            {
                                Ok(content) => content,
                                Err(err) => {
                                    warn!(
                                        "Skipping proposal from {owner} and validator {} at \
                                        height {}; failed to download {blob_id}: {err}",
                                        remote_node.public_key, local_info.next_block_height
                                    );
                                    continue 'proposal_loop;
                                }
                            };
                            blobs.push(Blob::new(blob_content));
                        }
                        self.local_node
                            .handle_pending_blobs(chain_id, blobs)
                            .await?;
                        // We found the missing blobs: retry.
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            continue;
                        }
                    }
                    // Second attempt: if blobs are still reported missing, obtain them through
                    // the certificates that use them, again from this validator only.
                    if let LocalNodeError::BlobsNotFound(blob_ids) = &err {
                        self.update_local_node_with_blobs_from(
                            blob_ids.clone(),
                            &[remote_node.clone()],
                        )
                        .await?;
                        // We found the missing blobs: retry.
                        if let Err(new_err) = self
                            .local_node
                            .handle_block_proposal(proposal.clone())
                            .await
                        {
                            err = new_err;
                        } else {
                            continue;
                        }
                    }
                }

                // Reached with the most recent error whenever the proposal could not be
                // handled.
                debug!(
                    "Skipping proposal from {owner} and validator {} at height {}: {err}",
                    remote_node.public_key, local_info.next_block_height
                );
            }
        }
        Ok(())
    }
1163
1164    async fn try_process_locking_block_from(
1165        &self,
1166        remote_node: &RemoteNode<Env::ValidatorNode>,
1167        certificate: GenericCertificate<ValidatedBlock>,
1168    ) -> Result<(), ChainClientError> {
1169        let chain_id = certificate.inner().chain_id();
1170        let certificate = Box::new(certificate);
1171        match self.process_certificate(certificate.clone()).await {
1172            Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1173                let mut blobs = Vec::new();
1174                for blob_id in blob_ids {
1175                    let blob_content = remote_node
1176                        .node
1177                        .download_pending_blob(chain_id, blob_id)
1178                        .await?;
1179                    blobs.push(Blob::new(blob_content));
1180                }
1181                self.local_node
1182                    .handle_pending_blobs(chain_id, blobs)
1183                    .await?;
1184                self.process_certificate(certificate).await?;
1185                Ok(())
1186            }
1187            Err(err) => Err(err.into()),
1188            Ok(()) => Ok(()),
1189        }
1190    }
1191
1192    /// Downloads and processes from the specified validator a confirmed block certificates that
1193    /// use the given blobs. If this succeeds, the blob will be in our storage.
1194    async fn update_local_node_with_blobs_from(
1195        &self,
1196        blob_ids: Vec<BlobId>,
1197        remote_nodes: &[RemoteNode<Env::ValidatorNode>],
1198    ) -> Result<Vec<Blob>, ChainClientError> {
1199        let timeout = self.options.blob_download_timeout;
1200        future::try_join_all(blob_ids.into_iter().map(|blob_id| async move {
1201            let mut stream = remote_nodes
1202                .iter()
1203                .zip(0..)
1204                .map(|(remote_node, i)| async move {
1205                    linera_base::time::timer::sleep(timeout * i * i).await;
1206                    let certificate = remote_node.download_certificate_for_blob(blob_id).await?;
1207                    // This will download all ancestors of the certificate and process all of them locally.
1208                    self.receive_sender_certificate(
1209                        certificate,
1210                        ReceiveCertificateMode::NeedsCheck,
1211                        Some(vec![remote_node.clone()]),
1212                    )
1213                    .await?;
1214                    let blob = self
1215                        .local_node
1216                        .storage_client()
1217                        .read_blob(blob_id)
1218                        .await?
1219                        .ok_or_else(|| LocalNodeError::BlobsNotFound(vec![blob_id]))?;
1220                    Result::<_, ChainClientError>::Ok(blob)
1221                })
1222                .collect::<FuturesUnordered<_>>();
1223            while let Some(maybe_blob) = stream.next().await {
1224                if let Ok(blob) = maybe_blob {
1225                    return Ok(blob);
1226                }
1227            }
1228            Err(LocalNodeError::BlobsNotFound(vec![blob_id]).into())
1229        }))
1230        .await
1231    }
1232
1233    /// Downloads and processes confirmed block certificates that use the given blobs.
1234    /// If this succeeds, the blobs will be in our storage.
1235    async fn receive_certificates_for_blobs(
1236        &self,
1237        blob_ids: Vec<BlobId>,
1238    ) -> Result<(), ChainClientError> {
1239        // Deduplicate IDs.
1240        let blob_ids = blob_ids.into_iter().collect::<BTreeSet<_>>();
1241        let validators = self.validator_nodes().await?;
1242
1243        let mut missing_blobs = Vec::new();
1244        for blob_id in blob_ids {
1245            let mut certificate_stream = validators
1246                .iter()
1247                .map(|remote_node| async move {
1248                    let cert = remote_node.download_certificate_for_blob(blob_id).await?;
1249                    Ok::<_, NodeError>((remote_node.clone(), cert))
1250                })
1251                .collect::<FuturesUnordered<_>>();
1252            loop {
1253                let Some(result) = certificate_stream.next().await else {
1254                    missing_blobs.push(blob_id);
1255                    break;
1256                };
1257                if let Ok((remote_node, cert)) = result {
1258                    if self
1259                        .receive_sender_certificate(
1260                            cert,
1261                            ReceiveCertificateMode::NeedsCheck,
1262                            Some(vec![remote_node]),
1263                        )
1264                        .await
1265                        .is_ok()
1266                    {
1267                        break;
1268                    }
1269                }
1270            }
1271        }
1272
1273        if missing_blobs.is_empty() {
1274            Ok(())
1275        } else {
1276            Err(NodeError::BlobsNotFound(missing_blobs).into())
1277        }
1278    }
1279
1280    /// Attempts to execute the block locally. If any incoming message execution fails, that
1281    /// message is rejected and execution is retried, until the block accepts only messages
1282    /// that succeed.
1283    // TODO(#2806): Measure how failing messages affect the execution times.
1284    #[tracing::instrument(level = "trace", skip(self, block))]
1285    async fn stage_block_execution_and_discard_failing_messages(
1286        &self,
1287        mut block: ProposedBlock,
1288        round: Option<u32>,
1289        published_blobs: Vec<Blob>,
1290    ) -> Result<(Block, ChainInfoResponse), ChainClientError> {
1291        loop {
1292            let result = self
1293                .stage_block_execution(block.clone(), round, published_blobs.clone())
1294                .await;
1295            if let Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
1296                WorkerError::ChainError(chain_error),
1297            ))) = &result
1298            {
1299                if let ChainError::ExecutionError(
1300                    error,
1301                    ChainExecutionContext::IncomingBundle(index),
1302                ) = &**chain_error
1303                {
1304                    let transaction = block
1305                        .transactions
1306                        .get_mut(*index as usize)
1307                        .expect("Transaction at given index should exist");
1308                    let Transaction::ReceiveMessages(message) = transaction else {
1309                        panic!(
1310                            "Expected incoming bundle at transaction index {}, found operation",
1311                            index
1312                        );
1313                    };
1314                    ensure!(
1315                        !message.bundle.is_protected(),
1316                        ChainClientError::BlockProposalError(
1317                            "Protected incoming message failed to execute locally"
1318                        )
1319                    );
1320                    // Reject the faulty message from the block and continue.
1321                    // TODO(#1420): This is potentially a bit heavy-handed for
1322                    // retryable errors.
1323                    info!(
1324                        %error, origin = ?message.origin,
1325                        "Message failed to execute locally and will be rejected."
1326                    );
1327                    message.action = MessageAction::Reject;
1328                    continue;
1329                }
1330            }
1331            return result;
1332        }
1333    }
1334
1335    /// Attempts to execute the block locally. If any attempt to read a blob fails, the blob is
1336    /// downloaded and execution is retried.
1337    #[instrument(level = "trace", skip(self, block))]
1338    async fn stage_block_execution(
1339        &self,
1340        block: ProposedBlock,
1341        round: Option<u32>,
1342        published_blobs: Vec<Blob>,
1343    ) -> Result<(Block, ChainInfoResponse), ChainClientError> {
1344        loop {
1345            let result = self
1346                .local_node
1347                .stage_block_execution(block.clone(), round, published_blobs.clone())
1348                .await;
1349            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1350                self.receive_certificates_for_blobs(blob_ids.clone())
1351                    .await?;
1352                continue; // We found the missing blob: retry.
1353            }
1354            return Ok(result?);
1355        }
1356    }
1357}
1358
/// Policies for automatically handling incoming messages.
///
/// A policy combines a blanket action with an optional restriction on the chains whose
/// messages are handled.
#[derive(Clone, Debug)]
pub struct MessagePolicy {
    /// The blanket policy applied to all messages.
    blanket: BlanketMessagePolicy,
    /// If set, only messages whose origin is one of these chains are accepted.
    /// `Option::None` means that messages from all chains are accepted. An empty
    /// `HashSet` denotes that messages from no chains are accepted.
    restrict_chain_ids_to: Option<HashSet<ChainId>>,
}
1369
/// The blanket action that a [`MessagePolicy`] applies to incoming messages.
// NOTE(review): with `clap::ValueEnum`, the doc comments on the variants are presumably
// surfaced as command-line help text, so edits to them may be user-visible - confirm.
#[derive(Copy, Clone, Debug, clap::ValueEnum)]
pub enum BlanketMessagePolicy {
    /// Automatically accept all incoming messages. Reject them only if execution fails.
    Accept,
    /// Automatically reject tracked messages, ignore or skip untracked messages, but accept
    /// protected ones.
    Reject,
    /// Don't include any messages in blocks, and don't make any decision whether to accept or
    /// reject.
    Ignore,
}
1381
1382impl MessagePolicy {
1383    pub fn new(
1384        blanket: BlanketMessagePolicy,
1385        restrict_chain_ids_to: Option<HashSet<ChainId>>,
1386    ) -> Self {
1387        Self {
1388            blanket,
1389            restrict_chain_ids_to,
1390        }
1391    }
1392
1393    #[cfg(with_testing)]
1394    pub fn new_accept_all() -> Self {
1395        Self {
1396            blanket: BlanketMessagePolicy::Accept,
1397            restrict_chain_ids_to: None,
1398        }
1399    }
1400
1401    #[instrument(level = "trace", skip(self))]
1402    fn must_handle(&self, bundle: &mut IncomingBundle) -> bool {
1403        if self.is_reject() {
1404            if bundle.bundle.is_skippable() {
1405                return false;
1406            } else if !bundle.bundle.is_protected() {
1407                bundle.action = MessageAction::Reject;
1408            }
1409        }
1410        match &self.restrict_chain_ids_to {
1411            None => true,
1412            Some(chains) => chains.contains(&bundle.origin),
1413        }
1414    }
1415
1416    #[instrument(level = "trace", skip(self))]
1417    fn is_ignore(&self) -> bool {
1418        matches!(self.blanket, BlanketMessagePolicy::Ignore)
1419    }
1420
1421    #[instrument(level = "trace", skip(self))]
1422    fn is_reject(&self) -> bool {
1423        matches!(self.blanket, BlanketMessagePolicy::Reject)
1424    }
1425}
1426
/// Kinds of timing measurements. Values of this type are paired with a `u64` and sent through
/// the optional `timing_sender` channel of a [`ChainClient`], which exists for benchmarking.
// NOTE(review): the variant names suggest which client phase was timed; confirm the exact
// meaning of each variant and the unit of the accompanying `u64` at the sending sites.
#[derive(Debug, Clone, Copy)]
pub enum TimingType {
    ExecuteOperations,
    ExecuteBlock,
    SubmitBlockProposal,
    UpdateValidators,
}
1434
/// Options that configure a [`ChainClient`].
#[derive(Debug, Clone)]
pub struct ChainClientOptions {
    /// Maximum number of pending message bundles processed at a time in a block.
    pub max_pending_message_bundles: usize,
    /// The policy for automatically handling incoming messages.
    pub message_policy: MessagePolicy,
    /// Whether to block on cross-chain message delivery.
    pub cross_chain_message_delivery: CrossChainMessageDelivery,
    /// An additional delay, after reaching a quorum, to wait for additional validator signatures,
    /// as a fraction of time taken to reach quorum.
    pub grace_period: f64,
    /// The delay when downloading a blob, after which we try a second validator.
    pub blob_download_timeout: Duration,
}
1449
#[cfg(with_testing)]
impl ChainClientOptions {
    /// Returns the options used in tests: at most 10 pending message bundles per block, a
    /// policy that accepts all messages, non-blocking cross-chain message delivery, the
    /// crate's default grace period and a blob download timeout of one second.
    pub fn test_default() -> Self {
        Self {
            max_pending_message_bundles: 10,
            message_policy: MessagePolicy::new_accept_all(),
            cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
            grace_period: crate::DEFAULT_GRACE_PERIOD,
            blob_download_timeout: Duration::from_secs(1),
        }
    }
}
1464
/// Client to operate a chain by interacting with validators and the given local storage
/// implementation.
/// * The chain being operated is called the "local chain" or just the "chain".
/// * As a rule, operations are considered successful (and communication may stop) when
///   they succeeded in gathering a quorum of responses.
///
/// The `client` and `options` fields are left out of the `Debug` output.
#[derive(Debug)]
pub struct ChainClient<Env: Environment> {
    /// The Linera [`Client`] that manages operations for this chain client.
    #[debug(skip)]
    client: Arc<Client<Env>>,
    /// The off-chain chain ID.
    chain_id: ChainId,
    /// The client options.
    #[debug(skip)]
    options: ChainClientOptions,
    /// The preferred owner of the chain used to sign proposals.
    /// `None` if we cannot propose on this chain.
    preferred_owner: Option<AccountOwner>,
    /// The next block height as read from the wallet.
    initial_next_block_height: BlockHeight,
    /// The last block hash as read from the wallet.
    initial_block_hash: Option<CryptoHash>,
    /// Optional timing sender for benchmarking. Each message pairs a `u64` with the
    /// [`TimingType`] it belongs to.
    // NOTE(review): the `u64` is presumably a measured duration - confirm its unit.
    timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
}
1490
1491impl<Env: Environment> Clone for ChainClient<Env> {
1492    fn clone(&self) -> Self {
1493        Self {
1494            client: self.client.clone(),
1495            chain_id: self.chain_id,
1496            options: self.options.clone(),
1497            preferred_owner: self.preferred_owner,
1498            initial_next_block_height: self.initial_next_block_height,
1499            initial_block_hash: self.initial_block_hash,
1500            timing_sender: self.timing_sender.clone(),
1501        }
1502    }
1503}
1504
/// Error type for [`ChainClient`].
///
/// Most variants wrap an error from a lower layer (local node, remote node, storage views,
/// serialization) via `#[from]`, so that `?` can be used directly in client methods.
#[derive(Debug, Error)]
pub enum ChainClientError {
    /// An operation on the local node failed.
    #[error("Local node operation failed: {0}")]
    LocalNodeError(#[from] LocalNodeError),

    /// An operation on a remote (validator) node failed.
    #[error("Remote node operation failed: {0}")]
    RemoteNodeError(#[from] NodeError),

    /// An arithmetic operation failed.
    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    /// The certificates with the given hashes could not be read.
    #[error("Missing certificates: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),

    /// The confirmed block with the given hash is missing.
    #[error("Missing confirmed block: {0:?}")]
    MissingConfirmedBlock(CryptoHash),

    /// JSON serialization or deserialization failed.
    #[error("JSON (de)serialization error: {0}")]
    JsonError(#[from] serde_json::Error),

    /// An operation on the chain failed.
    #[error("Chain operation failed: {0}")]
    ChainError(#[from] ChainError),

    /// Communication with the validators failed.
    #[error(transparent)]
    CommunicationError(#[from] CommunicationError<NodeError>),

    /// An internal invariant of the chain client was violated, e.g. the local node's block
    /// hash does not match the one recorded in the wallet.
    #[error("Internal error within chain client: {0}")]
    InternalError(&'static str),

    /// A certificate refers to a committee that is not known locally yet.
    #[error(
        "Cannot accept a certificate from an unknown committee in the future. \
         Please synchronize the local view of the admin chain"
    )]
    CommitteeSynchronizationError,

    /// The local node's next block height is lower than the one recorded in the wallet.
    #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
    WalletSynchronizationError,

    /// The client's state does not allow the block to be proposed.
    #[error("The state of the client is incompatible with the proposed block: {0}")]
    BlockProposalError(&'static str),

    /// A certificate refers to a committee that has been retired.
    #[error(
        "Cannot accept a certificate from a committee that was retired. \
         Try a newer certificate from the same origin"
    )]
    CommitteeDeprecationError,

    /// The protocol was not followed as expected.
    #[error("Protocol error within chain client: {0}")]
    ProtocolError(&'static str),

    /// The preferred owner is an owner of the chain, but the signer does not hold its key.
    #[error("Signer doesn't have key to sign for chain {0}")]
    CannotFindKeyForChain(ChainId),

    /// No preferred owner is set, so this client cannot propose blocks on the chain.
    #[error("client is not configured to propose on chain {0}")]
    NoAccountKeyConfigured(ChainId),

    /// The preferred owner is neither an owner nor the current leader of the chain.
    #[error("The chain client isn't owner on chain {0}")]
    NotAnOwner(ChainId),

    /// A storage view operation failed.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// The certificates needed to reach the target height could not be downloaded.
    #[error(
        "Failed to download certificates and update local node to the next height \
         {target_next_block_height} of chain {chain_id:?}"
    )]
    CannotDownloadCertificates {
        chain_id: ChainId,
        target_next_block_height: BlockHeight,
    },

    /// BCS serialization or deserialization failed.
    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    /// A quorum of validators voted for a different block or round than expected.
    #[error(
        "Unexpected quorum: validators voted for block {hash} in {round}, \
         expected block {expected_hash} in {expected_round}"
    )]
    UnexpectedQuorum {
        hash: CryptoHash,
        round: Round,
        expected_hash: CryptoHash,
        expected_round: Round,
    },

    /// The signer reported an error. See [`ChainClientError::signer_failure`].
    #[error("signer error: {0:?}")]
    Signer(#[source] Box<dyn signer::Error>),

    /// The given epoch is the current one and therefore cannot be revoked.
    #[error("Cannot revoke the current epoch {0}")]
    CannotRevokeCurrentEpoch(Epoch),

    /// The epoch has already been revoked.
    #[error("Epoch is already revoked")]
    EpochAlreadyRevoked,
}
1600
/// Lets `?` be applied to `Result<_, Infallible>` values in functions that return
/// [`ChainClientError`].
impl From<Infallible> for ChainClientError {
    fn from(infallible: Infallible) -> Self {
        // `Infallible` has no values, so the empty match is exhaustive and never executes.
        match infallible {}
    }
}
1606
impl ChainClientError {
    /// Wraps a signer error into [`ChainClientError::Signer`], boxing it as a
    /// `dyn signer::Error` trait object.
    ///
    /// Convenient as a `map_err` argument on results returned by a [`Signer`].
    pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
        Self::Signer(Box::new(err))
    }
}
1612
1613impl<Env: Environment> ChainClient<Env> {
1614    /// Gets the client mutex from the chain's state.
1615    #[instrument(level = "trace", skip(self))]
1616    fn client_mutex(&self) -> Arc<tokio::sync::Mutex<()>> {
1617        self.client
1618            .chains
1619            .pin()
1620            .get(&self.chain_id)
1621            .expect("Chain client constructed for invalid chain")
1622            .client_mutex()
1623    }
1624
1625    /// Gets the next pending block.
1626    #[instrument(level = "trace", skip(self))]
1627    pub fn pending_proposal(&self) -> Option<PendingProposal> {
1628        self.client
1629            .chains
1630            .pin()
1631            .get(&self.chain_id)
1632            .expect("Chain client constructed for invalid chain")
1633            .pending_proposal()
1634            .clone()
1635    }
1636
1637    /// Updates the chain's state using a closure.
1638    #[instrument(level = "trace", skip(self, f))]
1639    fn update_state<F>(&self, f: F)
1640    where
1641        F: Fn(&mut ChainClientState),
1642    {
1643        let chains = self.client.chains.pin();
1644        chains
1645            .update(self.chain_id, |state| {
1646                let mut state = state.clone_for_update_unchecked();
1647                f(&mut state);
1648                state
1649            })
1650            .expect("Chain client constructed for invalid chain");
1651    }
1652
    /// Gets a reference to the client's signer instance.
    ///
    /// Delegates to the shared [`Client`].
    #[instrument(level = "trace", skip(self))]
    pub fn signer(&self) -> &impl Signer {
        self.client.signer()
    }
1658
    /// Gets a mutable reference to the per-`ChainClient` options.
    ///
    /// Changes only affect this `ChainClient` instance, since the options are stored in it
    /// by value.
    #[instrument(level = "trace", skip(self))]
    pub fn options_mut(&mut self) -> &mut ChainClientOptions {
        &mut self.options
    }
1664
    /// Gets a reference to the per-`ChainClient` options.
    ///
    /// These are the options stored in this instance, not shared with other handles.
    #[instrument(level = "trace", skip(self))]
    pub fn options(&self) -> &ChainClientOptions {
        &self.options
    }
1670
    /// Gets the ID of the associated chain, i.e. the chain this client operates.
    #[instrument(level = "trace", skip(self))]
    pub fn chain_id(&self) -> ChainId {
        self.chain_id
    }
1676
    /// Gets a clone of the timing sender for benchmarking.
    ///
    /// Returns `None` if no timing sender was configured for this client.
    pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
        self.timing_sender.clone()
    }
1681
    /// Gets the ID of the admin chain, as recorded in the shared [`Client`].
    #[instrument(level = "trace", skip(self))]
    pub fn admin_id(&self) -> ChainId {
        self.client.admin_id
    }
1687
    /// Gets the currently preferred owner for signing the blocks.
    ///
    /// Returns `None` if no preferred owner is set.
    #[instrument(level = "trace", skip(self))]
    pub fn preferred_owner(&self) -> Option<AccountOwner> {
        self.preferred_owner
    }
1693
    /// Sets the new, preferred owner for signing the blocks.
    ///
    /// This only updates the field on this `ChainClient`; no ownership or key check is
    /// performed here.
    #[instrument(level = "trace", skip(self))]
    pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
        self.preferred_owner = Some(preferred_owner);
    }
1699
    /// Unsets the preferred owner for signing the blocks.
    ///
    /// Afterwards [`Self::preferred_owner`] returns `None`.
    #[instrument(level = "trace", skip(self))]
    pub fn unset_preferred_owner(&mut self) {
        self.preferred_owner = None;
    }
1705
    /// Obtains a `ChainStateView` for this client's chain.
    ///
    /// The view is returned behind a read guard obtained from the local node.
    #[instrument(level = "trace")]
    pub async fn chain_state_view(
        &self,
    ) -> Result<OwnedRwLockReadGuard<ChainStateView<Env::StorageContext>>, LocalNodeError> {
        self.client.local_node.chain_state_view(self.chain_id).await
    }
1713
1714    /// Returns chain IDs that this chain subscribes to.
1715    #[instrument(level = "trace", skip(self))]
1716    pub async fn event_stream_publishers(
1717        &self,
1718    ) -> Result<BTreeMap<ChainId, BTreeSet<StreamId>>, LocalNodeError> {
1719        let mut publishers = self
1720            .chain_state_view()
1721            .await?
1722            .execution_state
1723            .system
1724            .event_subscriptions
1725            .indices()
1726            .await?
1727            .into_iter()
1728            .fold(
1729                BTreeMap::<ChainId, BTreeSet<StreamId>>::new(),
1730                |mut map, (chain_id, stream_id)| {
1731                    map.entry(chain_id).or_default().insert(stream_id);
1732                    map
1733                },
1734            );
1735        if self.chain_id != self.client.admin_id {
1736            publishers.insert(
1737                self.client.admin_id,
1738                vec![
1739                    StreamId::system(EPOCH_STREAM_NAME),
1740                    StreamId::system(REMOVED_EPOCH_STREAM_NAME),
1741                ]
1742                .into_iter()
1743                .collect(),
1744            );
1745        }
1746        Ok(publishers)
1747    }
1748
    /// Subscribes to notifications from this client's chain.
    ///
    /// Shorthand for [`Self::subscribe_to`] with this client's own chain ID.
    #[instrument(level = "trace")]
    pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
        self.subscribe_to(self.chain_id)
    }
1754
1755    /// Subscribes to notifications from the specified chain.
1756    #[instrument(level = "trace")]
1757    pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
1758        Ok(Box::pin(UnboundedReceiverStream::new(
1759            self.client.notifier.subscribe(vec![chain_id]),
1760        )))
1761    }
1762
    /// Returns the storage client used by this client's local node.
    ///
    /// Delegates to the shared [`Client`].
    #[instrument(level = "trace")]
    pub fn storage_client(&self) -> &Env::Storage {
        self.client.storage_client()
    }
1768
1769    /// Obtains the basic `ChainInfo` data for the local chain.
1770    #[instrument(level = "trace")]
1771    pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
1772        let query = ChainInfoQuery::new(self.chain_id);
1773        let response = self
1774            .client
1775            .local_node
1776            .handle_chain_info_query(query)
1777            .await?;
1778        self.client.update_from_info(&response.info);
1779        Ok(response.info)
1780    }
1781
1782    /// Obtains the basic `ChainInfo` data for the local chain, with chain manager values.
1783    #[instrument(level = "trace")]
1784    async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
1785        let query = ChainInfoQuery::new(self.chain_id).with_manager_values();
1786        let response = self
1787            .client
1788            .local_node
1789            .handle_chain_info_query(query)
1790            .await?;
1791        self.client.update_from_info(&response.info);
1792        Ok(response.info)
1793    }
1794
    /// Returns the chain's description. Fetches it from the validators if necessary.
    ///
    /// Delegates to the shared [`Client`] with this client's chain ID.
    pub async fn get_chain_description(&self) -> Result<ChainDescription, ChainClientError> {
        self.client.get_chain_description(self.chain_id).await
    }
1799
1800    /// Obtains up to `self.options.max_pending_message_bundles` pending message bundles for the
1801    /// local chain.
1802    #[instrument(level = "trace")]
1803    async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, ChainClientError> {
1804        if self.options.message_policy.is_ignore() {
1805            // Ignore all messages.
1806            return Ok(Vec::new());
1807        }
1808
1809        let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
1810        let info = self
1811            .client
1812            .local_node
1813            .handle_chain_info_query(query)
1814            .await?
1815            .info;
1816        {
1817            ensure!(
1818                self.has_other_owners(&info.manager.ownership)
1819                    || info.next_block_height >= self.initial_next_block_height,
1820                ChainClientError::WalletSynchronizationError
1821            );
1822        }
1823
1824        Ok(info
1825            .requested_pending_message_bundles
1826            .into_iter()
1827            .filter_map(|mut bundle| {
1828                self.options
1829                    .message_policy
1830                    .must_handle(&mut bundle)
1831                    .then_some(bundle)
1832            })
1833            .take(self.options.max_pending_message_bundles)
1834            .collect())
1835    }
1836
1837    /// Returns an `UpdateStreams` operation that updates this client's chain about new events
1838    /// in any of the streams its applications are subscribing to. Returns `None` if there are no
1839    /// new events.
1840    #[instrument(level = "trace")]
1841    async fn collect_stream_updates(&self) -> Result<Option<Operation>, ChainClientError> {
1842        // Load all our subscriptions.
1843        let subscription_map = self
1844            .chain_state_view()
1845            .await?
1846            .execution_state
1847            .system
1848            .event_subscriptions
1849            .index_values()
1850            .await?;
1851        // Collect the indices of all new events.
1852        let futures = subscription_map
1853            .into_iter()
1854            .map(|((chain_id, stream_id), subscriptions)| {
1855                let client = self.client.clone();
1856                async move {
1857                    let chain = client.local_node.chain_state_view(chain_id).await?;
1858                    if let Some(next_expected_index) = chain
1859                        .next_expected_events
1860                        .get(&stream_id)
1861                        .await?
1862                        .filter(|next_index| *next_index > subscriptions.next_index)
1863                    {
1864                        Ok(Some((chain_id, stream_id, next_expected_index)))
1865                    } else {
1866                        Ok::<_, ChainClientError>(None)
1867                    }
1868                }
1869            });
1870        let updates = future::try_join_all(futures)
1871            .await?
1872            .into_iter()
1873            .flatten()
1874            .collect::<Vec<_>>();
1875        if updates.is_empty() {
1876            return Ok(None);
1877        }
1878        Ok(Some(SystemOperation::UpdateStreams(updates).into()))
1879    }
1880
    /// Obtains the `ChainInfo` of the local chain including its committees.
    ///
    /// Delegates to the shared [`Client`] with this client's chain ID.
    #[instrument(level = "trace")]
    async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
        self.client.chain_info_with_committees(self.chain_id).await
    }
1885
1886    /// Obtains the current epoch of the local chain as well as its set of trusted committees.
1887    #[instrument(level = "trace")]
1888    async fn epoch_and_committees(
1889        &self,
1890    ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
1891        let info = self.chain_info_with_committees().await?;
1892        let epoch = info.epoch;
1893        let committees = info.into_committees()?;
1894        Ok((epoch, committees))
1895    }
1896
1897    /// Obtains the committee for the current epoch of the local chain.
1898    #[instrument(level = "trace")]
1899    pub async fn local_committee(&self) -> Result<Committee, ChainClientError> {
1900        let info = match self.chain_info_with_committees().await {
1901            Ok(info) => info,
1902            Err(LocalNodeError::BlobsNotFound(_)) => {
1903                self.synchronize_chain_state(self.chain_id).await?;
1904                self.chain_info_with_committees().await?
1905            }
1906            Err(err) => return Err(err.into()),
1907        };
1908        Ok(info.into_current_committee()?)
1909    }
1910
    /// Obtains the committee for the latest epoch on the admin chain.
    ///
    /// Returns the epoch together with its committee; delegates to the shared [`Client`].
    #[instrument(level = "trace")]
    pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
        self.client.admin_committee().await
    }
1916
1917    /// Obtains the identity of the current owner of the chain.
1918    ///
1919    /// Returns an error if we don't have the private key for the identity.
1920    #[instrument(level = "trace")]
1921    pub async fn identity(&self) -> Result<AccountOwner, ChainClientError> {
1922        let Some(preferred_owner) = self.preferred_owner else {
1923            return Err(ChainClientError::NoAccountKeyConfigured(self.chain_id));
1924        };
1925        let manager = self.chain_info().await?.manager;
1926        ensure!(
1927            manager.ownership.is_active(),
1928            LocalNodeError::InactiveChain(self.chain_id)
1929        );
1930
1931        let is_owner = manager
1932            .ownership
1933            .all_owners()
1934            .chain(&manager.leader)
1935            .any(|owner| *owner == preferred_owner);
1936
1937        if !is_owner {
1938            let accepted_owners = manager
1939                .ownership
1940                .all_owners()
1941                .chain(&manager.leader)
1942                .collect::<Vec<_>>();
1943            warn!(%self.chain_id, ?accepted_owners, ?preferred_owner,
1944                "Chain has multiple owners configured but none is preferred owner",
1945            );
1946            return Err(ChainClientError::NotAnOwner(self.chain_id));
1947        }
1948
1949        let has_signer = self
1950            .signer()
1951            .contains_key(&preferred_owner)
1952            .await
1953            .map_err(ChainClientError::signer_failure)?;
1954
1955        if !has_signer {
1956            warn!(%self.chain_id, ?preferred_owner,
1957                "Chain is one of the owners but its Signer instance doesn't contain the key",
1958            );
1959            return Err(ChainClientError::CannotFindKeyForChain(self.chain_id));
1960        }
1961
1962        Ok(preferred_owner)
1963    }
1964
1965    /// Prepares the chain for the next operation, i.e. makes sure we have synchronized it up to
1966    /// its current height and are not missing any received messages from the inbox.
1967    #[instrument(level = "trace")]
1968    pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, ChainClientError> {
1969        #[cfg(with_metrics)]
1970        let _latency = metrics::PREPARE_CHAIN_LATENCY.measure_latency();
1971
1972        let mut info = self.synchronize_to_known_height().await?;
1973
1974        if self.has_other_owners(&info.manager.ownership) {
1975            // For chains with any owner other than ourselves, we could be missing recent
1976            // certificates created by other owners. Further synchronize blocks from the network.
1977            // This is a best-effort that depends on network conditions.
1978            info = self.client.synchronize_chain_state(self.chain_id).await?;
1979        }
1980
1981        if info.epoch > self.client.admin_committees().await?.0 {
1982            self.client
1983                .synchronize_chain_state(self.client.admin_id)
1984                .await?;
1985        }
1986
1987        let result = self
1988            .chain_state_view()
1989            .await?
1990            .validate_incoming_bundles()
1991            .await;
1992        if matches!(result, Err(ChainError::MissingCrossChainUpdate { .. })) {
1993            self.find_received_certificates().await?;
1994        }
1995        self.client.update_from_info(&info);
1996        Ok(info)
1997    }
1998
1999    // Verifies that our local storage contains enough history compared to the
2000    // known block height. Otherwise, downloads the missing history from the
2001    // network.
2002    // The known height only differs if the wallet is ahead of storage.
2003    async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, ChainClientError> {
2004        let info = self
2005            .client
2006            .download_certificates(self.chain_id, self.initial_next_block_height)
2007            .await?;
2008        if info.next_block_height == self.initial_next_block_height {
2009            // Check that our local node has the expected block hash.
2010            ensure!(
2011                self.initial_block_hash == info.block_hash,
2012                ChainClientError::InternalError("Invalid chain of blocks in local node")
2013            );
2014        }
2015        Ok(info)
2016    }
2017
2018    /// Submits a fast block proposal to the validators.
2019    ///
2020    /// This must only be used with valid epoch and super owner.
2021    #[instrument(level = "trace", skip(committee, operations))]
2022    pub async fn submit_fast_block_proposal(
2023        &self,
2024        committee: &Committee,
2025        operations: &[Operation],
2026        incoming_bundles: &[IncomingBundle],
2027        super_owner: AccountOwner,
2028    ) -> Result<(u64, u64, u64, u64), ChainClientError> {
2029        let creating_proposal_start = Instant::now();
2030        let info = self.chain_info().await?;
2031        let timestamp = self.next_timestamp(incoming_bundles, info.timestamp);
2032        let transactions = incoming_bundles
2033            .iter()
2034            .map(|bundle| Transaction::ReceiveMessages(bundle.clone()))
2035            .chain(
2036                operations
2037                    .iter()
2038                    .map(|operation| Transaction::ExecuteOperation(operation.clone())),
2039            )
2040            .collect::<Vec<_>>();
2041        let proposed_block = ProposedBlock {
2042            epoch: info.epoch,
2043            chain_id: self.chain_id,
2044            transactions,
2045            previous_block_hash: info.block_hash,
2046            height: info.next_block_height,
2047            authenticated_signer: Some(super_owner),
2048            timestamp,
2049        };
2050        let proposal = Box::new(
2051            BlockProposal::new_initial(
2052                super_owner,
2053                Round::Fast,
2054                proposed_block.clone(),
2055                self.signer(),
2056            )
2057            .await
2058            .map_err(ChainClientError::signer_failure)?,
2059        );
2060        let creating_proposal_ms = creating_proposal_start.elapsed().as_millis() as u64;
2061        let stage_block_execution_start = Instant::now();
2062        let block = self
2063            .client
2064            .local_node
2065            .stage_block_execution(proposed_block, None, Vec::new())
2066            .await?
2067            .0;
2068        let stage_block_execution_ms = stage_block_execution_start.elapsed().as_millis() as u64;
2069        let creating_confirmed_block_start = Instant::now();
2070        let value = ConfirmedBlock::new(block);
2071        let creating_confirmed_block_ms =
2072            creating_confirmed_block_start.elapsed().as_millis() as u64;
2073        let submitting_block_proposal_start = Instant::now();
2074        self.client
2075            .submit_block_proposal(committee, proposal, value)
2076            .await?;
2077        let submitting_block_proposal_ms =
2078            submitting_block_proposal_start.elapsed().as_millis() as u64;
2079        Ok((
2080            creating_proposal_ms,
2081            stage_block_execution_ms,
2082            creating_confirmed_block_ms,
2083            submitting_block_proposal_ms,
2084        ))
2085    }
2086
2087    /// Attempts to update all validators about the local chain.
2088    #[instrument(level = "trace", skip(old_committee))]
2089    pub async fn update_validators(
2090        &self,
2091        old_committee: Option<&Committee>,
2092    ) -> Result<(), ChainClientError> {
2093        let update_validators_start = linera_base::time::Instant::now();
2094        // Communicate the new certificate now.
2095        if let Some(old_committee) = old_committee {
2096            self.communicate_chain_updates(old_committee).await?
2097        };
2098        if let Ok(new_committee) = self.local_committee().await {
2099            if Some(&new_committee) != old_committee {
2100                // If the configuration just changed, communicate to the new committee as well.
2101                // (This is actually more important that updating the previous committee.)
2102                self.communicate_chain_updates(&new_committee).await?;
2103            }
2104        }
2105        self.send_timing(update_validators_start, TimingType::UpdateValidators);
2106        Ok(())
2107    }
2108
2109    /// Broadcasts certified blocks to validators.
2110    #[instrument(level = "trace", skip(committee))]
2111    pub async fn communicate_chain_updates(
2112        &self,
2113        committee: &Committee,
2114    ) -> Result<(), ChainClientError> {
2115        let delivery = self.options.cross_chain_message_delivery;
2116        let height = self.chain_info().await?.next_block_height;
2117        self.client
2118            .communicate_chain_updates(committee, self.chain_id, height, delivery)
2119            .await
2120    }
2121
2122    /// Processes the results of [`synchronize_received_certificates_from_validator`] and updates
2123    /// the trackers for the validators.
2124    #[tracing::instrument(level = "trace", skip(received_certificates_batches))]
2125    async fn receive_certificates_from_validators(
2126        &self,
2127        received_certificates_batches: Vec<ReceivedCertificatesFromValidator>,
2128    ) {
2129        let validator_count = received_certificates_batches.len();
2130        let mut other_sender_chains = BTreeSet::new();
2131        let mut certificates =
2132            BTreeMap::<ChainId, BTreeMap<BlockHeight, ConfirmedBlockCertificate>>::new();
2133        let mut new_trackers = BTreeMap::new();
2134        for response in received_certificates_batches {
2135            other_sender_chains.extend(response.other_sender_chains);
2136            new_trackers.insert(response.public_key, response.tracker);
2137            for certificate in response.certificates {
2138                certificates
2139                    .entry(certificate.block().header.chain_id)
2140                    .or_default()
2141                    .insert(certificate.block().header.height, certificate);
2142            }
2143        }
2144        let certificate_count = certificates.values().map(BTreeMap::len).sum::<usize>();
2145
2146        tracing::info!(
2147            "Received {certificate_count} certificates from {validator_count} validator(s)."
2148        );
2149
2150        // Process the certificates sorted by chain and in ascending order of block height.
2151        let stream = FuturesUnordered::from_iter(certificates.into_values().map(|certificates| {
2152            let client = self.client.clone();
2153            async move {
2154                for certificate in certificates.into_values() {
2155                    let hash = certificate.hash();
2156                    let mode = ReceiveCertificateMode::AlreadyChecked;
2157                    if let Err(err) = client
2158                        .receive_sender_certificate(certificate, mode, None)
2159                        .await
2160                    {
2161                        error!("Received invalid certificate {hash}: {err}");
2162                    }
2163                }
2164            }
2165        }));
2166        stream.for_each(future::ready).await;
2167
2168        // Certificates for these chains were omitted from `certificates` because they were
2169        // already processed locally. If they were processed in a concurrent task, it is not
2170        // guaranteed that their cross-chain messages were already handled.
2171        let stream = FuturesUnordered::from_iter(other_sender_chains.into_iter().map(|chain_id| {
2172            let local_node = self.client.local_node.clone();
2173            async move {
2174                if let Err(error) = local_node
2175                    .retry_pending_cross_chain_requests(chain_id)
2176                    .await
2177                {
2178                    error!("Failed to retry outgoing messages from {chain_id}: {error}");
2179                }
2180            }
2181        }));
2182        stream.for_each(future::ready).await;
2183
2184        // Update the trackers.
2185        if let Err(error) = self
2186            .client
2187            .local_node
2188            .update_received_certificate_trackers(self.chain_id, new_trackers)
2189            .await
2190        {
2191            error!(
2192                "Failed to update the certificate trackers for chain {:.8}: {error}",
2193                self.chain_id
2194            );
2195        }
2196    }
2197
2198    /// Synchronizes all chains that any application on this chain subscribes to.
2199    /// We always consider the admin chain a relevant publishing chain, for new epochs.
2200    async fn synchronize_publisher_chains(&self) -> Result<(), ChainClientError> {
2201        let chain_ids = self
2202            .chain_state_view()
2203            .await?
2204            .execution_state
2205            .system
2206            .event_subscriptions
2207            .indices()
2208            .await?
2209            .iter()
2210            .map(|(chain_id, _)| *chain_id)
2211            .chain(iter::once(self.client.admin_id))
2212            .filter(|chain_id| *chain_id != self.chain_id)
2213            .collect::<BTreeSet<_>>();
2214        future::try_join_all(
2215            chain_ids
2216                .into_iter()
2217                .map(|chain_id| self.client.synchronize_chain_state(chain_id)),
2218        )
2219        .await?;
2220        Ok(())
2221    }
2222
2223    /// Attempts to download new received certificates.
2224    ///
2225    /// This is a best effort: it will only find certificates that have been confirmed
2226    /// amongst sufficiently many validators of the current committee of the target
2227    /// chain.
2228    ///
2229    /// However, this should be the case whenever a sender's chain is still in use and
2230    /// is regularly upgraded to new committees.
2231    #[instrument(level = "trace")]
2232    async fn find_received_certificates(&self) -> Result<(), ChainClientError> {
2233        #[cfg(with_metrics)]
2234        let _latency = metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
2235
2236        // Use network information from the local chain.
2237        let chain_id = self.chain_id;
2238        let (_, committee) = self.admin_committee().await?;
2239        let nodes = self.client.make_nodes(&committee)?;
2240        // Proceed to downloading received certificates.
2241        let result = communicate_with_quorum(
2242            &nodes,
2243            &committee,
2244            |_| (),
2245            |remote_node| {
2246                let client = &self.client;
2247                Box::pin(async move {
2248                    client
2249                        .synchronize_received_certificates_from_validator(chain_id, &remote_node)
2250                        .await
2251                })
2252            },
2253            self.options.grace_period,
2254        )
2255        .await;
2256        let received_certificate_batches = match result {
2257            Ok(((), received_certificate_batches)) => received_certificate_batches
2258                .into_iter()
2259                .map(|(_, batch)| batch)
2260                .collect(),
2261            Err(CommunicationError::Trusted(NodeError::InactiveChain(id))) if id == chain_id => {
2262                // The chain is visibly not active (yet or any more) so there is no need
2263                // to synchronize received certificates.
2264                return Ok(());
2265            }
2266            Err(error) => {
2267                return Err(error.into());
2268            }
2269        };
2270        self.receive_certificates_from_validators(received_certificate_batches)
2271            .await;
2272        Ok(())
2273    }
2274
2275    /// Sends money.
2276    #[instrument(level = "trace")]
2277    pub async fn transfer(
2278        &self,
2279        owner: AccountOwner,
2280        amount: Amount,
2281        recipient: Account,
2282    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2283        // TODO(#467): check the balance of `owner` before signing any block proposal.
2284        self.execute_operation(SystemOperation::Transfer {
2285            owner,
2286            recipient,
2287            amount,
2288        })
2289        .await
2290    }
2291
2292    /// Verify if a data blob is readable from storage.
2293    // TODO(#2490): Consider removing or renaming this.
2294    #[instrument(level = "trace")]
2295    pub async fn read_data_blob(
2296        &self,
2297        hash: CryptoHash,
2298    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2299        let blob_id = BlobId {
2300            hash,
2301            blob_type: BlobType::Data,
2302        };
2303        self.execute_operation(SystemOperation::VerifyBlob { blob_id })
2304            .await
2305    }
2306
2307    /// Claims money in a remote chain.
2308    #[instrument(level = "trace")]
2309    pub async fn claim(
2310        &self,
2311        owner: AccountOwner,
2312        target_id: ChainId,
2313        recipient: Account,
2314        amount: Amount,
2315    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2316        self.execute_operation(SystemOperation::Claim {
2317            owner,
2318            target_id,
2319            recipient,
2320            amount,
2321        })
2322        .await
2323    }
2324
2325    /// Requests a leader timeout vote from all validators. If a quorum signs it, creates a
2326    /// certificate and sends it to all validators, to make them enter the next round.
2327    #[instrument(level = "trace")]
2328    pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, ChainClientError> {
2329        let chain_id = self.chain_id;
2330        let info = self.chain_info_with_committees().await?;
2331        let committee = info.current_committee()?;
2332        let height = info.next_block_height;
2333        let round = info.manager.current_round;
2334        let action = CommunicateAction::RequestTimeout {
2335            height,
2336            round,
2337            chain_id,
2338        };
2339        let value = Timeout::new(chain_id, height, info.epoch);
2340        let certificate = Box::new(
2341            self.client
2342                .communicate_chain_action(committee, action, value)
2343                .await?,
2344        );
2345        self.client.process_certificate(certificate.clone()).await?;
2346        // The block height didn't increase, but this will communicate the timeout as well.
2347        self.client
2348            .communicate_chain_updates(
2349                committee,
2350                chain_id,
2351                height,
2352                CrossChainMessageDelivery::NonBlocking,
2353            )
2354            .await?;
2355        Ok(*certificate)
2356    }
2357
2358    /// Downloads and processes any certificates we are missing for the given chain.
2359    #[instrument(level = "trace", skip_all)]
2360    pub async fn synchronize_chain_state(
2361        &self,
2362        chain_id: ChainId,
2363    ) -> Result<Box<ChainInfo>, ChainClientError> {
2364        self.client.synchronize_chain_state(chain_id).await
2365    }
2366
2367    /// Downloads and processes any certificates we are missing for this chain, from the given
2368    /// committee.
2369    #[instrument(level = "trace", skip_all)]
2370    pub async fn synchronize_chain_state_from_committee(
2371        &self,
2372        committee: Committee,
2373    ) -> Result<Box<ChainInfo>, ChainClientError> {
2374        self.client
2375            .synchronize_chain_state_from_committee(self.chain_id, committee)
2376            .await
2377    }
2378
2379    /// Executes a list of operations.
2380    #[instrument(level = "trace", skip(operations, blobs))]
2381    pub async fn execute_operations(
2382        &self,
2383        operations: Vec<Operation>,
2384        blobs: Vec<Blob>,
2385    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2386        let timing_start = linera_base::time::Instant::now();
2387
2388        let result = loop {
2389            let execute_block_start = linera_base::time::Instant::now();
2390            // TODO(#2066): Remove boxing once the call-stack is shallower
2391            match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
2392                Ok(ExecuteBlockOutcome::Executed(certificate)) => {
2393                    self.send_timing(execute_block_start, TimingType::ExecuteBlock);
2394                    break Ok(ClientOutcome::Committed(certificate));
2395                }
2396                Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
2397                    break Ok(ClientOutcome::WaitForTimeout(timeout));
2398                }
2399                Ok(ExecuteBlockOutcome::Conflict(certificate)) => {
2400                    info!(
2401                        height = %certificate.block().header.height,
2402                        "Another block was committed; retrying."
2403                    );
2404                }
2405                Err(ChainClientError::CommunicationError(CommunicationError::Trusted(
2406                    NodeError::UnexpectedBlockHeight {
2407                        expected_block_height,
2408                        found_block_height,
2409                    },
2410                ))) if expected_block_height > found_block_height => {
2411                    tracing::info!(
2412                        "Local state is outdated; synchronizing chain {:.8}",
2413                        self.chain_id
2414                    );
2415                    self.synchronize_chain_state(self.chain_id).await?;
2416                }
2417                Err(err) => return Err(err),
2418            };
2419        };
2420
2421        self.send_timing(timing_start, TimingType::ExecuteOperations);
2422
2423        result
2424    }
2425
2426    /// Executes an operation.
2427    pub async fn execute_operation(
2428        &self,
2429        operation: impl Into<Operation>,
2430    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2431        self.execute_operations(vec![operation.into()], vec![])
2432            .await
2433    }
2434
2435    /// Executes a new block.
2436    ///
2437    /// This must be preceded by a call to `prepare_chain()`.
2438    #[instrument(level = "trace", skip(operations, blobs))]
2439    async fn execute_block(
2440        &self,
2441        operations: Vec<Operation>,
2442        blobs: Vec<Blob>,
2443    ) -> Result<ExecuteBlockOutcome, ChainClientError> {
2444        #[cfg(with_metrics)]
2445        let _latency = metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
2446
2447        let mutex = self.client_mutex();
2448        let _guard = mutex.lock_owned().await;
2449        // TOOD: We shouldn't need to call this explicitly.
2450        match self.process_pending_block_without_prepare().await? {
2451            ClientOutcome::Committed(Some(certificate)) => {
2452                return Ok(ExecuteBlockOutcome::Conflict(certificate))
2453            }
2454            ClientOutcome::WaitForTimeout(timeout) => {
2455                return Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
2456            }
2457            ClientOutcome::Committed(None) => {}
2458        }
2459
2460        let incoming_bundles = self.pending_message_bundles().await?;
2461        let identity = self.identity().await?;
2462        let confirmed_value = self
2463            .new_pending_block(incoming_bundles, operations, blobs, identity)
2464            .await?;
2465
2466        match self.process_pending_block_without_prepare().await? {
2467            ClientOutcome::Committed(Some(certificate))
2468                if certificate.block() == confirmed_value.block() =>
2469            {
2470                Ok(ExecuteBlockOutcome::Executed(certificate))
2471            }
2472            ClientOutcome::Committed(Some(certificate)) => {
2473                Ok(ExecuteBlockOutcome::Conflict(certificate))
2474            }
2475            // Should be unreachable: We did set a pending block.
2476            ClientOutcome::Committed(None) => Err(ChainClientError::BlockProposalError(
2477                "Unexpected block proposal error",
2478            )),
2479            ClientOutcome::WaitForTimeout(timeout) => {
2480                Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
2481            }
2482        }
2483    }
2484
2485    /// Creates a new pending block and handles the proposal in the local node.
2486    /// Next time `process_pending_block_without_prepare` is called, this block will be proposed
2487    /// to the validators.
2488    #[instrument(level = "trace", skip(incoming_bundles, operations, blobs))]
2489    async fn new_pending_block(
2490        &self,
2491        incoming_bundles: Vec<IncomingBundle>,
2492        operations: Vec<Operation>,
2493        blobs: Vec<Blob>,
2494        identity: AccountOwner,
2495    ) -> Result<ConfirmedBlock, ChainClientError> {
2496        ensure!(
2497            self.pending_proposal().is_none(),
2498            ChainClientError::BlockProposalError(
2499                "Client state already has a pending block; \
2500                use the `linera retry-pending-block` command to commit that first"
2501            )
2502        );
2503        let info = self.chain_info().await?;
2504        let timestamp = self.next_timestamp(&incoming_bundles, info.timestamp);
2505        let transactions = incoming_bundles
2506            .into_iter()
2507            .map(Transaction::ReceiveMessages)
2508            .chain(operations.into_iter().map(Transaction::ExecuteOperation))
2509            .collect::<Vec<_>>();
2510        let proposed_block = ProposedBlock {
2511            epoch: info.epoch,
2512            chain_id: self.chain_id,
2513            transactions,
2514            previous_block_hash: info.block_hash,
2515            height: info.next_block_height,
2516            authenticated_signer: Some(identity),
2517            timestamp,
2518        };
2519
2520        // Use the round number assuming there are oracle responses.
2521        // Using the round number during execution counts as an oracle.
2522        // Accessing the round number in single-leader rounds where we are not the leader
2523        // is not currently supported.
2524        let round = match Self::round_for_new_proposal(&info, &identity, true)? {
2525            Either::Left(round) => round.multi_leader(),
2526            Either::Right(_) => None,
2527        };
2528        // Make sure every incoming message succeeds and otherwise remove them.
2529        // Also, compute the final certified hash while we're at it.
2530        let (block, _) = self
2531            .client
2532            .stage_block_execution_and_discard_failing_messages(
2533                proposed_block,
2534                round,
2535                blobs.clone(),
2536            )
2537            .await?;
2538        let (proposed_block, _) = block.clone().into_proposal();
2539        self.update_state(|state| {
2540            state.set_pending_proposal(proposed_block.clone(), blobs.clone())
2541        });
2542        Ok(ConfirmedBlock::new(block))
2543    }
2544
2545    /// Returns a suitable timestamp for the next block.
2546    ///
2547    /// This will usually be the current time according to the local clock, but may be slightly
2548    /// ahead to make sure it's not earlier than the incoming messages or the previous block.
2549    #[instrument(level = "trace", skip(incoming_bundles))]
2550    fn next_timestamp(
2551        &self,
2552        incoming_bundles: &[IncomingBundle],
2553        block_time: Timestamp,
2554    ) -> Timestamp {
2555        let local_time = self.storage_client().clock().current_time();
2556        incoming_bundles
2557            .iter()
2558            .map(|msg| msg.bundle.timestamp)
2559            .max()
2560            .map_or(local_time, |timestamp| timestamp.max(local_time))
2561            .max(block_time)
2562    }
2563
2564    /// Queries an application.
2565    #[instrument(level = "trace", skip(query))]
2566    pub async fn query_application(&self, query: Query) -> Result<QueryOutcome, ChainClientError> {
2567        loop {
2568            let result = self
2569                .client
2570                .local_node
2571                .query_application(self.chain_id, query.clone())
2572                .await;
2573            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
2574                self.client
2575                    .receive_certificates_for_blobs(blob_ids.clone())
2576                    .await?;
2577                continue; // We found the missing blob: retry.
2578            }
2579            return Ok(result?);
2580        }
2581    }
2582
2583    /// Queries a system application.
2584    #[instrument(level = "trace", skip(query))]
2585    pub async fn query_system_application(
2586        &self,
2587        query: SystemQuery,
2588    ) -> Result<QueryOutcome<SystemResponse>, ChainClientError> {
2589        let QueryOutcome {
2590            response,
2591            operations,
2592        } = self.query_application(Query::System(query)).await?;
2593        match response {
2594            QueryResponse::System(response) => Ok(QueryOutcome {
2595                response,
2596                operations,
2597            }),
2598            _ => Err(ChainClientError::InternalError(
2599                "Unexpected response for system query",
2600            )),
2601        }
2602    }
2603
2604    /// Queries a user application.
2605    #[instrument(level = "trace", skip(application_id, query))]
2606    pub async fn query_user_application<A: Abi>(
2607        &self,
2608        application_id: ApplicationId<A>,
2609        query: &A::Query,
2610    ) -> Result<QueryOutcome<A::QueryResponse>, ChainClientError> {
2611        let query = Query::user(application_id, query)?;
2612        let QueryOutcome {
2613            response,
2614            operations,
2615        } = self.query_application(query).await?;
2616        match response {
2617            QueryResponse::User(response_bytes) => {
2618                let response = serde_json::from_slice(&response_bytes)?;
2619                Ok(QueryOutcome {
2620                    response,
2621                    operations,
2622                })
2623            }
2624            _ => Err(ChainClientError::InternalError(
2625                "Unexpected response for user query",
2626            )),
2627        }
2628    }
2629
2630    /// Obtains the local balance of the chain account after staging the execution of
2631    /// incoming messages in a new block.
2632    ///
2633    /// Does not attempt to synchronize with validators. The result will reflect up to
2634    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
2635    /// block.
2636    #[instrument(level = "trace")]
2637    pub async fn query_balance(&self) -> Result<Amount, ChainClientError> {
2638        let (balance, _) = self.query_balances_with_owner(AccountOwner::CHAIN).await?;
2639        Ok(balance)
2640    }
2641
2642    /// Obtains the local balance of an account after staging the execution of incoming messages in
2643    /// a new block.
2644    ///
2645    /// Does not attempt to synchronize with validators. The result will reflect up to
2646    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
2647    /// block.
2648    #[instrument(level = "trace", skip(owner))]
2649    pub async fn query_owner_balance(
2650        &self,
2651        owner: AccountOwner,
2652    ) -> Result<Amount, ChainClientError> {
2653        if owner.is_chain() {
2654            self.query_balance().await
2655        } else {
2656            Ok(self
2657                .query_balances_with_owner(owner)
2658                .await?
2659                .1
2660                .unwrap_or(Amount::ZERO))
2661        }
2662    }
2663
2664    /// Obtains the local balance of an account and optionally another user after staging the
2665    /// execution of incoming messages in a new block.
2666    ///
2667    /// Does not attempt to synchronize with validators. The result will reflect up to
2668    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
2669    /// block.
2670    #[instrument(level = "trace", skip(owner))]
2671    async fn query_balances_with_owner(
2672        &self,
2673        owner: AccountOwner,
2674    ) -> Result<(Amount, Option<Amount>), ChainClientError> {
2675        let incoming_bundles = self.pending_message_bundles().await?;
2676        // Since we disallow empty blocks, and there is no incoming messages,
2677        // that could change it, we query for the balance immediately.
2678        if incoming_bundles.is_empty() {
2679            let chain_balance = self.local_balance().await?;
2680            let owner_balance = self.local_owner_balance(owner).await?;
2681            return Ok((chain_balance, Some(owner_balance)));
2682        }
2683        let info = self.chain_info().await?;
2684        let timestamp = self.next_timestamp(&incoming_bundles, info.timestamp);
2685        let transactions = incoming_bundles
2686            .into_iter()
2687            .map(Transaction::ReceiveMessages)
2688            .collect::<Vec<_>>();
2689        let block = ProposedBlock {
2690            epoch: info.epoch,
2691            chain_id: self.chain_id,
2692            transactions,
2693            previous_block_hash: info.block_hash,
2694            height: info.next_block_height,
2695            authenticated_signer: if owner == AccountOwner::CHAIN {
2696                None
2697            } else {
2698                Some(owner)
2699            },
2700            timestamp,
2701        };
2702        match self
2703            .client
2704            .stage_block_execution_and_discard_failing_messages(block, None, Vec::new())
2705            .await
2706        {
2707            Ok((_, response)) => Ok((
2708                response.info.chain_balance,
2709                response.info.requested_owner_balance,
2710            )),
2711            Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
2712                WorkerError::ChainError(error),
2713            ))) if matches!(
2714                &*error,
2715                ChainError::ExecutionError(
2716                    execution_error,
2717                    ChainExecutionContext::Block
2718                ) if matches!(
2719                    **execution_error,
2720                    ExecutionError::FeesExceedFunding { .. }
2721                )
2722            ) =>
2723            {
2724                // We can't even pay for the execution of one empty block. Let's return zero.
2725                Ok((Amount::ZERO, Some(Amount::ZERO)))
2726            }
2727            Err(error) => Err(error),
2728        }
2729    }
2730
2731    /// Reads the local balance of the chain account.
2732    ///
2733    /// Does not process the inbox or attempt to synchronize with validators.
2734    #[instrument(level = "trace")]
2735    pub async fn local_balance(&self) -> Result<Amount, ChainClientError> {
2736        let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
2737        Ok(balance)
2738    }
2739
2740    /// Reads the local balance of a user account.
2741    ///
2742    /// Does not process the inbox or attempt to synchronize with validators.
2743    #[instrument(level = "trace", skip(owner))]
2744    pub async fn local_owner_balance(
2745        &self,
2746        owner: AccountOwner,
2747    ) -> Result<Amount, ChainClientError> {
2748        if owner.is_chain() {
2749            self.local_balance().await
2750        } else {
2751            Ok(self
2752                .local_balances_with_owner(owner)
2753                .await?
2754                .1
2755                .unwrap_or(Amount::ZERO))
2756        }
2757    }
2758
2759    /// Reads the local balance of the chain account and optionally another user.
2760    ///
2761    /// Does not process the inbox or attempt to synchronize with validators.
2762    #[instrument(level = "trace", skip(owner))]
2763    async fn local_balances_with_owner(
2764        &self,
2765        owner: AccountOwner,
2766    ) -> Result<(Amount, Option<Amount>), ChainClientError> {
2767        ensure!(
2768            self.chain_info().await?.next_block_height >= self.initial_next_block_height,
2769            ChainClientError::WalletSynchronizationError
2770        );
2771        let mut query = ChainInfoQuery::new(self.chain_id);
2772        query.request_owner_balance = owner;
2773        let response = self
2774            .client
2775            .local_node
2776            .handle_chain_info_query(query)
2777            .await?;
2778        Ok((
2779            response.info.chain_balance,
2780            response.info.requested_owner_balance,
2781        ))
2782    }
2783
2784    /// Sends tokens to a chain.
2785    #[instrument(level = "trace")]
2786    pub async fn transfer_to_account(
2787        &self,
2788        from: AccountOwner,
2789        amount: Amount,
2790        account: Account,
2791    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2792        self.transfer(from, amount, account).await
2793    }
2794
2795    /// Burns tokens (transfer to a special address).
2796    #[cfg(with_testing)]
2797    #[instrument(level = "trace")]
2798    pub async fn burn(
2799        &self,
2800        owner: AccountOwner,
2801        amount: Amount,
2802    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
2803        let recipient = Account::burn_address(self.chain_id);
2804        self.transfer(owner, amount, recipient).await
2805    }
2806
2807    /// Attempts to synchronize chains that have sent us messages and populate our local
2808    /// inbox.
2809    ///
2810    /// To create a block that actually executes the messages in the inbox,
2811    /// `process_inbox` must be called separately.
2812    #[instrument(level = "trace")]
2813    pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, ChainClientError> {
2814        let info = self.prepare_chain().await?;
2815        self.synchronize_publisher_chains().await?;
2816        self.find_received_certificates().await?;
2817        Ok(info)
2818    }
2819
2820    /// Processes the last pending block
2821    #[instrument(level = "trace")]
2822    pub async fn process_pending_block(
2823        &self,
2824    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
2825        self.synchronize_from_validators().await?;
2826        self.process_pending_block_without_prepare().await
2827    }
2828
2829    /// Processes the last pending block. Assumes that the local chain is up to date.
2830    #[instrument(level = "trace")]
2831    async fn process_pending_block_without_prepare(
2832        &self,
2833    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
2834        let info = self.request_leader_timeout_if_needed().await?;
2835
2836        // If there is a validated block in the current round, finalize it.
2837        if info.manager.has_locking_block_in_current_round()
2838            && !info.manager.current_round.is_fast()
2839        {
2840            return self.finalize_locking_block(info).await;
2841        }
2842        let owner = self.identity().await?;
2843
2844        let local_node = &self.client.local_node;
2845        // Otherwise we have to re-propose the highest validated block, if there is one.
2846        let pending_proposal = self.pending_proposal();
2847        let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
2848            match &**locking {
2849                LockingBlock::Regular(certificate) => {
2850                    let blob_ids = certificate.block().required_blob_ids();
2851                    let blobs = local_node
2852                        .get_locking_blobs(&blob_ids, self.chain_id)
2853                        .await?
2854                        .ok_or_else(|| {
2855                            ChainClientError::InternalError("Missing local locking blobs")
2856                        })?;
2857                    debug!("Retrying locking block from round {}", certificate.round);
2858                    (certificate.block().clone(), blobs)
2859                }
2860                LockingBlock::Fast(proposal) => {
2861                    let proposed_block = proposal.content.block.clone();
2862                    let blob_ids = proposed_block.published_blob_ids();
2863                    let blobs = local_node
2864                        .get_locking_blobs(&blob_ids, self.chain_id)
2865                        .await?
2866                        .ok_or_else(|| {
2867                            ChainClientError::InternalError("Missing local locking blobs")
2868                        })?;
2869                    let block = self
2870                        .client
2871                        .stage_block_execution(proposed_block, None, blobs.clone())
2872                        .await?
2873                        .0;
2874                    debug!("Retrying locking block from fast round.");
2875                    (block, blobs)
2876                }
2877            }
2878        } else if let Some(pending_proposal) = pending_proposal {
2879            // Otherwise we are free to propose our own pending block.
2880            // Use the round number assuming there are oracle responses.
2881            // Using the round number during execution counts as an oracle.
2882            let proposed_block = pending_proposal.block;
2883            let round = match Self::round_for_new_proposal(&info, &owner, true)? {
2884                Either::Left(round) => round.multi_leader(),
2885                Either::Right(_) => None,
2886            };
2887            let (block, _) = self
2888                .client
2889                .stage_block_execution(proposed_block, round, pending_proposal.blobs.clone())
2890                .await?;
2891            debug!("Proposing the local pending block.");
2892            (block, pending_proposal.blobs)
2893        } else {
2894            return Ok(ClientOutcome::Committed(None)); // Nothing to do.
2895        };
2896
2897        let has_oracle_responses = block.has_oracle_responses();
2898        let (proposed_block, outcome) = block.into_proposal();
2899        let round = match Self::round_for_new_proposal(&info, &owner, has_oracle_responses)? {
2900            Either::Left(round) => round,
2901            Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
2902        };
2903        debug!("Proposing block for round {}", round);
2904
2905        let already_handled_locally = info
2906            .manager
2907            .already_handled_proposal(round, &proposed_block);
2908        // Create the final block proposal.
2909        let proposal = if let Some(locking) = info.manager.requested_locking {
2910            Box::new(match *locking {
2911                LockingBlock::Regular(cert) => {
2912                    BlockProposal::new_retry_regular(owner, round, cert, self.signer())
2913                        .await
2914                        .map_err(ChainClientError::signer_failure)?
2915                }
2916                LockingBlock::Fast(proposal) => {
2917                    BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
2918                        .await
2919                        .map_err(ChainClientError::signer_failure)?
2920                }
2921            })
2922        } else {
2923            Box::new(
2924                BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
2925                    .await
2926                    .map_err(ChainClientError::signer_failure)?,
2927            )
2928        };
2929        if !already_handled_locally {
2930            // Check the final block proposal. This will be cheaper after #1401.
2931            if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
2932                match err {
2933                    LocalNodeError::BlobsNotFound(_) => {
2934                        local_node
2935                            .handle_pending_blobs(self.chain_id, blobs)
2936                            .await?;
2937                        local_node.handle_block_proposal(*proposal.clone()).await?;
2938                    }
2939                    err => return Err(err.into()),
2940                }
2941            }
2942        }
2943        let committee = self.local_committee().await?;
2944        let block = Block::new(proposed_block, outcome);
2945        // Send the query to validators.
2946        let submit_block_proposal_start = linera_base::time::Instant::now();
2947        let certificate = if round.is_fast() {
2948            let hashed_value = ConfirmedBlock::new(block);
2949            self.client
2950                .submit_block_proposal(&committee, proposal, hashed_value)
2951                .await?
2952        } else {
2953            let hashed_value = ValidatedBlock::new(block);
2954            let certificate = self
2955                .client
2956                .submit_block_proposal(&committee, proposal, hashed_value.clone())
2957                .await?;
2958            self.client.finalize_block(&committee, certificate).await?
2959        };
2960        self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
2961        debug!(round = %certificate.round, "Sending confirmed block to validators");
2962        self.update_validators(Some(&committee)).await?;
2963        Ok(ClientOutcome::Committed(Some(certificate)))
2964    }
2965
2966    fn send_timing(&self, start: Instant, timing_type: TimingType) {
2967        let Some(sender) = &self.timing_sender else {
2968            return;
2969        };
2970        if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
2971            tracing::warn!(%err, "Failed to send timing info");
2972        }
2973    }
2974
2975    /// Requests a leader timeout certificate if the current round has timed out. Returns the
2976    /// chain info for the (possibly new) current round.
2977    async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, ChainClientError> {
2978        let mut info = self.chain_info_with_manager_values().await?;
2979        // If the current round has timed out, we request a timeout certificate and retry in
2980        // the next round.
2981        if let Some(round_timeout) = info.manager.round_timeout {
2982            if round_timeout <= self.storage_client().clock().current_time() {
2983                self.request_leader_timeout().await?;
2984                info = self.chain_info_with_manager_values().await?;
2985            }
2986        }
2987        Ok(info)
2988    }
2989
2990    /// Finalizes the locking block.
2991    ///
2992    /// Panics if there is no locking block; fails if the locking block is not in the current round.
2993    async fn finalize_locking_block(
2994        &self,
2995        info: Box<ChainInfo>,
2996    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
2997        let locking = info
2998            .manager
2999            .requested_locking
3000            .expect("Should have a locking block");
3001        let LockingBlock::Regular(certificate) = *locking else {
3002            panic!("Should have a locking validated block");
3003        };
3004        debug!(
3005            round = %certificate.round,
3006            "Finalizing locking block"
3007        );
3008        let committee = self.local_committee().await?;
3009        match self
3010            .client
3011            .finalize_block(&committee, certificate.clone())
3012            .await
3013        {
3014            Ok(certificate) => {
3015                self.update_validators(Some(&committee)).await?;
3016                Ok(ClientOutcome::Committed(Some(certificate)))
3017            }
3018            Err(ChainClientError::CommunicationError(error)) => {
3019                // Communication errors in this case often mean that someone else already
3020                // finalized the block or started another round.
3021                let timestamp = info.manager.round_timeout.ok_or(error)?;
3022                Ok(ClientOutcome::WaitForTimeout(RoundTimeout {
3023                    timestamp,
3024                    current_round: info.manager.current_round,
3025                    next_block_height: info.next_block_height,
3026                }))
3027            }
3028            Err(error) => Err(error),
3029        }
3030    }
3031
3032    /// Returns a round in which we can propose a new block or the given one, if possible.
3033    fn round_for_new_proposal(
3034        info: &ChainInfo,
3035        identity: &AccountOwner,
3036        has_oracle_responses: bool,
3037    ) -> Result<Either<Round, RoundTimeout>, ChainClientError> {
3038        let manager = &info.manager;
3039        // If there is a conflicting proposal in the current round, we can only propose if the
3040        // next round can be started without a timeout, i.e. if we are in a multi-leader round.
3041        // Similarly, we cannot propose a block that uses oracles in the fast round.
3042        let conflict = manager
3043            .requested_signed_proposal
3044            .as_ref()
3045            .into_iter()
3046            .chain(&manager.requested_proposed)
3047            .any(|proposal| proposal.content.round == manager.current_round)
3048            || (manager.current_round.is_fast() && has_oracle_responses);
3049        let round = if !conflict {
3050            manager.current_round
3051        } else if let Some(round) = manager
3052            .ownership
3053            .next_round(manager.current_round)
3054            .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
3055        {
3056            round
3057        } else if let Some(timeout) = info.round_timeout() {
3058            return Ok(Either::Right(timeout));
3059        } else {
3060            return Err(ChainClientError::BlockProposalError(
3061                "Conflicting proposal in the current round",
3062            ));
3063        };
3064        if manager.can_propose(identity, round) {
3065            return Ok(Either::Left(round));
3066        }
3067        if let Some(timeout) = info.round_timeout() {
3068            return Ok(Either::Right(timeout));
3069        }
3070        Err(ChainClientError::BlockProposalError(
3071            "Not a leader in the current round",
3072        ))
3073    }
3074
/// Forgets the pending proposal stored in the client state, i.e. the information on any
/// operation that previously failed. Only available in testing builds.
#[cfg(with_testing)]
#[instrument(level = "trace")]
pub fn clear_pending_proposal(&self) {
    self.update_state(|client_state| client_state.clear_pending_proposal());
}
3081
3082    /// Processes a confirmed block for which this chain is a recipient and updates validators.
3083    #[instrument(
3084        level = "trace",
3085        skip(certificate),
3086        fields(certificate_hash = ?certificate.hash()),
3087    )]
3088    pub async fn receive_certificate_and_update_validators(
3089        &self,
3090        certificate: ConfirmedBlockCertificate,
3091    ) -> Result<(), ChainClientError> {
3092        self.client
3093            .receive_certificate_and_update_validators(
3094                certificate,
3095                ReceiveCertificateMode::NeedsCheck,
3096            )
3097            .await
3098    }
3099
3100    /// Rotates the key of the chain.
3101    ///
3102    /// Replaces current owners of the chain with the new key pair.
3103    #[instrument(level = "trace")]
3104    pub async fn rotate_key_pair(
3105        &self,
3106        public_key: AccountPublicKey,
3107    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3108        self.transfer_ownership(public_key.into()).await
3109    }
3110
3111    /// Transfers ownership of the chain to a single super owner.
3112    #[instrument(level = "trace")]
3113    pub async fn transfer_ownership(
3114        &self,
3115        new_owner: AccountOwner,
3116    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3117        self.execute_operation(SystemOperation::ChangeOwnership {
3118            super_owners: vec![new_owner],
3119            owners: Vec::new(),
3120            multi_leader_rounds: 2,
3121            open_multi_leader_rounds: false,
3122            timeout_config: TimeoutConfig::default(),
3123        })
3124        .await
3125    }
3126
3127    /// Adds another owner to the chain, and turns existing super owners into regular owners.
3128    #[instrument(level = "trace")]
3129    pub async fn share_ownership(
3130        &self,
3131        new_owner: AccountOwner,
3132        new_weight: u64,
3133    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3134        loop {
3135            let ownership = self.prepare_chain().await?.manager.ownership;
3136            ensure!(
3137                ownership.is_active(),
3138                ChainError::InactiveChain(self.chain_id)
3139            );
3140            let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
3141            owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
3142            owners.push((new_owner, new_weight));
3143            let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
3144                super_owners: Vec::new(),
3145                owners,
3146                multi_leader_rounds: ownership.multi_leader_rounds,
3147                open_multi_leader_rounds: ownership.open_multi_leader_rounds,
3148                timeout_config: ownership.timeout_config,
3149            })];
3150            match self.execute_block(operations, vec![]).await? {
3151                ExecuteBlockOutcome::Executed(certificate) => {
3152                    return Ok(ClientOutcome::Committed(certificate));
3153                }
3154                ExecuteBlockOutcome::Conflict(certificate) => {
3155                    info!(
3156                        height = %certificate.block().header.height,
3157                        "Another block was committed; retrying."
3158                    );
3159                }
3160                ExecuteBlockOutcome::WaitForTimeout(timeout) => {
3161                    return Ok(ClientOutcome::WaitForTimeout(timeout));
3162                }
3163            };
3164        }
3165    }
3166
3167    /// Changes the ownership of this chain. Fails if it would remove existing owners, unless
3168    /// `remove_owners` is `true`.
3169    #[instrument(level = "trace")]
3170    pub async fn change_ownership(
3171        &self,
3172        ownership: ChainOwnership,
3173    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3174        self.execute_operation(SystemOperation::ChangeOwnership {
3175            super_owners: ownership.super_owners.into_iter().collect(),
3176            owners: ownership.owners.into_iter().collect(),
3177            multi_leader_rounds: ownership.multi_leader_rounds,
3178            open_multi_leader_rounds: ownership.open_multi_leader_rounds,
3179            timeout_config: ownership.timeout_config.clone(),
3180        })
3181        .await
3182    }
3183
3184    /// Changes the application permissions configuration on this chain.
3185    #[instrument(level = "trace", skip(application_permissions))]
3186    pub async fn change_application_permissions(
3187        &self,
3188        application_permissions: ApplicationPermissions,
3189    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3190        self.execute_operation(SystemOperation::ChangeApplicationPermissions(
3191            application_permissions,
3192        ))
3193        .await
3194    }
3195
3196    /// Opens a new chain with a derived UID.
3197    #[instrument(level = "trace", skip(self))]
3198    pub async fn open_chain(
3199        &self,
3200        ownership: ChainOwnership,
3201        application_permissions: ApplicationPermissions,
3202        balance: Amount,
3203    ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, ChainClientError>
3204    {
3205        loop {
3206            let config = OpenChainConfig {
3207                ownership: ownership.clone(),
3208                balance,
3209                application_permissions: application_permissions.clone(),
3210            };
3211            let operation = Operation::system(SystemOperation::OpenChain(config));
3212            let certificate = match self.execute_block(vec![operation], vec![]).await? {
3213                ExecuteBlockOutcome::Executed(certificate) => certificate,
3214                ExecuteBlockOutcome::Conflict(_) => continue,
3215                ExecuteBlockOutcome::WaitForTimeout(timeout) => {
3216                    return Ok(ClientOutcome::WaitForTimeout(timeout));
3217                }
3218            };
3219            // The only operation, i.e. the last transaction, created the new chain.
3220            let chain_blob = certificate
3221                .block()
3222                .body
3223                .blobs
3224                .last()
3225                .and_then(|blobs| blobs.last())
3226                .ok_or_else(|| ChainClientError::InternalError("Failed to create a new chain"))?;
3227            let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
3228            // Add the new chain to the list of tracked chains
3229            self.client.track_chain(description.id());
3230            self.client
3231                .local_node
3232                .retry_pending_cross_chain_requests(self.chain_id)
3233                .await?;
3234            return Ok(ClientOutcome::Committed((description, certificate)));
3235        }
3236    }
3237
3238    /// Closes the chain (and loses everything in it!!).
3239    /// Returns `None` if the chain was already closed.
3240    #[instrument(level = "trace")]
3241    pub async fn close_chain(
3242        &self,
3243    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, ChainClientError> {
3244        match self.execute_operation(SystemOperation::CloseChain).await {
3245            Ok(outcome) => Ok(outcome.map(Some)),
3246            Err(ChainClientError::LocalNodeError(LocalNodeError::WorkerError(
3247                WorkerError::ChainError(chain_error),
3248            ))) if matches!(*chain_error, ChainError::ClosedChain) => {
3249                Ok(ClientOutcome::Committed(None)) // Chain is already closed.
3250            }
3251            Err(error) => Err(error),
3252        }
3253    }
3254
3255    /// Publishes some module.
3256    #[cfg(not(target_arch = "wasm32"))]
3257    #[instrument(level = "trace", skip(contract, service))]
3258    pub async fn publish_module(
3259        &self,
3260        contract: Bytecode,
3261        service: Bytecode,
3262        vm_runtime: VmRuntime,
3263    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, ChainClientError> {
3264        let (blobs, module_id) = create_bytecode_blobs(contract, service, vm_runtime).await;
3265        self.publish_module_blobs(blobs, module_id).await
3266    }
3267
3268    /// Publishes some module.
3269    #[cfg(not(target_arch = "wasm32"))]
3270    #[instrument(level = "trace", skip(blobs, module_id))]
3271    pub async fn publish_module_blobs(
3272        &self,
3273        blobs: Vec<Blob>,
3274        module_id: ModuleId,
3275    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, ChainClientError> {
3276        self.execute_operations(
3277            vec![Operation::system(SystemOperation::PublishModule {
3278                module_id,
3279            })],
3280            blobs,
3281        )
3282        .await?
3283        .try_map(|certificate| Ok((module_id, certificate)))
3284    }
3285
3286    /// Publishes some data blobs.
3287    #[instrument(level = "trace", skip(bytes))]
3288    pub async fn publish_data_blobs(
3289        &self,
3290        bytes: Vec<Vec<u8>>,
3291    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3292        let blobs = bytes.into_iter().map(Blob::new_data);
3293        let publish_blob_operations = blobs
3294            .clone()
3295            .map(|blob| {
3296                Operation::system(SystemOperation::PublishDataBlob {
3297                    blob_hash: blob.id().hash,
3298                })
3299            })
3300            .collect();
3301        self.execute_operations(publish_blob_operations, blobs.collect())
3302            .await
3303    }
3304
3305    /// Publishes some data blob.
3306    #[instrument(level = "trace", skip(bytes))]
3307    pub async fn publish_data_blob(
3308        &self,
3309        bytes: Vec<u8>,
3310    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3311        self.publish_data_blobs(vec![bytes]).await
3312    }
3313
3314    /// Creates an application by instantiating some bytecode.
3315    #[instrument(
3316        level = "trace",
3317        skip(self, parameters, instantiation_argument, required_application_ids)
3318    )]
3319    pub async fn create_application<
3320        A: Abi,
3321        Parameters: Serialize,
3322        InstantiationArgument: Serialize,
3323    >(
3324        &self,
3325        module_id: ModuleId<A, Parameters, InstantiationArgument>,
3326        parameters: &Parameters,
3327        instantiation_argument: &InstantiationArgument,
3328        required_application_ids: Vec<ApplicationId>,
3329    ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, ChainClientError>
3330    {
3331        let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
3332        let parameters = serde_json::to_vec(parameters)?;
3333        Ok(self
3334            .create_application_untyped(
3335                module_id.forget_abi(),
3336                parameters,
3337                instantiation_argument,
3338                required_application_ids,
3339            )
3340            .await?
3341            .map(|(app_id, cert)| (app_id.with_abi(), cert)))
3342    }
3343
3344    /// Creates an application by instantiating some bytecode.
3345    #[instrument(
3346        level = "trace",
3347        skip(
3348            self,
3349            module_id,
3350            parameters,
3351            instantiation_argument,
3352            required_application_ids
3353        )
3354    )]
3355    pub async fn create_application_untyped(
3356        &self,
3357        module_id: ModuleId,
3358        parameters: Vec<u8>,
3359        instantiation_argument: Vec<u8>,
3360        required_application_ids: Vec<ApplicationId>,
3361    ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, ChainClientError> {
3362        self.execute_operation(SystemOperation::CreateApplication {
3363            module_id,
3364            parameters,
3365            instantiation_argument,
3366            required_application_ids,
3367        })
3368        .await?
3369        .try_map(|certificate| {
3370            // The first message of the only operation created the application.
3371            let mut creation: Vec<_> = certificate
3372                .block()
3373                .created_blob_ids()
3374                .into_iter()
3375                .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
3376                .collect();
3377            if creation.len() > 1 {
3378                return Err(ChainClientError::InternalError(
3379                    "Unexpected number of application descriptions published",
3380                ));
3381            }
3382            let blob_id = creation.pop().ok_or(ChainClientError::InternalError(
3383                "ApplicationDescription blob not found.",
3384            ))?;
3385            let id = ApplicationId::new(blob_id.hash);
3386            Ok((id, certificate))
3387        })
3388    }
3389
3390    /// Creates a new committee and starts using it (admin chains only).
3391    #[instrument(level = "trace", skip(committee))]
3392    pub async fn stage_new_committee(
3393        &self,
3394        committee: Committee,
3395    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3396        let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
3397        let blob_hash = blob.id().hash;
3398        match self
3399            .execute_operations(
3400                vec![Operation::system(SystemOperation::Admin(
3401                    AdminOperation::PublishCommitteeBlob { blob_hash },
3402                ))],
3403                vec![blob],
3404            )
3405            .await?
3406        {
3407            ClientOutcome::Committed(_) => {}
3408            outcome @ ClientOutcome::WaitForTimeout(_) => return Ok(outcome),
3409        }
3410        let epoch = self.chain_info().await?.epoch.try_add_one()?;
3411        self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
3412            epoch,
3413            blob_hash,
3414        }))
3415        .await
3416    }
3417
3418    /// Synchronizes the chain with the validators and creates blocks without any operations to
3419    /// process all incoming messages. This may require several blocks.
3420    ///
3421    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
3422    /// is returned, too.
3423    #[instrument(level = "trace")]
3424    pub async fn process_inbox(
3425        &self,
3426    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), ChainClientError> {
3427        self.prepare_chain().await?;
3428        self.process_inbox_without_prepare().await
3429    }
3430
3431    /// Creates blocks without any operations to process all incoming messages. This may require
3432    /// several blocks.
3433    ///
3434    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
3435    /// is returned, too.
3436    #[instrument(level = "trace")]
3437    pub async fn process_inbox_without_prepare(
3438        &self,
3439    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), ChainClientError> {
3440        #[cfg(with_metrics)]
3441        let _latency = metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
3442
3443        let mut epoch_change_ops = self.collect_epoch_changes().await?.into_iter();
3444
3445        let mut certificates = Vec::new();
3446        loop {
3447            let incoming_bundles = self.pending_message_bundles().await?;
3448            let stream_updates = self.collect_stream_updates().await?;
3449            let block_operations = stream_updates
3450                .into_iter()
3451                .chain(epoch_change_ops.next())
3452                .collect::<Vec<_>>();
3453            if incoming_bundles.is_empty() && block_operations.is_empty() {
3454                return Ok((certificates, None));
3455            }
3456            match self.execute_block(block_operations, vec![]).await {
3457                Ok(ExecuteBlockOutcome::Executed(certificate))
3458                | Ok(ExecuteBlockOutcome::Conflict(certificate)) => certificates.push(certificate),
3459                Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
3460                    return Ok((certificates, Some(timeout)));
3461                }
3462                Err(error) => return Err(error),
3463            };
3464        }
3465    }
3466
3467    /// Returns operations to process all pending epoch changes: first the new epochs, in order,
3468    /// then the removed epochs, in order.
3469    async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, ChainClientError> {
3470        let (mut min_epoch, mut next_epoch) = {
3471            let (epoch, committees) = self.epoch_and_committees().await?;
3472            let min_epoch = *committees.keys().next().unwrap_or(&Epoch::ZERO);
3473            (min_epoch, epoch.try_add_one()?)
3474        };
3475        let mut epoch_change_ops = Vec::new();
3476        while self
3477            .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
3478            .await?
3479        {
3480            epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
3481                next_epoch,
3482            )));
3483            next_epoch.try_add_assign_one()?;
3484        }
3485        while self
3486            .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
3487            .await?
3488        {
3489            epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
3490                min_epoch,
3491            )));
3492            min_epoch.try_add_assign_one()?;
3493        }
3494        Ok(epoch_change_ops)
3495    }
3496
3497    /// Returns whether the system event on the admin chain with the given stream name and key
3498    /// exists in storage.
3499    async fn has_admin_event(
3500        &self,
3501        stream_name: &[u8],
3502        index: u32,
3503    ) -> Result<bool, ChainClientError> {
3504        let event_id = EventId {
3505            chain_id: self.client.admin_id,
3506            stream_id: StreamId::system(stream_name),
3507            index,
3508        };
3509        Ok(self
3510            .client
3511            .storage_client()
3512            .read_event(event_id)
3513            .await?
3514            .is_some())
3515    }
3516
3517    /// Returns the indices and events from the storage
3518    pub async fn events_from_index(
3519        &self,
3520        stream_id: StreamId,
3521        start_index: u32,
3522    ) -> Result<Vec<IndexAndEvent>, ChainClientError> {
3523        Ok(self
3524            .client
3525            .storage_client()
3526            .read_events_from_index(&self.chain_id, &stream_id, start_index)
3527            .await?)
3528    }
3529
3530    /// Deprecates all the configurations of voting rights up to the given one (admin chains
3531    /// only). Currently, each individual chain is still entitled to wait before accepting
3532    /// this command. However, it is expected that deprecated validators stop functioning
3533    /// shortly after such command is issued.
3534    #[instrument(level = "trace")]
3535    pub async fn revoke_epochs(
3536        &self,
3537        revoked_epoch: Epoch,
3538    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3539        self.prepare_chain().await?;
3540        let (current_epoch, committees) = self.epoch_and_committees().await?;
3541        ensure!(
3542            revoked_epoch < current_epoch,
3543            ChainClientError::CannotRevokeCurrentEpoch(current_epoch)
3544        );
3545        ensure!(
3546            committees.contains_key(&revoked_epoch),
3547            ChainClientError::EpochAlreadyRevoked
3548        );
3549        let operations = committees
3550            .keys()
3551            .filter_map(|epoch| {
3552                if *epoch <= revoked_epoch {
3553                    Some(Operation::system(SystemOperation::Admin(
3554                        AdminOperation::RemoveCommittee { epoch: *epoch },
3555                    )))
3556                } else {
3557                    None
3558                }
3559            })
3560            .collect();
3561        self.execute_operations(operations, vec![]).await
3562    }
3563
3564    /// Sends money to a chain.
3565    /// Do not check balance. (This may block the client)
3566    /// Do not confirm the transaction.
3567    #[instrument(level = "trace")]
3568    pub async fn transfer_to_account_unsafe_unconfirmed(
3569        &self,
3570        owner: AccountOwner,
3571        amount: Amount,
3572        recipient: Account,
3573    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, ChainClientError> {
3574        self.execute_operation(SystemOperation::Transfer {
3575            owner,
3576            recipient,
3577            amount,
3578        })
3579        .await
3580    }
3581
3582    #[instrument(level = "trace", skip(hash))]
3583    pub async fn read_confirmed_block(
3584        &self,
3585        hash: CryptoHash,
3586    ) -> Result<ConfirmedBlock, ChainClientError> {
3587        let block = self
3588            .client
3589            .storage_client()
3590            .read_confirmed_block(hash)
3591            .await?;
3592        block.ok_or(ChainClientError::MissingConfirmedBlock(hash))
3593    }
3594
3595    /// Handles any cross-chain requests for any pending outgoing messages.
3596    #[instrument(level = "trace")]
3597    pub async fn retry_pending_outgoing_messages(&self) -> Result<(), ChainClientError> {
3598        self.client
3599            .local_node
3600            .retry_pending_cross_chain_requests(self.chain_id)
3601            .await?;
3602        Ok(())
3603    }
3604
3605    #[instrument(level = "trace", skip(local_node))]
3606    async fn local_chain_info(
3607        &self,
3608        chain_id: ChainId,
3609        local_node: &mut LocalNodeClient<Env::Storage>,
3610    ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
3611        match local_node.chain_info(chain_id).await {
3612            Ok(info) => {
3613                // Useful in case `chain_id` is the same as a local chain.
3614                self.client.update_from_info(&info);
3615                Ok(Some(info))
3616            }
3617            Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
3618            Err(err) => Err(err.into()),
3619        }
3620    }
3621
3622    #[instrument(level = "trace", skip(chain_id, local_node))]
3623    async fn local_next_block_height(
3624        &self,
3625        chain_id: ChainId,
3626        local_node: &mut LocalNodeClient<Env::Storage>,
3627    ) -> Result<BlockHeight, ChainClientError> {
3628        Ok(self
3629            .local_chain_info(chain_id, local_node)
3630            .await?
3631            .map_or(BlockHeight::ZERO, |info| info.next_block_height))
3632    }
3633
3634    /// Returns the next height we expect to receive from the given sender chain, according to the
3635    /// local inbox.
3636    #[instrument(level = "trace")]
3637    async fn local_next_height_to_receive(
3638        &self,
3639        origin: ChainId,
3640    ) -> Result<BlockHeight, ChainClientError> {
3641        let chain = self.chain_state_view().await?;
3642        Ok(match chain.inboxes.try_load_entry(&origin).await? {
3643            Some(inbox) => inbox.next_block_height_to_receive()?,
3644            None => BlockHeight::ZERO,
3645        })
3646    }
3647
3648    #[instrument(level = "trace", skip(remote_node, local_node, notification))]
3649    async fn process_notification(
3650        &self,
3651        remote_node: RemoteNode<Env::ValidatorNode>,
3652        mut local_node: LocalNodeClient<Env::Storage>,
3653        notification: Notification,
3654        listening_mode: &ListeningMode,
3655    ) -> Result<(), ChainClientError> {
3656        match notification.reason {
3657            Reason::NewIncomingBundle { origin, height } => {
3658                if self.local_next_height_to_receive(origin).await? > height {
3659                    debug!(
3660                        chain_id = %self.chain_id,
3661                        "Accepting redundant notification for new message"
3662                    );
3663                    return Ok(());
3664                }
3665                self.find_received_certificates_from_validator(remote_node)
3666                    .await?;
3667                if self.local_next_height_to_receive(origin).await? <= height {
3668                    warn!(
3669                        chain_id = %self.chain_id,
3670                        "NewIncomingBundle: Fail to synchronize new message after notification"
3671                    );
3672                }
3673            }
3674            Reason::NewBlock { height, .. } => {
3675                let chain_id = notification.chain_id;
3676                if self
3677                    .local_next_block_height(chain_id, &mut local_node)
3678                    .await?
3679                    > height
3680                {
3681                    debug!(
3682                        chain_id = %self.chain_id,
3683                        "Accepting redundant notification for new block"
3684                    );
3685                    return Ok(());
3686                }
3687                match listening_mode {
3688                    ListeningMode::FullChain => {
3689                        self.client
3690                            .synchronize_chain_state_from(&remote_node, chain_id)
3691                            .await?;
3692                        if self
3693                            .local_next_block_height(chain_id, &mut local_node)
3694                            .await?
3695                            <= height
3696                        {
3697                            error!("NewBlock: Fail to synchronize new block after notification");
3698                        }
3699                        trace!(
3700                            chain_id = %self.chain_id,
3701                            %height,
3702                            "NewBlock: processed notification",
3703                        );
3704                    }
3705                    ListeningMode::EventsOnly(_) => {
3706                        debug!(
3707                            chain_id = %self.chain_id,
3708                            %height,
3709                            "NewBlock: ignoring notification due to listening in EventsOnly mode"
3710                        );
3711                    }
3712                }
3713            }
3714            Reason::NewEvents {
3715                height,
3716                hash,
3717                event_streams,
3718            } => {
3719                if self
3720                    .local_next_block_height(notification.chain_id, &mut local_node)
3721                    .await?
3722                    > height
3723                {
3724                    debug!(
3725                        chain_id = %self.chain_id,
3726                        "Accepting redundant notification for new block"
3727                    );
3728                    return Ok(());
3729                }
3730                let should_process = match listening_mode {
3731                    ListeningMode::FullChain => true,
3732                    ListeningMode::EventsOnly(relevant_events) => relevant_events
3733                        .intersection(&event_streams)
3734                        .next()
3735                        .is_some(),
3736                };
3737                if !should_process {
3738                    debug!(
3739                        chain_id = %self.chain_id,
3740                        %height,
3741                        "NewEvents: got a notification, but no relevant event streams in it"
3742                    );
3743                    return Ok(());
3744                }
3745                trace!(
3746                    chain_id = %self.chain_id,
3747                    %height,
3748                    "NewEvents: processing notification"
3749                );
3750                let mut certificates = remote_node.node.download_certificates(vec![hash]).await?;
3751                // download_certificates ensures that we will get exactly one
3752                // certificate in the result
3753                let certificate = certificates
3754                    .pop()
3755                    .expect("download_certificates should have returned one certificate");
3756                self.client
3757                    .receive_sender_certificate(
3758                        certificate,
3759                        ReceiveCertificateMode::NeedsCheck,
3760                        None,
3761                    )
3762                    .await?;
3763            }
3764            Reason::NewRound { height, round } => {
3765                if matches!(listening_mode, ListeningMode::EventsOnly(_)) {
3766                    debug!("NewRound: ignoring a notification due to listening mode");
3767                    return Ok(());
3768                }
3769                let chain_id = notification.chain_id;
3770                if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? {
3771                    if (info.next_block_height, info.manager.current_round) >= (height, round) {
3772                        debug!(
3773                            chain_id = %self.chain_id,
3774                            "Accepting redundant notification for new round"
3775                        );
3776                        return Ok(());
3777                    }
3778                }
3779                self.client
3780                    .synchronize_chain_state_from(&remote_node, chain_id)
3781                    .await?;
3782                let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? else {
3783                    error!(
3784                        chain_id = %self.chain_id,
3785                        "NewRound: Fail to read local chain info for {chain_id}"
3786                    );
3787                    return Ok(());
3788                };
3789                if (info.next_block_height, info.manager.current_round) < (height, round) {
3790                    error!(
3791                        chain_id = %self.chain_id,
3792                        "NewRound: Fail to synchronize new block after notification"
3793                    );
3794                }
3795            }
3796        }
3797        Ok(())
3798    }
3799
3800    /// Returns whether this chain is tracked by the client, i.e. we are updating its inbox.
3801    pub fn is_tracked(&self) -> bool {
3802        self.client
3803            .tracked_chains
3804            .read()
3805            .unwrap()
3806            .contains(&self.chain_id)
3807    }
3808
3809    /// Spawns a task that listens to notifications about the current chain from all validators,
3810    /// and synchronizes the local state accordingly.
3811    #[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
3812    pub async fn listen(
3813        &self,
3814        listening_mode: ListeningMode,
3815    ) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), ChainClientError> {
3816        use future::FutureExt as _;
3817
3818        async fn await_while_polling<F: FusedFuture>(
3819            future: F,
3820            background_work: impl FusedStream<Item = ()>,
3821        ) -> F::Output {
3822            tokio::pin!(future);
3823            tokio::pin!(background_work);
3824            loop {
3825                futures::select! {
3826                    _ = background_work.next() => (),
3827                    result = future => return result,
3828                }
3829            }
3830        }
3831
3832        let mut senders = HashMap::new(); // Senders to cancel notification streams.
3833        let notifications = self.subscribe()?;
3834        let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);
3835
3836        // Beware: if this future ceases to make progress, notification processing will
3837        // deadlock, because of the issue described in
3838        // https://github.com/linera-io/linera-protocol/pull/1173.
3839
3840        // TODO(#2013): replace this lock with an asynchronous communication channel
3841
3842        let mut process_notifications = FuturesUnordered::new();
3843
3844        match self
3845            .update_notification_streams(&mut senders, &listening_mode)
3846            .await
3847        {
3848            Ok(handler) => process_notifications.push(handler),
3849            Err(error) => error!("Failed to update committee: {error}"),
3850        };
3851
3852        let this = self.clone();
3853        let update_streams = async move {
3854            let mut abortable_notifications = abortable_notifications.fuse();
3855
3856            while let Some(notification) =
3857                await_while_polling(abortable_notifications.next(), &mut process_notifications)
3858                    .await
3859            {
3860                if let Reason::NewBlock { .. } = notification.reason {
3861                    match Box::pin(await_while_polling(
3862                        this.update_notification_streams(&mut senders, &listening_mode)
3863                            .fuse(),
3864                        &mut process_notifications,
3865                    ))
3866                    .await
3867                    {
3868                        Ok(handler) => process_notifications.push(handler),
3869                        Err(error) => error!("Failed to update committee: {error}"),
3870                    }
3871                }
3872            }
3873
3874            for abort in senders.into_values() {
3875                abort.abort();
3876            }
3877
3878            let () = process_notifications.collect().await;
3879        }
3880        .in_current_span();
3881
3882        Ok((update_streams, AbortOnDrop(abort), notifications))
3883    }
3884
3885    #[instrument(level = "trace", skip(senders))]
3886    async fn update_notification_streams(
3887        &self,
3888        senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
3889        listening_mode: &ListeningMode,
3890    ) -> Result<impl Future<Output = ()>, ChainClientError> {
3891        let (nodes, local_node) = {
3892            let committee = self.local_committee().await?;
3893            let nodes: HashMap<_, _> = self
3894                .client
3895                .validator_node_provider()
3896                .make_nodes(&committee)?
3897                .collect();
3898            (nodes, self.client.local_node.clone())
3899        };
3900        // Drop removed validators.
3901        senders.retain(|validator, abort| {
3902            if !nodes.contains_key(validator) {
3903                abort.abort();
3904            }
3905            !abort.is_aborted()
3906        });
3907        // Add tasks for new validators.
3908        let validator_tasks = FuturesUnordered::new();
3909        for (public_key, node) in nodes {
3910            let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
3911                continue;
3912            };
3913            let this = self.clone();
3914            let stream = stream::once({
3915                let node = node.clone();
3916                async move {
3917                    let stream = node.subscribe(vec![this.chain_id]).await?;
3918                    // Only now the notification stream is established. We may have missed
3919                    // notifications since the last time we synchronized.
3920                    let remote_node = RemoteNode { public_key, node };
3921                    this.client
3922                        .synchronize_chain_state_from(&remote_node, this.chain_id)
3923                        .await?;
3924                    Ok::<_, ChainClientError>(stream)
3925                }
3926            })
3927            .filter_map(move |result| async move {
3928                if let Err(error) = &result {
3929                    warn!(?error, "Could not connect to validator {public_key}");
3930                } else {
3931                    info!("Connected to validator {public_key}");
3932                }
3933                result.ok()
3934            })
3935            .flatten();
3936            let (stream, abort) = stream::abortable(stream);
3937            let mut stream = Box::pin(stream);
3938            let this = self.clone();
3939            let local_node = local_node.clone();
3940            let remote_node = RemoteNode { public_key, node };
3941            let listening_mode_cloned = listening_mode.clone();
3942            validator_tasks.push(async move {
3943                while let Some(notification) = stream.next().await {
3944                    if let Err(err) = this
3945                        .process_notification(
3946                            remote_node.clone(),
3947                            local_node.clone(),
3948                            notification.clone(),
3949                            &listening_mode_cloned,
3950                        )
3951                        .await
3952                    {
3953                        tracing::warn!(
3954                            chain_id = %this.chain_id,
3955                            validator_public_key = ?remote_node.public_key,
3956                            ?notification,
3957                            "Failed to process notification: {err}",
3958                        );
3959                    }
3960                }
3961            });
3962            entry.insert(abort);
3963        }
3964        Ok(validator_tasks.collect())
3965    }
3966
3967    /// Attempts to download new received certificates from a particular validator.
3968    ///
3969    /// This is similar to `find_received_certificates` but for only one validator.
3970    /// We also don't try to synchronize the admin chain.
3971    #[instrument(level = "trace")]
3972    async fn find_received_certificates_from_validator(
3973        &self,
3974        remote_node: RemoteNode<Env::ValidatorNode>,
3975    ) -> Result<(), ChainClientError> {
3976        let chain_id = self.chain_id;
3977        // Proceed to downloading received certificates.
3978        let received_certificates = self
3979            .client
3980            .synchronize_received_certificates_from_validator(chain_id, &remote_node)
3981            .await?;
3982        // Process received certificates. If the client state has changed during the
3983        // network calls, we should still be fine.
3984        self.receive_certificates_from_validators(vec![received_certificates])
3985            .await;
3986        Ok(())
3987    }
3988
3989    /// Attempts to update a validator with the local information.
3990    #[instrument(level = "trace", skip(remote_node))]
3991    pub async fn sync_validator(
3992        &self,
3993        remote_node: Env::ValidatorNode,
3994    ) -> Result<(), ChainClientError> {
3995        let validator_next_block_height = match remote_node
3996            .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
3997            .await
3998        {
3999            Ok(info) => info.info.next_block_height.0,
4000            Err(NodeError::BlobsNotFound(_)) => 0,
4001            Err(err) => return Err(err.into()),
4002        };
4003        let local_chain_state = self.chain_info().await?;
4004
4005        let Some(missing_certificate_count) = local_chain_state
4006            .next_block_height
4007            .0
4008            .checked_sub(validator_next_block_height)
4009            .filter(|count| *count > 0)
4010        else {
4011            debug!("Validator is up-to-date with local state");
4012            return Ok(());
4013        };
4014
4015        let missing_certificates_end = usize::try_from(local_chain_state.next_block_height.0)
4016            .expect("`usize` should be at least `u64`");
4017        let missing_certificates_start = missing_certificates_end
4018            - usize::try_from(missing_certificate_count).expect("`usize` should be at least `u64`");
4019
4020        let missing_certificate_hashes = self
4021            .chain_state_view()
4022            .await?
4023            .confirmed_log
4024            .read(missing_certificates_start..missing_certificates_end)
4025            .await?;
4026
4027        let certificates = self
4028            .client
4029            .storage_client()
4030            .read_certificates(missing_certificate_hashes.clone())
4031            .await?;
4032        let certificates =
4033            match ResultReadCertificates::new(certificates, missing_certificate_hashes) {
4034                ResultReadCertificates::Certificates(certificates) => certificates,
4035                ResultReadCertificates::InvalidHashes(hashes) => {
4036                    return Err(ChainClientError::ReadCertificatesError(hashes))
4037                }
4038            };
4039        for certificate in certificates {
4040            match remote_node
4041                .handle_confirmed_certificate(
4042                    certificate.clone(),
4043                    CrossChainMessageDelivery::NonBlocking,
4044                )
4045                .await
4046            {
4047                Ok(_) => (),
4048                Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
4049                    // Upload the missing blobs we have and retry.
4050                    let missing_blobs: Vec<_> = self
4051                        .client
4052                        .storage_client()
4053                        .read_blobs(&missing_blob_ids)
4054                        .await?
4055                        .into_iter()
4056                        .flatten()
4057                        .collect();
4058                    remote_node.upload_blobs(missing_blobs).await?;
4059                    remote_node
4060                        .handle_confirmed_certificate(
4061                            certificate,
4062                            CrossChainMessageDelivery::NonBlocking,
4063                        )
4064                        .await?;
4065                }
4066                Err(err) => return Err(err.into()),
4067            }
4068        }
4069
4070        Ok(())
4071    }
4072
4073    /// Returns whether the given ownership includes anyone whose secret key we don't have.
4074    fn has_other_owners(&self, ownership: &ChainOwnership) -> bool {
4075        ownership
4076            .all_owners()
4077            .any(|owner| Some(owner) != self.preferred_owner.as_ref())
4078    }
4079}
4080
#[cfg(with_testing)]
impl<Env: Environment> ChainClient<Env> {
    /// Test helper that processes `notification` as if it had been sent by `validator`,
    /// using the `FullChain` listening mode.
    ///
    /// `validator` is the entry handed to `make_nodes_from_list` in order to build the remote
    /// node (presumably the validator's public key and its network address).
    ///
    /// # Panics
    ///
    /// Panics if the validator node cannot be created or if processing the notification
    /// fails.
    pub async fn process_notification_from(
        &self,
        notification: Notification,
        validator: (ValidatorPublicKey, &str),
    ) {
        // Exactly one validator goes in, so the first node is the one we want.
        let (public_key, node) = self
            .client
            .validator_node_provider()
            .make_nodes_from_list(vec![validator])
            .unwrap()
            .next()
            .unwrap();
        let remote_node = RemoteNode { node, public_key };
        let local_node = self.client.local_node.clone();
        let listening_mode = ListeningMode::FullChain;
        let outcome = self
            .process_notification(remote_node, local_node, notification, &listening_mode)
            .await;
        outcome.unwrap();
    }
}
4106
4107/// The outcome of trying to commit a list of incoming messages and operations to the chain.
4108#[derive(Debug)]
4109enum ExecuteBlockOutcome {
4110    /// A block with the messages and operations was committed.
4111    Executed(ConfirmedBlockCertificate),
4112    /// A different block was already proposed and got committed. Check whether the messages and
4113    /// operations are still suitable, and try again at the next block height.
4114    Conflict(ConfirmedBlockCertificate),
4115    /// We are not the round leader and cannot do anything. Try again at the specified time or
4116    /// or whenever the round or block height changes.
4117    WaitForTimeout(RoundTimeout),
4118}
4119
4120/// Wrapper for `AbortHandle` that aborts when its dropped.
4121#[must_use]
4122pub struct AbortOnDrop(pub AbortHandle);
4123
4124impl Drop for AbortOnDrop {
4125    #[instrument(level = "trace", skip(self))]
4126    fn drop(&mut self) {
4127        self.0.abort();
4128    }
4129}
4130
4131/// The result of `synchronize_received_certificates_from_validator`.
4132struct ReceivedCertificatesFromValidator {
4133    /// The name of the validator we downloaded from.
4134    public_key: ValidatorPublicKey,
4135    /// The new tracker value for that validator.
4136    tracker: u64,
4137    /// The downloaded certificates. The signatures were already checked and they are ready
4138    /// to be processed.
4139    certificates: Vec<ConfirmedBlockCertificate>,
4140    /// Sender chains that were already up to date locally. We need to ensure their messages
4141    /// are delivered.
4142    other_sender_chains: Vec<ChainId>,
4143}
4144
4145/// A pending proposed block, together with its published blobs.
4146#[derive(Clone, Serialize, Deserialize)]
4147pub struct PendingProposal {
4148    pub block: ProposedBlock,
4149    pub blobs: Vec<Blob>,
4150}
4151
/// Mode flag passed along with a certificate to `receive_sender_certificate`.
///
/// NOTE(review): judging by the variant names, it states whether the certificate still has
/// to be checked before it is processed -- confirm against the code consuming it.
enum ReceiveCertificateMode {
    /// The certificate has not been checked yet.
    NeedsCheck,
    /// The certificate was checked before and needs no further check.
    AlreadyChecked,
}
4156
4157enum CheckCertificateResult {
4158    OldEpoch,
4159    New,
4160    FutureEpoch,
4161}
4162
4163impl CheckCertificateResult {
4164    fn into_result(self) -> Result<(), ChainClientError> {
4165        match self {
4166            Self::OldEpoch => Err(ChainClientError::CommitteeDeprecationError),
4167            Self::New => Ok(()),
4168            Self::FutureEpoch => Err(ChainClientError::CommitteeSynchronizationError),
4169        }
4170    }
4171}
4172
4173/// Creates a compressed Contract, Service and bytecode.
4174#[cfg(not(target_arch = "wasm32"))]
4175pub async fn create_bytecode_blobs(
4176    contract: Bytecode,
4177    service: Bytecode,
4178    vm_runtime: VmRuntime,
4179) -> (Vec<Blob>, ModuleId) {
4180    match vm_runtime {
4181        VmRuntime::Wasm => {
4182            let (compressed_contract, compressed_service) =
4183                tokio::task::spawn_blocking(move || (contract.compress(), service.compress()))
4184                    .await
4185                    .expect("Compression should not panic");
4186            let contract_blob = Blob::new_contract_bytecode(compressed_contract);
4187            let service_blob = Blob::new_service_bytecode(compressed_service);
4188            let module_id =
4189                ModuleId::new(contract_blob.id().hash, service_blob.id().hash, vm_runtime);
4190            (vec![contract_blob, service_blob], module_id)
4191        }
4192        VmRuntime::Evm => {
4193            let compressed_contract = contract.compress();
4194            let evm_contract_blob = Blob::new_evm_bytecode(compressed_contract);
4195            let module_id = ModuleId::new(
4196                evm_contract_blob.id().hash,
4197                evm_contract_blob.id().hash,
4198                vm_runtime,
4199            );
4200            (vec![evm_contract_blob], module_id)
4201        }
4202    }
4203}