linera_core/client/chain_client/
mod.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4mod state;
5use std::{
6    collections::{hash_map, BTreeMap, BTreeSet, HashMap},
7    convert::Infallible,
8    iter,
9    sync::Arc,
10};
11
12use custom_debug_derive::Debug;
13use futures::{
14    future::{self, Either, FusedFuture, Future, FutureExt},
15    select,
16    stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt, TryStreamExt},
17};
18#[cfg(with_metrics)]
19use linera_base::prometheus_util::MeasureLatency as _;
20use linera_base::{
21    abi::Abi,
22    crypto::{signer, AccountPublicKey, CryptoHash, Signer, ValidatorPublicKey},
23    data_types::{
24        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
25        ChainDescription, Epoch, Round, Timestamp,
26    },
27    ensure,
28    identifiers::{
29        Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
30        ModuleId, StreamId,
31    },
32    ownership::{ChainOwnership, TimeoutConfig},
33    time::{Duration, Instant},
34};
35#[cfg(not(target_arch = "wasm32"))]
36use linera_base::{data_types::Bytecode, vm::VmRuntime};
37use linera_chain::{
38    data_types::{BlockProposal, ChainAndHeight, IncomingBundle, ProposedBlock, Transaction},
39    manager::LockingBlock,
40    types::{
41        Block, ConfirmedBlock, ConfirmedBlockCertificate, Timeout, TimeoutCertificate,
42        ValidatedBlock,
43    },
44    ChainError, ChainExecutionContext, ChainStateView,
45};
46use linera_execution::{
47    committee::Committee,
48    system::{
49        AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
50        REMOVED_EPOCH_STREAM_NAME,
51    },
52    ExecutionError, Operation, Query, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
53};
54use linera_storage::{Clock as _, ResultReadCertificates, Storage as _};
55use linera_views::ViewError;
56use rand::seq::SliceRandom;
57use serde::Serialize;
58pub use state::State;
59use thiserror::Error;
60use tokio::sync::{mpsc, OwnedRwLockReadGuard};
61use tokio_stream::wrappers::UnboundedReceiverStream;
62use tokio_util::sync::CancellationToken;
63use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};
64
65use super::{
66    received_log::ReceivedLogs, validator_trackers::ValidatorTrackers, AbortOnDrop, Client,
67    ExecuteBlockOutcome, ListeningMode, MessagePolicy, PendingProposal, ReceiveCertificateMode,
68    TimingType,
69};
70use crate::{
71    data_types::{ChainInfo, ChainInfoQuery, ClientOutcome, RoundTimeout},
72    environment::Environment,
73    local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
74    node::{
75        CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
76        ValidatorNodeProvider as _,
77    },
78    remote_node::RemoteNode,
79    updater::{communicate_with_quorum, CommunicateAction, CommunicationError},
80    worker::{Notification, Reason, WorkerError},
81};
82
83#[derive(Debug, Clone)]
84pub struct Options {
85    /// Maximum number of pending message bundles processed at a time in a block.
86    pub max_pending_message_bundles: usize,
87    /// The policy for automatically handling incoming messages.
88    pub message_policy: MessagePolicy,
89    /// Whether to block on cross-chain message delivery.
90    pub cross_chain_message_delivery: CrossChainMessageDelivery,
91    /// An additional delay, after reaching a quorum, to wait for additional validator signatures,
92    /// as a fraction of time taken to reach quorum.
93    pub grace_period: f64,
94    /// The delay when downloading a blob, after which we try a second validator.
95    pub blob_download_timeout: Duration,
96    /// The delay when downloading a batch of certificates, after which we try a second validator.
97    pub certificate_batch_download_timeout: Duration,
98    /// Maximum number of certificates that we download at a time from one validator when
99    /// synchronizing one of our chains.
100    pub certificate_download_batch_size: u64,
101    /// Maximum number of sender certificates we try to download and receive in one go
102    /// when syncing sender chains.
103    pub sender_certificate_download_batch_size: usize,
104    /// Maximum number of tasks that can be joined concurrently using buffer_unordered.
105    pub max_joined_tasks: usize,
106}
107
#[cfg(with_testing)]
impl Options {
    /// Returns the option set used by tests: accept-all message policy, non-blocking
    /// cross-chain delivery, one-second download timeouts and the crate's default
    /// batch sizes and grace period.
    pub fn test_default() -> Self {
        use super::{
            DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE, DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
        };
        use crate::DEFAULT_GRACE_PERIOD;

        // Both download timeouts share the same value in tests.
        let download_timeout = Duration::from_secs(1);

        Self {
            max_pending_message_bundles: 10,
            max_joined_tasks: 100,
            message_policy: MessagePolicy::new_accept_all(),
            cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
            grace_period: DEFAULT_GRACE_PERIOD,
            blob_download_timeout: download_timeout,
            certificate_batch_download_timeout: download_timeout,
            certificate_download_batch_size: DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            sender_certificate_download_batch_size: DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
        }
    }
}
129
130/// Client to operate a chain by interacting with validators and the given local storage
131/// implementation.
132/// * The chain being operated is called the "local chain" or just the "chain".
133/// * As a rule, operations are considered successful (and communication may stop) when
134///   they succeeded in gathering a quorum of responses.
135#[derive(Debug)]
136pub struct ChainClient<Env: Environment> {
137    /// The Linera [`Client`] that manages operations for this chain client.
138    #[debug(skip)]
139    pub(crate) client: Arc<Client<Env>>,
140    /// The off-chain chain ID.
141    chain_id: ChainId,
142    /// The client options.
143    #[debug(skip)]
144    options: Options,
145    /// The preferred owner of the chain used to sign proposals.
146    /// `None` if we cannot propose on this chain.
147    preferred_owner: Option<AccountOwner>,
148    /// The next block height as read from the wallet.
149    initial_next_block_height: BlockHeight,
150    /// The last block hash as read from the wallet.
151    initial_block_hash: Option<CryptoHash>,
152    /// Optional timing sender for benchmarking.
153    timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
154}
155
156impl<Env: Environment> Clone for ChainClient<Env> {
157    fn clone(&self) -> Self {
158        Self {
159            client: self.client.clone(),
160            chain_id: self.chain_id,
161            options: self.options.clone(),
162            preferred_owner: self.preferred_owner,
163            initial_next_block_height: self.initial_next_block_height,
164            initial_block_hash: self.initial_block_hash,
165            timing_sender: self.timing_sender.clone(),
166        }
167    }
168}
169
170/// Error type for [`ChainClient`].
171#[derive(Debug, Error)]
172pub enum Error {
173    #[error("Local node operation failed: {0}")]
174    LocalNodeError(#[from] LocalNodeError),
175
176    #[error("Remote node operation failed: {0}")]
177    RemoteNodeError(#[from] NodeError),
178
179    #[error(transparent)]
180    ArithmeticError(#[from] ArithmeticError),
181
182    #[error("Missing certificates: {0:?}")]
183    ReadCertificatesError(Vec<CryptoHash>),
184
185    #[error("Missing confirmed block: {0:?}")]
186    MissingConfirmedBlock(CryptoHash),
187
188    #[error("JSON (de)serialization error: {0}")]
189    JsonError(#[from] serde_json::Error),
190
191    #[error("Chain operation failed: {0}")]
192    ChainError(#[from] ChainError),
193
194    #[error(transparent)]
195    CommunicationError(#[from] CommunicationError<NodeError>),
196
197    #[error("Internal error within chain client: {0}")]
198    InternalError(&'static str),
199
200    #[error(
201        "Cannot accept a certificate from an unknown committee in the future. \
202         Please synchronize the local view of the admin chain"
203    )]
204    CommitteeSynchronizationError,
205
206    #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
207    WalletSynchronizationError,
208
209    #[error("The state of the client is incompatible with the proposed block: {0}")]
210    BlockProposalError(&'static str),
211
212    #[error(
213        "Cannot accept a certificate from a committee that was retired. \
214         Try a newer certificate from the same origin"
215    )]
216    CommitteeDeprecationError,
217
218    #[error("Protocol error within chain client: {0}")]
219    ProtocolError(&'static str),
220
221    #[error("Signer doesn't have key to sign for chain {0}")]
222    CannotFindKeyForChain(ChainId),
223
224    #[error("client is not configured to propose on chain {0}")]
225    NoAccountKeyConfigured(ChainId),
226
227    #[error("The chain client isn't owner on chain {0}")]
228    NotAnOwner(ChainId),
229
230    #[error(transparent)]
231    ViewError(#[from] ViewError),
232
233    #[error(
234        "Failed to download certificates and update local node to the next height \
235         {target_next_block_height} of chain {chain_id}"
236    )]
237    CannotDownloadCertificates {
238        chain_id: ChainId,
239        target_next_block_height: BlockHeight,
240    },
241
242    #[error(transparent)]
243    BcsError(#[from] bcs::Error),
244
245    #[error(
246        "Unexpected quorum: validators voted for block {hash} in {round}, \
247         expected block {expected_hash} in {expected_round}"
248    )]
249    UnexpectedQuorum {
250        hash: CryptoHash,
251        round: Round,
252        expected_hash: CryptoHash,
253        expected_round: Round,
254    },
255
256    #[error("signer error: {0:?}")]
257    Signer(#[source] Box<dyn signer::Error>),
258
259    #[error("Cannot revoke the current epoch {0}")]
260    CannotRevokeCurrentEpoch(Epoch),
261
262    #[error("Epoch is already revoked")]
263    EpochAlreadyRevoked,
264
265    #[error("Failed to download missing sender blocks from chain {chain_id} at height {height}")]
266    CannotDownloadMissingSenderBlock {
267        chain_id: ChainId,
268        height: BlockHeight,
269    },
270}
271
272impl From<Infallible> for Error {
273    fn from(infallible: Infallible) -> Self {
274        match infallible {}
275    }
276}
277
278impl Error {
279    pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
280        Self::Signer(Box::new(err))
281    }
282}
283
284impl<Env: Environment> ChainClient<Env> {
285    pub fn new(
286        client: Arc<Client<Env>>,
287        chain_id: ChainId,
288        options: Options,
289        initial_block_hash: Option<CryptoHash>,
290        initial_next_block_height: BlockHeight,
291        preferred_owner: Option<AccountOwner>,
292        timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
293    ) -> Self {
294        ChainClient {
295            client,
296            chain_id,
297            options,
298            preferred_owner,
299            initial_block_hash,
300            initial_next_block_height,
301            timing_sender,
302        }
303    }
304
305    /// Gets the client mutex from the chain's state.
306    #[instrument(level = "trace", skip(self))]
307    fn client_mutex(&self) -> Arc<tokio::sync::Mutex<()>> {
308        self.client
309            .chains
310            .pin()
311            .get(&self.chain_id)
312            .expect("Chain client constructed for invalid chain")
313            .client_mutex()
314    }
315
316    /// Gets the next pending block.
317    #[instrument(level = "trace", skip(self))]
318    pub fn pending_proposal(&self) -> Option<PendingProposal> {
319        self.client
320            .chains
321            .pin()
322            .get(&self.chain_id)
323            .expect("Chain client constructed for invalid chain")
324            .pending_proposal()
325            .clone()
326    }
327
328    /// Updates the chain's state using a closure.
329    #[instrument(level = "trace", skip(self, f))]
330    fn update_state<F>(&self, f: F)
331    where
332        F: Fn(&mut State),
333    {
334        let chains = self.client.chains.pin();
335        chains
336            .update(self.chain_id, |state| {
337                let mut state = state.clone_for_update_unchecked();
338                f(&mut state);
339                state
340            })
341            .expect("Chain client constructed for invalid chain");
342    }
343
344    /// Gets a reference to the client's signer instance.
345    #[instrument(level = "trace", skip(self))]
346    pub fn signer(&self) -> &impl Signer {
347        self.client.signer()
348    }
349
350    /// Gets a mutable reference to the per-`ChainClient` options.
351    #[instrument(level = "trace", skip(self))]
352    pub fn options_mut(&mut self) -> &mut Options {
353        &mut self.options
354    }
355
356    /// Gets a reference to the per-`ChainClient` options.
357    #[instrument(level = "trace", skip(self))]
358    pub fn options(&self) -> &Options {
359        &self.options
360    }
361
362    /// Gets the ID of the associated chain.
363    #[instrument(level = "trace", skip(self))]
364    pub fn chain_id(&self) -> ChainId {
365        self.chain_id
366    }
367
368    /// Gets a clone of the timing sender for benchmarking.
369    pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
370        self.timing_sender.clone()
371    }
372
373    /// Gets the ID of the admin chain.
374    #[instrument(level = "trace", skip(self))]
375    pub fn admin_id(&self) -> ChainId {
376        self.client.admin_id
377    }
378
379    /// Gets the currently preferred owner for signing the blocks.
380    #[instrument(level = "trace", skip(self))]
381    pub fn preferred_owner(&self) -> Option<AccountOwner> {
382        self.preferred_owner
383    }
384
385    /// Sets the new, preferred owner for signing the blocks.
386    #[instrument(level = "trace", skip(self))]
387    pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
388        self.preferred_owner = Some(preferred_owner);
389    }
390
391    /// Unsets the preferred owner for signing the blocks.
392    #[instrument(level = "trace", skip(self))]
393    pub fn unset_preferred_owner(&mut self) {
394        self.preferred_owner = None;
395    }
396
397    /// Obtains a `ChainStateView` for this client's chain.
398    #[instrument(level = "trace")]
399    pub async fn chain_state_view(
400        &self,
401    ) -> Result<OwnedRwLockReadGuard<ChainStateView<Env::StorageContext>>, LocalNodeError> {
402        self.client.local_node.chain_state_view(self.chain_id).await
403    }
404
405    /// Returns chain IDs that this chain subscribes to.
406    #[instrument(level = "trace", skip(self))]
407    pub async fn event_stream_publishers(
408        &self,
409    ) -> Result<BTreeMap<ChainId, BTreeSet<StreamId>>, LocalNodeError> {
410        let mut publishers = self
411            .chain_state_view()
412            .await?
413            .execution_state
414            .system
415            .event_subscriptions
416            .indices()
417            .await?
418            .into_iter()
419            .fold(
420                BTreeMap::<ChainId, BTreeSet<StreamId>>::new(),
421                |mut map, (chain_id, stream_id)| {
422                    map.entry(chain_id).or_default().insert(stream_id);
423                    map
424                },
425            );
426        if self.chain_id != self.client.admin_id {
427            publishers.insert(
428                self.client.admin_id,
429                vec![
430                    StreamId::system(EPOCH_STREAM_NAME),
431                    StreamId::system(REMOVED_EPOCH_STREAM_NAME),
432                ]
433                .into_iter()
434                .collect(),
435            );
436        }
437        Ok(publishers)
438    }
439
440    /// Subscribes to notifications from this client's chain.
441    #[instrument(level = "trace")]
442    pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
443        self.subscribe_to(self.chain_id)
444    }
445
446    /// Subscribes to notifications from the specified chain.
447    #[instrument(level = "trace")]
448    pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
449        Ok(Box::pin(UnboundedReceiverStream::new(
450            self.client.notifier.subscribe(vec![chain_id]),
451        )))
452    }
453
454    /// Returns the storage client used by this client's local node.
455    #[instrument(level = "trace")]
456    pub fn storage_client(&self) -> &Env::Storage {
457        self.client.storage_client()
458    }
459
460    /// Obtains the basic `ChainInfo` data for the local chain.
461    #[instrument(level = "trace")]
462    pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
463        let query = ChainInfoQuery::new(self.chain_id);
464        let response = self
465            .client
466            .local_node
467            .handle_chain_info_query(query)
468            .await?;
469        self.client.update_from_info(&response.info);
470        Ok(response.info)
471    }
472
473    /// Obtains the basic `ChainInfo` data for the local chain, with chain manager values.
474    #[instrument(level = "trace")]
475    pub async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
476        let query = ChainInfoQuery::new(self.chain_id)
477            .with_manager_values()
478            .with_committees();
479        let response = self
480            .client
481            .local_node
482            .handle_chain_info_query(query)
483            .await?;
484        self.client.update_from_info(&response.info);
485        Ok(response.info)
486    }
487
488    /// Returns the chain's description. Fetches it from the validators if necessary.
489    pub async fn get_chain_description(&self) -> Result<ChainDescription, Error> {
490        self.client.get_chain_description(self.chain_id).await
491    }
492
493    /// Obtains up to `self.options.max_pending_message_bundles` pending message bundles for the
494    /// local chain.
495    #[instrument(level = "trace")]
496    async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, Error> {
497        if self.options.message_policy.is_ignore() {
498            // Ignore all messages.
499            return Ok(Vec::new());
500        }
501
502        let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
503        let info = self
504            .client
505            .local_node
506            .handle_chain_info_query(query)
507            .await?
508            .info;
509        if self.preferred_owner.is_some_and(|owner| {
510            info.manager
511                .ownership
512                .is_super_owner_no_regular_owners(&owner)
513        }) {
514            // There are only super owners; they are expected to sync manually.
515            ensure!(
516                info.next_block_height >= self.initial_next_block_height,
517                Error::WalletSynchronizationError
518            );
519        }
520
521        Ok(info
522            .requested_pending_message_bundles
523            .into_iter()
524            .filter_map(|mut bundle| {
525                self.options
526                    .message_policy
527                    .must_handle(&mut bundle)
528                    .then_some(bundle)
529            })
530            .take(self.options.max_pending_message_bundles)
531            .collect())
532    }
533
534    /// Returns an `UpdateStreams` operation that updates this client's chain about new events
535    /// in any of the streams its applications are subscribing to. Returns `None` if there are no
536    /// new events.
537    #[instrument(level = "trace")]
538    async fn collect_stream_updates(&self) -> Result<Option<Operation>, Error> {
539        // Load all our subscriptions.
540        let subscription_map = self
541            .chain_state_view()
542            .await?
543            .execution_state
544            .system
545            .event_subscriptions
546            .index_values()
547            .await?;
548        // Collect the indices of all new events.
549        let futures = subscription_map
550            .into_iter()
551            .map(|((chain_id, stream_id), subscriptions)| {
552                let client = self.client.clone();
553                async move {
554                    let chain = client.local_node.chain_state_view(chain_id).await?;
555                    if let Some(next_expected_index) = chain
556                        .next_expected_events
557                        .get(&stream_id)
558                        .await?
559                        .filter(|next_index| *next_index > subscriptions.next_index)
560                    {
561                        Ok(Some((chain_id, stream_id, next_expected_index)))
562                    } else {
563                        Ok::<_, Error>(None)
564                    }
565                }
566            });
567        let updates = futures::stream::iter(futures)
568            .buffer_unordered(self.options.max_joined_tasks)
569            .try_collect::<Vec<_>>()
570            .await?
571            .into_iter()
572            .flatten()
573            .collect::<Vec<_>>();
574        if updates.is_empty() {
575            return Ok(None);
576        }
577        Ok(Some(SystemOperation::UpdateStreams(updates).into()))
578    }
579
580    #[instrument(level = "trace")]
581    async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
582        self.client.chain_info_with_committees(self.chain_id).await
583    }
584
585    /// Obtains the current epoch of the local chain as well as its set of trusted committees.
586    #[instrument(level = "trace")]
587    async fn epoch_and_committees(
588        &self,
589    ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
590        let info = self.chain_info_with_committees().await?;
591        let epoch = info.epoch;
592        let committees = info.into_committees()?;
593        Ok((epoch, committees))
594    }
595
596    /// Obtains the committee for the current epoch of the local chain.
597    #[instrument(level = "trace")]
598    pub async fn local_committee(&self) -> Result<Committee, Error> {
599        let info = match self.chain_info_with_committees().await {
600            Ok(info) => info,
601            Err(LocalNodeError::BlobsNotFound(_)) => {
602                self.synchronize_chain_state(self.chain_id).await?;
603                self.chain_info_with_committees().await?
604            }
605            Err(err) => return Err(err.into()),
606        };
607        Ok(info.into_current_committee()?)
608    }
609
610    /// Obtains the committee for the latest epoch on the admin chain.
611    #[instrument(level = "trace")]
612    pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
613        self.client.admin_committee().await
614    }
615
616    /// Obtains the identity of the current owner of the chain.
617    ///
618    /// Returns an error if we don't have the private key for the identity.
619    #[instrument(level = "trace")]
620    pub async fn identity(&self) -> Result<AccountOwner, Error> {
621        let Some(preferred_owner) = self.preferred_owner else {
622            return Err(Error::NoAccountKeyConfigured(self.chain_id));
623        };
624        let manager = self.chain_info().await?.manager;
625        ensure!(
626            manager.ownership.is_active(),
627            LocalNodeError::InactiveChain(self.chain_id)
628        );
629
630        let is_owner = manager
631            .ownership
632            .all_owners()
633            .chain(&manager.leader)
634            .any(|owner| *owner == preferred_owner);
635
636        if !is_owner {
637            let accepted_owners = manager
638                .ownership
639                .all_owners()
640                .chain(&manager.leader)
641                .collect::<Vec<_>>();
642            warn!(%self.chain_id, ?accepted_owners, ?preferred_owner,
643                "Chain has multiple owners configured but none is preferred owner",
644            );
645            return Err(Error::NotAnOwner(self.chain_id));
646        }
647
648        let has_signer = self
649            .signer()
650            .contains_key(&preferred_owner)
651            .await
652            .map_err(Error::signer_failure)?;
653
654        if !has_signer {
655            warn!(%self.chain_id, ?preferred_owner,
656                "Chain is one of the owners but its Signer instance doesn't contain the key",
657            );
658            return Err(Error::CannotFindKeyForChain(self.chain_id));
659        }
660
661        Ok(preferred_owner)
662    }
663
664    /// Prepares the chain for the next operation, i.e. makes sure we have synchronized it up to
665    /// its current height.
666    #[instrument(level = "trace")]
667    pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, Error> {
668        #[cfg(with_metrics)]
669        let _latency = super::metrics::PREPARE_CHAIN_LATENCY.measure_latency();
670
671        let mut info = self.synchronize_to_known_height().await?;
672
673        if self.preferred_owner.is_none_or(|owner| {
674            !info
675                .manager
676                .ownership
677                .is_super_owner_no_regular_owners(&owner)
678        }) {
679            // If we are not a super owner or there are regular owners, we could be missing recent
680            // certificates created by other clients. Further synchronize blocks from the network.
681            // This is a best-effort that depends on network conditions.
682            info = self.client.synchronize_chain_state(self.chain_id).await?;
683        }
684
685        if info.epoch > self.client.admin_committees().await?.0 {
686            self.client
687                .synchronize_chain_state(self.client.admin_id)
688                .await?;
689        }
690
691        self.client.update_from_info(&info);
692        Ok(info)
693    }
694
695    // Verifies that our local storage contains enough history compared to the
696    // known block height. Otherwise, downloads the missing history from the
697    // network.
698    // The known height only differs if the wallet is ahead of storage.
699    async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, Error> {
700        let info = self
701            .client
702            .download_certificates(self.chain_id, self.initial_next_block_height)
703            .await?;
704        if info.next_block_height == self.initial_next_block_height {
705            // Check that our local node has the expected block hash.
706            ensure!(
707                self.initial_block_hash == info.block_hash,
708                Error::InternalError("Invalid chain of blocks in local node")
709            );
710        }
711        Ok(info)
712    }
713
714    /// Attempts to update all validators about the local chain.
715    #[instrument(level = "trace", skip(old_committee))]
716    pub async fn update_validators(&self, old_committee: Option<&Committee>) -> Result<(), Error> {
717        let update_validators_start = linera_base::time::Instant::now();
718        // Communicate the new certificate now.
719        if let Some(old_committee) = old_committee {
720            self.communicate_chain_updates(old_committee).await?
721        };
722        if let Ok(new_committee) = self.local_committee().await {
723            if Some(&new_committee) != old_committee {
724                // If the configuration just changed, communicate to the new committee as well.
725                // (This is actually more important that updating the previous committee.)
726                self.communicate_chain_updates(&new_committee).await?;
727            }
728        }
729        self.send_timing(update_validators_start, TimingType::UpdateValidators);
730        Ok(())
731    }
732
733    /// Broadcasts certified blocks to validators.
734    #[instrument(level = "trace", skip(committee))]
735    pub async fn communicate_chain_updates(&self, committee: &Committee) -> Result<(), Error> {
736        let delivery = self.options.cross_chain_message_delivery;
737        let height = self.chain_info().await?.next_block_height;
738        self.client
739            .communicate_chain_updates(committee, self.chain_id, height, delivery)
740            .await
741    }
742
743    /// Synchronizes all chains that any application on this chain subscribes to.
744    /// We always consider the admin chain a relevant publishing chain, for new epochs.
745    async fn synchronize_publisher_chains(&self) -> Result<(), Error> {
746        let chain_ids = self
747            .chain_state_view()
748            .await?
749            .execution_state
750            .system
751            .event_subscriptions
752            .indices()
753            .await?
754            .iter()
755            .map(|(chain_id, _)| *chain_id)
756            .chain(iter::once(self.client.admin_id))
757            .filter(|chain_id| *chain_id != self.chain_id)
758            .collect::<BTreeSet<_>>();
759        stream::iter(
760            chain_ids
761                .into_iter()
762                .map(|chain_id| self.client.synchronize_chain_state(chain_id)),
763        )
764        .buffer_unordered(self.options.max_joined_tasks)
765        .collect::<Vec<_>>()
766        .await
767        .into_iter()
768        .collect::<Result<Vec<_>, _>>()?;
769        Ok(())
770    }
771
    /// Attempts to download new received certificates.
    ///
    /// This is a best effort: it will only find certificates that have been confirmed
    /// amongst sufficiently many validators of the current committee of the target
    /// chain.
    ///
    /// However, this should be the case whenever a sender's chain is still in use and
    /// is regularly upgraded to new committees.
    ///
    /// The received logs are requested from the nodes built from the admin committee,
    /// passing along the tracker value stored locally for each validator (0 if there is
    /// none). The collected logs are then processed in batches, and the stored trackers are
    /// updated after every batch.
    ///
    /// `cancellation_token` is forwarded to the processing of every batch.
    ///
    /// # Errors
    ///
    /// A failure of the quorum communication is only logged; whatever was collected is
    /// still processed. Errors from reading the local state or from processing a batch are
    /// returned.
    #[instrument(level = "trace")]
    pub async fn find_received_certificates(
        &self,
        cancellation_token: Option<CancellationToken>,
    ) -> Result<(), Error> {
        debug!(chain_id = %self.chain_id, "starting find_received_certificates");
        #[cfg(with_metrics)]
        let _latency = super::metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
        // Use network information from the local chain.
        let chain_id = self.chain_id;
        let (_, committee) = self.admin_committee().await?;
        let nodes = self.client.make_nodes(&committee)?;

        // Take a copy of the per-validator trackers currently stored for this chain.
        let trackers = self
            .client
            .local_node
            .chain_state_view(chain_id)
            .await?
            .received_certificate_trackers
            .get()
            .clone();

        trace!("find_received_certificates: read trackers");

        // Shared buffer that every per-validator task appends its batch to.
        let received_log_batches = Arc::new(std::sync::Mutex::new(Vec::new()));
        // Proceed to downloading received logs.
        let result = communicate_with_quorum(
            &nodes,
            &committee,
            |_| (),
            |remote_node| {
                let client = &self.client;
                // Validators without a stored tracker use 0.
                let tracker = trackers.get(&remote_node.public_key).copied().unwrap_or(0);
                let received_log_batches = Arc::clone(&received_log_batches);
                Box::pin(async move {
                    let batch = client
                        .get_received_log_from_validator(chain_id, &remote_node, tracker)
                        .await?;
                    // The guard is taken only after the await above and released when this
                    // block ends. `unwrap` panics if the mutex was poisoned.
                    let mut batches = received_log_batches.lock().unwrap();
                    batches.push((remote_node.public_key, batch));
                    Ok(())
                })
            },
            self.options.grace_period,
        )
        .await;

        // Best effort: a communication failure is reported but does not abort the call.
        if let Err(error) = result {
            error!(
                %error,
                "Failed to synchronize received_logs from at least a quorum of validators",
            );
        }

        // Move the collected batches out of the shared buffer, leaving it empty.
        let received_logs: Vec<_> = {
            let mut received_log_batches = received_log_batches.lock().unwrap();
            std::mem::take(received_log_batches.as_mut())
        };

        debug!(
            received_logs_len = %received_logs.len(),
            received_logs_total = %received_logs.iter().map(|x| x.1.len()).sum::<usize>(),
            "collected received logs"
        );

        // Both constructors take the batches by value, hence the clone.
        let (received_logs, mut validator_trackers) = {
            (
                ReceivedLogs::from_received_result(received_logs.clone()),
                ValidatorTrackers::new(received_logs, &trackers),
            )
        };

        debug!(
            num_chains = %received_logs.num_chains(),
            num_certs = %received_logs.num_certs(),
            "find_received_certificates: total number of chains and certificates to sync",
        );

        // NOTE(review): the division is evaluated before the multiplication, so the result
        // is truncated (and is 0 when the batch size is smaller than `max_joined_tasks`);
        // assuming these options are integers, a `max_joined_tasks` of 0 panics here.
        // TODO confirm that both cases are excluded by how the options are validated.
        let max_blocks_per_chain =
            self.options.sender_certificate_download_batch_size / self.options.max_joined_tasks * 2;
        for received_log in received_logs.into_batches(
            self.options.sender_certificate_download_batch_size,
            max_blocks_per_chain,
        ) {
            validator_trackers = self
                .receive_sender_certificates(
                    received_log,
                    validator_trackers,
                    &nodes,
                    cancellation_token.clone(),
                )
                .await?;

            // Store the progress after each batch; a failure to do so is only logged.
            self.update_received_certificate_trackers(&validator_trackers)
                .await;
        }

        info!("find_received_certificates finished");

        Ok(())
    }
881
882    async fn update_received_certificate_trackers(&self, trackers: &ValidatorTrackers) {
883        let updated_trackers = trackers.to_map();
884        trace!(?updated_trackers, "updated tracker values");
885
886        // Update the trackers.
887        if let Err(error) = self
888            .client
889            .local_node
890            .update_received_certificate_trackers(self.chain_id, updated_trackers)
891            .await
892        {
893            error!(
894                chain_id = %self.chain_id,
895                %error,
896                "Failed to update the certificate trackers for chain",
897            );
898        }
899    }
900
    /// Downloads and processes or preprocesses the certificates for blocks sending messages to
    /// this chain that we are still missing.
    ///
    /// Entries of `received_logs` that the local node already knows are filtered out first.
    /// For every remaining sender chain a download task is created; at most
    /// `options.max_joined_tasks` of them run at the same time. Each task reports progress
    /// over a channel, and a spawned task folds these reports into `validator_trackers`.
    ///
    /// If `cancellation_token` is cancelled, the downloads stop being driven, but the
    /// remaining steps below still run.
    ///
    /// Returns the updated trackers.
    ///
    /// # Errors
    ///
    /// Returns an error if the local outbox heights cannot be read or if the spawned
    /// tracker task cannot be joined.
    async fn receive_sender_certificates(
        &self,
        mut received_logs: ReceivedLogs,
        mut validator_trackers: ValidatorTrackers,
        nodes: &[RemoteNode<Env::ValidatorNode>],
        cancellation_token: Option<CancellationToken>,
    ) -> Result<ValidatorTrackers, Error> {
        debug!(
            num_chains = %received_logs.num_chains(),
            num_certs = %received_logs.num_certs(),
            "receive_sender_certificates: number of chains and certificates to sync",
        );

        // Obtain the next block height we need in the local node, for each chain.
        let local_next_heights = self
            .client
            .local_node
            .next_outbox_heights(received_logs.chains(), self.chain_id)
            .await?;

        validator_trackers.filter_out_already_known(&mut received_logs, local_next_heights);

        debug!(
            remaining_total_certificates = %received_logs.num_certs(),
            "receive_sender_certificates: computed remote_heights"
        );

        let mut other_sender_chains = Vec::new();
        // Channel over which the download tasks send `ChainAndHeight` values.
        let (sender, mut receiver) = mpsc::unbounded_channel::<ChainAndHeight>();

        // The iterator is lazy: the closure (and thus the pushes into
        // `other_sender_chains`) only runs while the stream below is being polled.
        let cert_futures = received_logs.heights_per_chain().into_iter().filter_map(
            |(sender_chain_id, remote_heights)| {
                if remote_heights.is_empty() {
                    // Our highest, locally executed block is higher than any block height
                    // from the current batch. Skip this batch, but remember to wait for
                    // the messages to be delivered to the inboxes.
                    other_sender_chains.push(sender_chain_id);
                    return None;
                };
                let remote_heights = remote_heights.into_iter().collect::<Vec<_>>();
                let sender = sender.clone();
                let client = self.client.clone();
                // Every task gets its own, independently shuffled copy of the node list.
                let mut nodes = nodes.to_vec();
                nodes.shuffle(&mut rand::thread_rng());
                let received_logs_ref = &received_logs;
                Some(async move {
                    client
                        .download_and_process_sender_chain(
                            sender_chain_id,
                            &nodes,
                            received_logs_ref,
                            remote_heights,
                            sender,
                        )
                        .await
                })
            },
        );

        // Applies every value received on the channel to the trackers and hands the
        // trackers back once `recv` returns `None`.
        let update_trackers = linera_base::task::spawn(async move {
            while let Some(chain_and_height) = receiver.recv().await {
                validator_trackers.downloaded_cert(chain_and_height);
            }
            validator_trackers
        });

        // Completes when the token is cancelled; never completes if there is no token.
        let mut cancellation_future = Box::pin(
            async move {
                if let Some(token) = cancellation_token {
                    token.cancelled().await
                } else {
                    future::pending().await
                }
            }
            .fuse(),
        );

        // Drive the downloads until they are all done or cancellation is requested,
        // whichever comes first.
        select! {
            _ = stream::iter(cert_futures)
            .buffer_unordered(self.options.max_joined_tasks)
            .for_each(future::ready)
            => (),
            _ = cancellation_future => ()
        };

        // Release this function's own sender before waiting for the tracker task, whose
        // loop only ends when `recv` yields `None` (presumably once every sender clone is
        // gone; verify against the channel implementation in use).
        drop(sender);

        let validator_trackers = update_trackers
            .await
            .map_err(|_| Error::InternalError("could not join the update_trackers task"))?;

        debug!(
            num_other_chains = %other_sender_chains.len(),
            "receive_sender_certificates: processing certificates finished"
        );

        // Certificates for these chains were omitted from `certificates` because they were
        // already processed locally. If they were processed in a concurrent task, it is not
        // guaranteed that their cross-chain messages were already handled.
        self.retry_pending_cross_chain_requests(nodes, other_sender_chains)
            .await;

        debug!("receive_sender_certificates: finished processing other_sender_chains");

        Ok(validator_trackers)
    }
1009
1010    /// Retries cross chain requests on the chains which may have been processed on
1011    /// another task without the messages being correctly handled.
1012    async fn retry_pending_cross_chain_requests(
1013        &self,
1014        nodes: &[RemoteNode<Env::ValidatorNode>],
1015        other_sender_chains: Vec<ChainId>,
1016    ) {
1017        let stream = FuturesUnordered::from_iter(other_sender_chains.into_iter().map(|chain_id| {
1018            let local_node = self.client.local_node.clone();
1019            async move {
1020                if let Err(error) = match local_node
1021                    .retry_pending_cross_chain_requests(chain_id)
1022                    .await
1023                {
1024                    Ok(()) => Ok(()),
1025                    Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1026                        if let Err(error) = self
1027                            .client
1028                            .update_local_node_with_blobs_from(blob_ids.clone(), nodes)
1029                            .await
1030                        {
1031                            error!(
1032                                ?blob_ids,
1033                                %error,
1034                                "Error while attempting to download blobs during retrying outgoing \
1035                                messages"
1036                            );
1037                        }
1038                        local_node
1039                            .retry_pending_cross_chain_requests(chain_id)
1040                            .await
1041                    }
1042                    err => err,
1043                } {
1044                    error!(
1045                        %chain_id,
1046                        %error,
1047                        "Failed to retry outgoing messages from chain"
1048                    );
1049                }
1050            }
1051        }));
1052        stream.for_each(future::ready).await;
1053    }
1054
1055    /// Sends money.
1056    #[instrument(level = "trace")]
1057    pub async fn transfer(
1058        &self,
1059        owner: AccountOwner,
1060        amount: Amount,
1061        recipient: Account,
1062    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1063        // TODO(#467): check the balance of `owner` before signing any block proposal.
1064        self.execute_operation(SystemOperation::Transfer {
1065            owner,
1066            recipient,
1067            amount,
1068        })
1069        .await
1070    }
1071
1072    /// Verify if a data blob is readable from storage.
1073    // TODO(#2490): Consider removing or renaming this.
1074    #[instrument(level = "trace")]
1075    pub async fn read_data_blob(
1076        &self,
1077        hash: CryptoHash,
1078    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1079        let blob_id = BlobId {
1080            hash,
1081            blob_type: BlobType::Data,
1082        };
1083        self.execute_operation(SystemOperation::VerifyBlob { blob_id })
1084            .await
1085    }
1086
1087    /// Claims money in a remote chain.
1088    #[instrument(level = "trace")]
1089    pub async fn claim(
1090        &self,
1091        owner: AccountOwner,
1092        target_id: ChainId,
1093        recipient: Account,
1094        amount: Amount,
1095    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1096        self.execute_operation(SystemOperation::Claim {
1097            owner,
1098            target_id,
1099            recipient,
1100            amount,
1101        })
1102        .await
1103    }
1104
1105    /// Requests a leader timeout vote from all validators. If a quorum signs it, creates a
1106    /// certificate and sends it to all validators, to make them enter the next round.
1107    #[instrument(level = "trace")]
1108    pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, Error> {
1109        let chain_id = self.chain_id;
1110        let info = self.chain_info_with_committees().await?;
1111        let committee = info.current_committee()?;
1112        let height = info.next_block_height;
1113        let round = info.manager.current_round;
1114        let action = CommunicateAction::RequestTimeout {
1115            height,
1116            round,
1117            chain_id,
1118        };
1119        let value = Timeout::new(chain_id, height, info.epoch);
1120        let certificate = Box::new(
1121            self.client
1122                .communicate_chain_action(committee, action, value)
1123                .await?,
1124        );
1125        self.client.process_certificate(certificate.clone()).await?;
1126        // The block height didn't increase, but this will communicate the timeout as well.
1127        self.client
1128            .communicate_chain_updates(
1129                committee,
1130                chain_id,
1131                height,
1132                CrossChainMessageDelivery::NonBlocking,
1133            )
1134            .await?;
1135        Ok(*certificate)
1136    }
1137
    /// Downloads and processes any certificates we are missing for the given chain.
    ///
    /// This forwards to the underlying client with the `chain_id` argument, which is not
    /// required to be this client's own chain, and returns the chain info it reports.
    #[instrument(level = "trace", skip_all)]
    pub async fn synchronize_chain_state(
        &self,
        chain_id: ChainId,
    ) -> Result<Box<ChainInfo>, Error> {
        self.client.synchronize_chain_state(chain_id).await
    }
1146
    /// Downloads and processes any certificates we are missing for this chain, from the given
    /// committee.
    ///
    /// This forwards to the underlying client with this client's chain ID and the given
    /// `committee`, and returns the chain info it reports.
    #[instrument(level = "trace", skip_all)]
    pub async fn synchronize_chain_state_from_committee(
        &self,
        committee: Committee,
    ) -> Result<Box<ChainInfo>, Error> {
        self.client
            .synchronize_chain_state_from_committee(self.chain_id, committee)
            .await
    }
1158
1159    /// Executes a list of operations.
1160    #[instrument(level = "trace", skip(operations, blobs))]
1161    pub async fn execute_operations(
1162        &self,
1163        operations: Vec<Operation>,
1164        blobs: Vec<Blob>,
1165    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1166        let timing_start = linera_base::time::Instant::now();
1167
1168        let result = loop {
1169            let execute_block_start = linera_base::time::Instant::now();
1170            // TODO(#2066): Remove boxing once the call-stack is shallower
1171            match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
1172                Ok(ExecuteBlockOutcome::Executed(certificate)) => {
1173                    self.send_timing(execute_block_start, TimingType::ExecuteBlock);
1174                    break Ok(ClientOutcome::Committed(certificate));
1175                }
1176                Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
1177                    break Ok(ClientOutcome::WaitForTimeout(timeout));
1178                }
1179                Ok(ExecuteBlockOutcome::Conflict(certificate)) => {
1180                    info!(
1181                        height = %certificate.block().header.height,
1182                        "Another block was committed; retrying."
1183                    );
1184                }
1185                Err(Error::CommunicationError(CommunicationError::Trusted(
1186                    NodeError::UnexpectedBlockHeight {
1187                        expected_block_height,
1188                        found_block_height,
1189                    },
1190                ))) if expected_block_height > found_block_height => {
1191                    tracing::info!(
1192                        "Local state is outdated; synchronizing chain {:.8}",
1193                        self.chain_id
1194                    );
1195                    self.synchronize_chain_state(self.chain_id).await?;
1196                }
1197                Err(err) => return Err(err),
1198            };
1199        };
1200
1201        self.send_timing(timing_start, TimingType::ExecuteOperations);
1202
1203        result
1204    }
1205
    /// Executes an operation.
    ///
    /// The operation is converted with `Into<Operation>` and passed to
    /// `execute_operations` as the only operation, with no blobs.
    pub async fn execute_operation(
        &self,
        operation: impl Into<Operation>,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
        self.execute_operations(vec![operation.into()], vec![])
            .await
    }
1214
1215    /// Executes a new block.
1216    ///
1217    /// This must be preceded by a call to `prepare_chain()`.
1218    #[instrument(level = "trace", skip(operations, blobs))]
1219    async fn execute_block(
1220        &self,
1221        operations: Vec<Operation>,
1222        blobs: Vec<Blob>,
1223    ) -> Result<ExecuteBlockOutcome, Error> {
1224        let transactions = self.prepend_epochs_messages_and_events(operations).await?;
1225
1226        if transactions.is_empty() {
1227            return Err(Error::LocalNodeError(LocalNodeError::WorkerError(
1228                WorkerError::ChainError(Box::new(ChainError::EmptyBlock)),
1229            )));
1230        }
1231
1232        self.execute_prepared_transactions(transactions, blobs)
1233            .await
1234    }
1235
    /// Proposes a block made of the given transactions and blobs and reports what happened.
    ///
    /// The client mutex is held for the whole call. Any block that is already pending is
    /// processed first; only if that commits nothing is a new pending block created from
    /// `transactions` and `blobs` and processed in turn.
    ///
    /// Returns:
    /// * `Executed` if the certificate that was committed is for the newly created block;
    /// * `Conflict` if a certificate for any other block was committed, either before or
    ///   instead of the new block;
    /// * `WaitForTimeout` if processing the pending block reported that a timeout has to
    ///   be awaited.
    #[instrument(level = "trace", skip(transactions, blobs))]
    async fn execute_prepared_transactions(
        &self,
        transactions: Vec<Transaction>,
        blobs: Vec<Blob>,
    ) -> Result<ExecuteBlockOutcome, Error> {
        #[cfg(with_metrics)]
        let _latency = super::metrics::EXECUTE_BLOCK_LATENCY.measure_latency();

        let mutex = self.client_mutex();
        let _guard = mutex.lock_owned().await;
        // TODO: We shouldn't need to call this explicitly.
        match self.process_pending_block_without_prepare().await? {
            ClientOutcome::Committed(Some(certificate)) => {
                return Ok(ExecuteBlockOutcome::Conflict(certificate))
            }
            ClientOutcome::WaitForTimeout(timeout) => {
                return Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
            }
            // Nothing was committed: we can go on with our own block.
            ClientOutcome::Committed(None) => {}
        }

        let block = self.new_pending_block(transactions, blobs).await?;

        match self.process_pending_block_without_prepare().await? {
            // Only a certificate for exactly the block we staged counts as success.
            ClientOutcome::Committed(Some(certificate)) if certificate.block() == &block => {
                Ok(ExecuteBlockOutcome::Executed(certificate))
            }
            ClientOutcome::Committed(Some(certificate)) => {
                Ok(ExecuteBlockOutcome::Conflict(certificate))
            }
            // Should be unreachable: We did set a pending block.
            ClientOutcome::Committed(None) => {
                Err(Error::BlockProposalError("Unexpected block proposal error"))
            }
            ClientOutcome::WaitForTimeout(timeout) => {
                Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
            }
        }
    }
1276
1277    /// Creates a vector of transactions which, in addition to the provided operations,
1278    /// also contains epoch changes, receiving message bundles and event stream updates
1279    /// (if there are any to be processed).
1280    /// This should be called when executing a block, in order to make sure that any pending
1281    /// messages or events are included in it.
1282    #[instrument(level = "trace", skip(operations))]
1283    async fn prepend_epochs_messages_and_events(
1284        &self,
1285        operations: Vec<Operation>,
1286    ) -> Result<Vec<Transaction>, Error> {
1287        let incoming_bundles = self.pending_message_bundles().await?;
1288        let stream_updates = self.collect_stream_updates().await?;
1289        Ok(self
1290            .collect_epoch_changes()
1291            .await?
1292            .into_iter()
1293            .map(Transaction::ExecuteOperation)
1294            .chain(
1295                incoming_bundles
1296                    .into_iter()
1297                    .map(Transaction::ReceiveMessages),
1298            )
1299            .chain(
1300                stream_updates
1301                    .into_iter()
1302                    .map(Transaction::ExecuteOperation),
1303            )
1304            .chain(operations.into_iter().map(Transaction::ExecuteOperation))
1305            .collect::<Vec<_>>())
1306    }
1307
    /// Creates a new pending block and handles the proposal in the local node.
    /// Next time `process_pending_block_without_prepare` is called, this block will be proposed
    /// to the validators.
    ///
    /// The block is built on top of the local chain info (epoch, previous block hash, next
    /// height) and is authenticated by this client's identity. Returns the block produced
    /// by staging the execution.
    ///
    /// # Errors
    ///
    /// Returns `Error::BlockProposalError` if the client state already has a pending
    /// proposal, and forwards errors from obtaining the identity, reading the chain info,
    /// choosing the round and staging the block execution.
    #[instrument(level = "trace", skip(transactions, blobs))]
    async fn new_pending_block(
        &self,
        transactions: Vec<Transaction>,
        blobs: Vec<Blob>,
    ) -> Result<Block, Error> {
        let identity = self.identity().await?;

        ensure!(
            self.pending_proposal().is_none(),
            Error::BlockProposalError(
                "Client state already has a pending block; \
                use the `linera retry-pending-block` command to commit that first"
            )
        );
        let info = self.chain_info_with_committees().await?;
        let timestamp = self.next_timestamp(&transactions, info.timestamp);
        let proposed_block = ProposedBlock {
            epoch: info.epoch,
            chain_id: self.chain_id,
            transactions,
            previous_block_hash: info.block_hash,
            height: info.next_block_height,
            authenticated_owner: Some(identity),
            timestamp,
        };

        // Use the round number assuming there are oracle responses.
        // Using the round number during execution counts as an oracle.
        // Accessing the round number in single-leader rounds where we are not the leader
        // is not currently supported.
        let round = match self.round_for_new_proposal(&info, &identity, true).await? {
            Either::Left(round) => round.multi_leader(),
            Either::Right(_) => None,
        };
        // Make sure every incoming message succeeds and otherwise remove them.
        // Also, compute the final certified hash while we're at it.
        let (block, _) = self
            .client
            .stage_block_execution_and_discard_failing_messages(
                proposed_block,
                round,
                blobs.clone(),
            )
            .await?;
        // Store the proposal derived from the staged block, not the one built above.
        let (proposed_block, _) = block.clone().into_proposal();
        self.update_state(|state| {
            state.set_pending_proposal(proposed_block.clone(), blobs.clone())
        });
        Ok(block)
    }
1362
1363    /// Returns a suitable timestamp for the next block.
1364    ///
1365    /// This will usually be the current time according to the local clock, but may be slightly
1366    /// ahead to make sure it's not earlier than the incoming messages or the previous block.
1367    #[instrument(level = "trace", skip(transactions))]
1368    fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
1369        let local_time = self.storage_client().clock().current_time();
1370        transactions
1371            .iter()
1372            .filter_map(Transaction::incoming_bundle)
1373            .map(|msg| msg.bundle.timestamp)
1374            .max()
1375            .map_or(local_time, |timestamp| timestamp.max(local_time))
1376            .max(block_time)
1377    }
1378
1379    /// Queries an application.
1380    #[instrument(level = "trace", skip(query))]
1381    pub async fn query_application(&self, query: Query) -> Result<QueryOutcome, Error> {
1382        loop {
1383            let result = self
1384                .client
1385                .local_node
1386                .query_application(self.chain_id, query.clone())
1387                .await;
1388            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1389                let validators = self.client.validator_nodes().await?;
1390                self.client
1391                    .update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1392                    .await?;
1393                continue; // We found the missing blob: retry.
1394            }
1395            return Ok(result?);
1396        }
1397    }
1398
1399    /// Queries a system application.
1400    #[instrument(level = "trace", skip(query))]
1401    pub async fn query_system_application(
1402        &self,
1403        query: SystemQuery,
1404    ) -> Result<QueryOutcome<SystemResponse>, Error> {
1405        let QueryOutcome {
1406            response,
1407            operations,
1408        } = self.query_application(Query::System(query)).await?;
1409        match response {
1410            QueryResponse::System(response) => Ok(QueryOutcome {
1411                response,
1412                operations,
1413            }),
1414            _ => Err(Error::InternalError("Unexpected response for system query")),
1415        }
1416    }
1417
1418    /// Queries a user application.
1419    #[instrument(level = "trace", skip(application_id, query))]
1420    pub async fn query_user_application<A: Abi>(
1421        &self,
1422        application_id: ApplicationId<A>,
1423        query: &A::Query,
1424    ) -> Result<QueryOutcome<A::QueryResponse>, Error> {
1425        let query = Query::user(application_id, query)?;
1426        let QueryOutcome {
1427            response,
1428            operations,
1429        } = self.query_application(query).await?;
1430        match response {
1431            QueryResponse::User(response_bytes) => {
1432                let response = serde_json::from_slice(&response_bytes)?;
1433                Ok(QueryOutcome {
1434                    response,
1435                    operations,
1436                })
1437            }
1438            _ => Err(Error::InternalError("Unexpected response for user query")),
1439        }
1440    }
1441
1442    /// Obtains the local balance of the chain account after staging the execution of
1443    /// incoming messages in a new block.
1444    ///
1445    /// Does not attempt to synchronize with validators. The result will reflect up to
1446    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
1447    /// block.
1448    #[instrument(level = "trace")]
1449    pub async fn query_balance(&self) -> Result<Amount, Error> {
1450        let (balance, _) = self.query_balances_with_owner(AccountOwner::CHAIN).await?;
1451        Ok(balance)
1452    }
1453
1454    /// Obtains the local balance of an account after staging the execution of incoming messages in
1455    /// a new block.
1456    ///
1457    /// Does not attempt to synchronize with validators. The result will reflect up to
1458    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
1459    /// block.
1460    #[instrument(level = "trace", skip(owner))]
1461    pub async fn query_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1462        if owner.is_chain() {
1463            self.query_balance().await
1464        } else {
1465            Ok(self
1466                .query_balances_with_owner(owner)
1467                .await?
1468                .1
1469                .unwrap_or(Amount::ZERO))
1470        }
1471    }
1472
1473    /// Obtains the local balance of an account and optionally another user after staging the
1474    /// execution of incoming messages in a new block.
1475    ///
1476    /// Does not attempt to synchronize with validators. The result will reflect up to
1477    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
1478    /// block.
1479    #[instrument(level = "trace", skip(owner))]
1480    pub(crate) async fn query_balances_with_owner(
1481        &self,
1482        owner: AccountOwner,
1483    ) -> Result<(Amount, Option<Amount>), Error> {
1484        let incoming_bundles = self.pending_message_bundles().await?;
1485        // Since we disallow empty blocks, and there is no incoming messages,
1486        // that could change it, we query for the balance immediately.
1487        if incoming_bundles.is_empty() {
1488            let chain_balance = self.local_balance().await?;
1489            let owner_balance = self.local_owner_balance(owner).await?;
1490            return Ok((chain_balance, Some(owner_balance)));
1491        }
1492        let info = self.chain_info().await?;
1493        let transactions = incoming_bundles
1494            .into_iter()
1495            .map(Transaction::ReceiveMessages)
1496            .collect::<Vec<_>>();
1497        let timestamp = self.next_timestamp(&transactions, info.timestamp);
1498        let block = ProposedBlock {
1499            epoch: info.epoch,
1500            chain_id: self.chain_id,
1501            transactions,
1502            previous_block_hash: info.block_hash,
1503            height: info.next_block_height,
1504            authenticated_owner: if owner == AccountOwner::CHAIN {
1505                None
1506            } else {
1507                Some(owner)
1508            },
1509            timestamp,
1510        };
1511        match self
1512            .client
1513            .stage_block_execution_and_discard_failing_messages(block, None, Vec::new())
1514            .await
1515        {
1516            Ok((_, response)) => Ok((
1517                response.info.chain_balance,
1518                response.info.requested_owner_balance,
1519            )),
1520            Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
1521                error,
1522            )))) if matches!(
1523                &*error,
1524                ChainError::ExecutionError(
1525                    execution_error,
1526                    ChainExecutionContext::Block
1527                ) if matches!(
1528                    **execution_error,
1529                    ExecutionError::FeesExceedFunding { .. }
1530                )
1531            ) =>
1532            {
1533                // We can't even pay for the execution of one empty block. Let's return zero.
1534                Ok((Amount::ZERO, Some(Amount::ZERO)))
1535            }
1536            Err(error) => Err(error),
1537        }
1538    }
1539
1540    /// Reads the local balance of the chain account.
1541    ///
1542    /// Does not process the inbox or attempt to synchronize with validators.
1543    #[instrument(level = "trace")]
1544    pub async fn local_balance(&self) -> Result<Amount, Error> {
1545        let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
1546        Ok(balance)
1547    }
1548
1549    /// Reads the local balance of a user account.
1550    ///
1551    /// Does not process the inbox or attempt to synchronize with validators.
1552    #[instrument(level = "trace", skip(owner))]
1553    pub async fn local_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1554        if owner.is_chain() {
1555            self.local_balance().await
1556        } else {
1557            Ok(self
1558                .local_balances_with_owner(owner)
1559                .await?
1560                .1
1561                .unwrap_or(Amount::ZERO))
1562        }
1563    }
1564
1565    /// Reads the local balance of the chain account and optionally another user.
1566    ///
1567    /// Does not process the inbox or attempt to synchronize with validators.
1568    #[instrument(level = "trace", skip(owner))]
1569    pub(crate) async fn local_balances_with_owner(
1570        &self,
1571        owner: AccountOwner,
1572    ) -> Result<(Amount, Option<Amount>), Error> {
1573        ensure!(
1574            self.chain_info().await?.next_block_height >= self.initial_next_block_height,
1575            Error::WalletSynchronizationError
1576        );
1577        let mut query = ChainInfoQuery::new(self.chain_id);
1578        query.request_owner_balance = owner;
1579        let response = self
1580            .client
1581            .local_node
1582            .handle_chain_info_query(query)
1583            .await?;
1584        Ok((
1585            response.info.chain_balance,
1586            response.info.requested_owner_balance,
1587        ))
1588    }
1589
1590    /// Sends tokens to a chain.
1591    #[instrument(level = "trace")]
1592    pub async fn transfer_to_account(
1593        &self,
1594        from: AccountOwner,
1595        amount: Amount,
1596        account: Account,
1597    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1598        self.transfer(from, amount, account).await
1599    }
1600
1601    /// Burns tokens (transfer to a special address).
1602    #[cfg(with_testing)]
1603    #[instrument(level = "trace")]
1604    pub async fn burn(
1605        &self,
1606        owner: AccountOwner,
1607        amount: Amount,
1608    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1609        let recipient = Account::burn_address(self.chain_id);
1610        self.transfer(owner, amount, recipient).await
1611    }
1612
1613    #[instrument(level = "trace")]
1614    pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, Error> {
1615        let validators = self.client.validator_nodes().await?;
1616        self.client
1617            .fetch_chain_info(self.chain_id, &validators)
1618            .await
1619    }
1620
1621    /// Attempts to synchronize chains that have sent us messages and populate our local
1622    /// inbox.
1623    ///
1624    /// To create a block that actually executes the messages in the inbox,
1625    /// `process_inbox` must be called separately.
1626    #[instrument(level = "trace")]
1627    pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, Error> {
1628        let info = self.prepare_chain().await?;
1629        self.synchronize_publisher_chains().await?;
1630        self.find_received_certificates(None).await?;
1631        Ok(info)
1632    }
1633
    /// Processes the last pending block.
    ///
    /// Prepares the chain first (`prepare_chain`) and then delegates to
    /// `process_pending_block_without_prepare`; see that method for the possible outcomes.
    #[instrument(level = "trace")]
    pub async fn process_pending_block(
        &self,
    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
        self.prepare_chain().await?;
        self.process_pending_block_without_prepare().await
    }
1642
1643    /// Processes the last pending block. Assumes that the local chain is up to date.
1644    #[instrument(level = "trace")]
1645    async fn process_pending_block_without_prepare(
1646        &self,
1647    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1648        let info = self.request_leader_timeout_if_needed().await?;
1649
1650        // If there is a validated block in the current round, finalize it.
1651        if info.manager.has_locking_block_in_current_round()
1652            && !info.manager.current_round.is_fast()
1653        {
1654            return self.finalize_locking_block(info).await;
1655        }
1656        let owner = self.identity().await?;
1657
1658        let local_node = &self.client.local_node;
1659        // Otherwise we have to re-propose the highest validated block, if there is one.
1660        let pending_proposal = self.pending_proposal();
1661        let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
1662            match &**locking {
1663                LockingBlock::Regular(certificate) => {
1664                    let blob_ids = certificate.block().required_blob_ids();
1665                    let blobs = local_node
1666                        .get_locking_blobs(&blob_ids, self.chain_id)
1667                        .await?
1668                        .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
1669                    debug!("Retrying locking block from round {}", certificate.round);
1670                    (certificate.block().clone(), blobs)
1671                }
1672                LockingBlock::Fast(proposal) => {
1673                    let proposed_block = proposal.content.block.clone();
1674                    let blob_ids = proposed_block.published_blob_ids();
1675                    let blobs = local_node
1676                        .get_locking_blobs(&blob_ids, self.chain_id)
1677                        .await?
1678                        .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
1679                    let block = self
1680                        .client
1681                        .stage_block_execution(proposed_block, None, blobs.clone())
1682                        .await?
1683                        .0;
1684                    debug!("Retrying locking block from fast round.");
1685                    (block, blobs)
1686                }
1687            }
1688        } else if let Some(pending_proposal) = pending_proposal {
1689            // Otherwise we are free to propose our own pending block.
1690            // Use the round number assuming there are oracle responses.
1691            // Using the round number during execution counts as an oracle.
1692            let proposed_block = pending_proposal.block;
1693            let round = match self.round_for_new_proposal(&info, &owner, true).await? {
1694                Either::Left(round) => round.multi_leader(),
1695                Either::Right(_) => None,
1696            };
1697            let (block, _) = self
1698                .client
1699                .stage_block_execution(proposed_block, round, pending_proposal.blobs.clone())
1700                .await?;
1701            debug!("Proposing the local pending block.");
1702            (block, pending_proposal.blobs)
1703        } else {
1704            return Ok(ClientOutcome::Committed(None)); // Nothing to do.
1705        };
1706
1707        let has_oracle_responses = block.has_oracle_responses();
1708        let (proposed_block, outcome) = block.into_proposal();
1709        let round = match self
1710            .round_for_new_proposal(&info, &owner, has_oracle_responses)
1711            .await?
1712        {
1713            Either::Left(round) => round,
1714            Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
1715        };
1716        debug!("Proposing block for round {}", round);
1717
1718        let already_handled_locally = info
1719            .manager
1720            .already_handled_proposal(round, &proposed_block);
1721        // Create the final block proposal.
1722        let proposal = if let Some(locking) = info.manager.requested_locking {
1723            Box::new(match *locking {
1724                LockingBlock::Regular(cert) => {
1725                    BlockProposal::new_retry_regular(owner, round, cert, self.signer())
1726                        .await
1727                        .map_err(Error::signer_failure)?
1728                }
1729                LockingBlock::Fast(proposal) => {
1730                    BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
1731                        .await
1732                        .map_err(Error::signer_failure)?
1733                }
1734            })
1735        } else {
1736            Box::new(
1737                BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
1738                    .await
1739                    .map_err(Error::signer_failure)?,
1740            )
1741        };
1742        if !already_handled_locally {
1743            // Check the final block proposal. This will be cheaper after #1401.
1744            if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
1745                match err {
1746                    LocalNodeError::BlobsNotFound(_) => {
1747                        local_node
1748                            .handle_pending_blobs(self.chain_id, blobs)
1749                            .await?;
1750                        local_node.handle_block_proposal(*proposal.clone()).await?;
1751                    }
1752                    err => return Err(err.into()),
1753                }
1754            }
1755        }
1756        let committee = self.local_committee().await?;
1757        let block = Block::new(proposed_block, outcome);
1758        // Send the query to validators.
1759        let submit_block_proposal_start = linera_base::time::Instant::now();
1760        let certificate = if round.is_fast() {
1761            let hashed_value = ConfirmedBlock::new(block);
1762            self.client
1763                .submit_block_proposal(&committee, proposal, hashed_value)
1764                .await?
1765        } else {
1766            let hashed_value = ValidatedBlock::new(block);
1767            let certificate = self
1768                .client
1769                .submit_block_proposal(&committee, proposal, hashed_value.clone())
1770                .await?;
1771            self.client.finalize_block(&committee, certificate).await?
1772        };
1773        self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
1774        debug!(round = %certificate.round, "Sending confirmed block to validators");
1775        self.update_validators(Some(&committee)).await?;
1776        Ok(ClientOutcome::Committed(Some(certificate)))
1777    }
1778
1779    fn send_timing(&self, start: Instant, timing_type: TimingType) {
1780        let Some(sender) = &self.timing_sender else {
1781            return;
1782        };
1783        if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
1784            tracing::warn!(%err, "Failed to send timing info");
1785        }
1786    }
1787
1788    /// Requests a leader timeout certificate if the current round has timed out. Returns the
1789    /// chain info for the (possibly new) current round.
1790    async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, Error> {
1791        let mut info = self.chain_info_with_manager_values().await?;
1792        // If the current round has timed out, we request a timeout certificate and retry in
1793        // the next round.
1794        if let Some(round_timeout) = info.manager.round_timeout {
1795            if round_timeout <= self.storage_client().clock().current_time() {
1796                if let Err(e) = self.request_leader_timeout().await {
1797                    info!("Failed to obtain a timeout certificate: {}", e);
1798                } else {
1799                    info = self.chain_info_with_manager_values().await?;
1800                }
1801            }
1802        }
1803        Ok(info)
1804    }
1805
1806    /// Finalizes the locking block.
1807    ///
1808    /// Panics if there is no locking block; fails if the locking block is not in the current round.
1809    async fn finalize_locking_block(
1810        &self,
1811        info: Box<ChainInfo>,
1812    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1813        let locking = info
1814            .manager
1815            .requested_locking
1816            .expect("Should have a locking block");
1817        let LockingBlock::Regular(certificate) = *locking else {
1818            panic!("Should have a locking validated block");
1819        };
1820        debug!(
1821            round = %certificate.round,
1822            "Finalizing locking block"
1823        );
1824        let committee = self.local_committee().await?;
1825        match self
1826            .client
1827            .finalize_block(&committee, certificate.clone())
1828            .await
1829        {
1830            Ok(certificate) => {
1831                self.update_validators(Some(&committee)).await?;
1832                Ok(ClientOutcome::Committed(Some(certificate)))
1833            }
1834            Err(Error::CommunicationError(error))
1835                if info.manager.current_round >= Round::SingleLeader(0) =>
1836            {
1837                // Communication errors in this case often mean that someone else already
1838                // finalized the block or started another round.
1839                let timestamp = info.manager.round_timeout.ok_or(error)?;
1840                Ok(ClientOutcome::WaitForTimeout(RoundTimeout {
1841                    timestamp,
1842                    current_round: info.manager.current_round,
1843                    next_block_height: info.next_block_height,
1844                }))
1845            }
1846            Err(error) => Err(error),
1847        }
1848    }
1849
1850    /// Returns a round in which we can propose a new block or the given one, if possible.
1851    async fn round_for_new_proposal(
1852        &self,
1853        info: &ChainInfo,
1854        identity: &AccountOwner,
1855        has_oracle_responses: bool,
1856    ) -> Result<Either<Round, RoundTimeout>, Error> {
1857        let seed = *self.chain_state_view().await?.manager.seed.get();
1858        let manager = &info.manager;
1859        // If there is a conflicting proposal in the current round, we can only propose if the
1860        // next round can be started without a timeout, i.e. if we are in a multi-leader round.
1861        // Similarly, we cannot propose a block that uses oracles in the fast round.
1862        let conflict = manager
1863            .requested_signed_proposal
1864            .as_ref()
1865            .into_iter()
1866            .chain(&manager.requested_proposed)
1867            .any(|proposal| proposal.content.round == manager.current_round)
1868            || (manager.current_round.is_fast() && has_oracle_responses);
1869        let round = if !conflict {
1870            manager.current_round
1871        } else if let Some(round) = manager
1872            .ownership
1873            .next_round(manager.current_round)
1874            .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
1875        {
1876            round
1877        } else if let Some(timeout) = info.round_timeout() {
1878            return Ok(Either::Right(timeout));
1879        } else {
1880            return Err(Error::BlockProposalError(
1881                "Conflicting proposal in the current round",
1882            ));
1883        };
1884        let current_committee = info
1885            .current_committee()?
1886            .validators
1887            .values()
1888            .map(|v| (AccountOwner::from(v.account_public_key), v.votes))
1889            .collect();
1890        if manager.can_propose(identity, round, seed, &current_committee) {
1891            return Ok(Either::Left(round));
1892        }
1893        if let Some(timeout) = info.round_timeout() {
1894            return Ok(Either::Right(timeout));
1895        }
1896        Err(Error::BlockProposalError(
1897            "Not a leader in the current round",
1898        ))
1899    }
1900
    /// Clears the information on any operation that previously failed.
    ///
    /// This removes the pending proposal from the client state. Only available in test
    /// builds.
    #[cfg(with_testing)]
    #[instrument(level = "trace")]
    pub fn clear_pending_proposal(&self) {
        self.update_state(|state| state.clear_pending_proposal());
    }
1907
1908    /// Rotates the key of the chain.
1909    ///
1910    /// Replaces current owners of the chain with the new key pair.
1911    #[instrument(level = "trace")]
1912    pub async fn rotate_key_pair(
1913        &self,
1914        public_key: AccountPublicKey,
1915    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1916        self.transfer_ownership(public_key.into()).await
1917    }
1918
1919    /// Transfers ownership of the chain to a single super owner.
1920    #[instrument(level = "trace")]
1921    pub async fn transfer_ownership(
1922        &self,
1923        new_owner: AccountOwner,
1924    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1925        self.execute_operation(SystemOperation::ChangeOwnership {
1926            super_owners: vec![new_owner],
1927            owners: Vec::new(),
1928            multi_leader_rounds: 2,
1929            open_multi_leader_rounds: false,
1930            timeout_config: TimeoutConfig::default(),
1931        })
1932        .await
1933    }
1934
1935    /// Adds another owner to the chain, and turns existing super owners into regular owners.
1936    #[instrument(level = "trace")]
1937    pub async fn share_ownership(
1938        &self,
1939        new_owner: AccountOwner,
1940        new_weight: u64,
1941    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1942        loop {
1943            let ownership = self.prepare_chain().await?.manager.ownership;
1944            ensure!(
1945                ownership.is_active(),
1946                ChainError::InactiveChain(self.chain_id)
1947            );
1948            let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
1949            owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
1950            owners.push((new_owner, new_weight));
1951            let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
1952                super_owners: Vec::new(),
1953                owners,
1954                multi_leader_rounds: ownership.multi_leader_rounds,
1955                open_multi_leader_rounds: ownership.open_multi_leader_rounds,
1956                timeout_config: ownership.timeout_config,
1957            })];
1958            match self.execute_block(operations, vec![]).await? {
1959                ExecuteBlockOutcome::Executed(certificate) => {
1960                    return Ok(ClientOutcome::Committed(certificate));
1961                }
1962                ExecuteBlockOutcome::Conflict(certificate) => {
1963                    info!(
1964                        height = %certificate.block().header.height,
1965                        "Another block was committed; retrying."
1966                    );
1967                }
1968                ExecuteBlockOutcome::WaitForTimeout(timeout) => {
1969                    return Ok(ClientOutcome::WaitForTimeout(timeout));
1970                }
1971            };
1972        }
1973    }
1974
1975    /// Changes the ownership of this chain. Fails if it would remove existing owners, unless
1976    /// `remove_owners` is `true`.
1977    #[instrument(level = "trace")]
1978    pub async fn change_ownership(
1979        &self,
1980        ownership: ChainOwnership,
1981    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1982        self.execute_operation(SystemOperation::ChangeOwnership {
1983            super_owners: ownership.super_owners.into_iter().collect(),
1984            owners: ownership.owners.into_iter().collect(),
1985            multi_leader_rounds: ownership.multi_leader_rounds,
1986            open_multi_leader_rounds: ownership.open_multi_leader_rounds,
1987            timeout_config: ownership.timeout_config.clone(),
1988        })
1989        .await
1990    }
1991
1992    /// Changes the application permissions configuration on this chain.
1993    #[instrument(level = "trace", skip(application_permissions))]
1994    pub async fn change_application_permissions(
1995        &self,
1996        application_permissions: ApplicationPermissions,
1997    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1998        self.execute_operation(SystemOperation::ChangeApplicationPermissions(
1999            application_permissions,
2000        ))
2001        .await
2002    }
2003
2004    /// Opens a new chain with a derived UID.
2005    #[instrument(level = "trace", skip(self))]
2006    pub async fn open_chain(
2007        &self,
2008        ownership: ChainOwnership,
2009        application_permissions: ApplicationPermissions,
2010        balance: Amount,
2011    ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, Error> {
2012        loop {
2013            let config = OpenChainConfig {
2014                ownership: ownership.clone(),
2015                balance,
2016                application_permissions: application_permissions.clone(),
2017            };
2018            let operation = Operation::system(SystemOperation::OpenChain(config));
2019            let certificate = match self.execute_block(vec![operation], vec![]).await? {
2020                ExecuteBlockOutcome::Executed(certificate) => certificate,
2021                ExecuteBlockOutcome::Conflict(_) => continue,
2022                ExecuteBlockOutcome::WaitForTimeout(timeout) => {
2023                    return Ok(ClientOutcome::WaitForTimeout(timeout));
2024                }
2025            };
2026            // The only operation, i.e. the last transaction, created the new chain.
2027            let chain_blob = certificate
2028                .block()
2029                .body
2030                .blobs
2031                .last()
2032                .and_then(|blobs| blobs.last())
2033                .ok_or_else(|| Error::InternalError("Failed to create a new chain"))?;
2034            let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
2035            // Add the new chain to the list of tracked chains
2036            self.client.track_chain(description.id());
2037            self.client
2038                .local_node
2039                .retry_pending_cross_chain_requests(self.chain_id)
2040                .await?;
2041            return Ok(ClientOutcome::Committed((description, certificate)));
2042        }
2043    }
2044
2045    /// Closes the chain (and loses everything in it!!).
2046    /// Returns `None` if the chain was already closed.
2047    #[instrument(level = "trace")]
2048    pub async fn close_chain(
2049        &self,
2050    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
2051        match self.execute_operation(SystemOperation::CloseChain).await {
2052            Ok(outcome) => Ok(outcome.map(Some)),
2053            Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
2054                chain_error,
2055            )))) if matches!(*chain_error, ChainError::ClosedChain) => {
2056                Ok(ClientOutcome::Committed(None)) // Chain is already closed.
2057            }
2058            Err(error) => Err(error),
2059        }
2060    }
2061
2062    /// Publishes some module.
2063    #[cfg(not(target_arch = "wasm32"))]
2064    #[instrument(level = "trace", skip(contract, service))]
2065    pub async fn publish_module(
2066        &self,
2067        contract: Bytecode,
2068        service: Bytecode,
2069        vm_runtime: VmRuntime,
2070    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2071        let (blobs, module_id) = super::create_bytecode_blobs(contract, service, vm_runtime).await;
2072        self.publish_module_blobs(blobs, module_id).await
2073    }
2074
2075    /// Publishes some module.
2076    #[cfg(not(target_arch = "wasm32"))]
2077    #[instrument(level = "trace", skip(blobs, module_id))]
2078    pub async fn publish_module_blobs(
2079        &self,
2080        blobs: Vec<Blob>,
2081        module_id: ModuleId,
2082    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2083        self.execute_operations(
2084            vec![Operation::system(SystemOperation::PublishModule {
2085                module_id,
2086            })],
2087            blobs,
2088        )
2089        .await?
2090        .try_map(|certificate| Ok((module_id, certificate)))
2091    }
2092
2093    /// Publishes some data blobs.
2094    #[instrument(level = "trace", skip(bytes))]
2095    pub async fn publish_data_blobs(
2096        &self,
2097        bytes: Vec<Vec<u8>>,
2098    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2099        let blobs = bytes.into_iter().map(Blob::new_data);
2100        let publish_blob_operations = blobs
2101            .clone()
2102            .map(|blob| {
2103                Operation::system(SystemOperation::PublishDataBlob {
2104                    blob_hash: blob.id().hash,
2105                })
2106            })
2107            .collect();
2108        self.execute_operations(publish_blob_operations, blobs.collect())
2109            .await
2110    }
2111
2112    /// Publishes some data blob.
2113    #[instrument(level = "trace", skip(bytes))]
2114    pub async fn publish_data_blob(
2115        &self,
2116        bytes: Vec<u8>,
2117    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2118        self.publish_data_blobs(vec![bytes]).await
2119    }
2120
2121    /// Creates an application by instantiating some bytecode.
2122    #[instrument(
2123        level = "trace",
2124        skip(self, parameters, instantiation_argument, required_application_ids)
2125    )]
2126    pub async fn create_application<
2127        A: Abi,
2128        Parameters: Serialize,
2129        InstantiationArgument: Serialize,
2130    >(
2131        &self,
2132        module_id: ModuleId<A, Parameters, InstantiationArgument>,
2133        parameters: &Parameters,
2134        instantiation_argument: &InstantiationArgument,
2135        required_application_ids: Vec<ApplicationId>,
2136    ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, Error> {
2137        let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
2138        let parameters = serde_json::to_vec(parameters)?;
2139        Ok(self
2140            .create_application_untyped(
2141                module_id.forget_abi(),
2142                parameters,
2143                instantiation_argument,
2144                required_application_ids,
2145            )
2146            .await?
2147            .map(|(app_id, cert)| (app_id.with_abi(), cert)))
2148    }
2149
2150    /// Creates an application by instantiating some bytecode.
2151    #[instrument(
2152        level = "trace",
2153        skip(
2154            self,
2155            module_id,
2156            parameters,
2157            instantiation_argument,
2158            required_application_ids
2159        )
2160    )]
2161    pub async fn create_application_untyped(
2162        &self,
2163        module_id: ModuleId,
2164        parameters: Vec<u8>,
2165        instantiation_argument: Vec<u8>,
2166        required_application_ids: Vec<ApplicationId>,
2167    ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, Error> {
2168        self.execute_operation(SystemOperation::CreateApplication {
2169            module_id,
2170            parameters,
2171            instantiation_argument,
2172            required_application_ids,
2173        })
2174        .await?
2175        .try_map(|certificate| {
2176            // The first message of the only operation created the application.
2177            let mut creation: Vec<_> = certificate
2178                .block()
2179                .created_blob_ids()
2180                .into_iter()
2181                .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
2182                .collect();
2183            if creation.len() > 1 {
2184                return Err(Error::InternalError(
2185                    "Unexpected number of application descriptions published",
2186                ));
2187            }
2188            let blob_id = creation.pop().ok_or(Error::InternalError(
2189                "ApplicationDescription blob not found.",
2190            ))?;
2191            let id = ApplicationId::new(blob_id.hash);
2192            Ok((id, certificate))
2193        })
2194    }
2195
2196    /// Creates a new committee and starts using it (admin chains only).
2197    #[instrument(level = "trace", skip(committee))]
2198    pub async fn stage_new_committee(
2199        &self,
2200        committee: Committee,
2201    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2202        let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
2203        let blob_hash = blob.id().hash;
2204        match self
2205            .execute_operations(
2206                vec![Operation::system(SystemOperation::Admin(
2207                    AdminOperation::PublishCommitteeBlob { blob_hash },
2208                ))],
2209                vec![blob],
2210            )
2211            .await?
2212        {
2213            ClientOutcome::Committed(_) => {}
2214            outcome @ ClientOutcome::WaitForTimeout(_) => return Ok(outcome),
2215        }
2216        let epoch = self.chain_info().await?.epoch.try_add_one()?;
2217        self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
2218            epoch,
2219            blob_hash,
2220        }))
2221        .await
2222    }
2223
    /// Synchronizes the chain with the validators and creates blocks without any operations to
    /// process all incoming messages. This may require several blocks.
    ///
    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
    /// is returned, too.
    ///
    /// Prepares the chain first (`prepare_chain`) and then delegates to
    /// `process_inbox_without_prepare`.
    #[instrument(level = "trace")]
    pub async fn process_inbox(
        &self,
    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
        self.prepare_chain().await?;
        self.process_inbox_without_prepare().await
    }
2236
2237    /// Creates blocks without any operations to process all incoming messages. This may require
2238    /// several blocks.
2239    ///
2240    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
2241    /// is returned, too.
2242    #[instrument(level = "trace")]
2243    pub async fn process_inbox_without_prepare(
2244        &self,
2245    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2246        #[cfg(with_metrics)]
2247        let _latency = super::metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
2248
2249        let mut certificates = Vec::new();
2250        loop {
2251            // We provide no operations - this means that the only operations executed
2252            // will be epoch changes, receiving messages and processing event stream
2253            // updates, if any are pending.
2254            let transactions = self.prepend_epochs_messages_and_events(vec![]).await?;
2255            // Nothing in the inbox and no stream updates to be processed.
2256            if transactions.is_empty() {
2257                return Ok((certificates, None));
2258            }
2259            match self
2260                .execute_prepared_transactions(transactions, vec![])
2261                .await
2262            {
2263                Ok(ExecuteBlockOutcome::Executed(certificate))
2264                | Ok(ExecuteBlockOutcome::Conflict(certificate)) => certificates.push(certificate),
2265                Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
2266                    return Ok((certificates, Some(timeout)));
2267                }
2268                Err(error) => return Err(error),
2269            };
2270        }
2271    }
2272
2273    /// Returns operations to process all pending epoch changes: first the new epochs, in order,
2274    /// then the removed epochs, in order.
2275    async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, Error> {
2276        let (mut min_epoch, mut next_epoch) = {
2277            let (epoch, committees) = self.epoch_and_committees().await?;
2278            let min_epoch = *committees.keys().next().unwrap_or(&Epoch::ZERO);
2279            (min_epoch, epoch.try_add_one()?)
2280        };
2281        let mut epoch_change_ops = Vec::new();
2282        while self
2283            .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
2284            .await?
2285        {
2286            epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
2287                next_epoch,
2288            )));
2289            next_epoch.try_add_assign_one()?;
2290        }
2291        while self
2292            .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
2293            .await?
2294        {
2295            epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
2296                min_epoch,
2297            )));
2298            min_epoch.try_add_assign_one()?;
2299        }
2300        Ok(epoch_change_ops)
2301    }
2302
2303    /// Returns whether the system event on the admin chain with the given stream name and key
2304    /// exists in storage.
2305    async fn has_admin_event(&self, stream_name: &[u8], index: u32) -> Result<bool, Error> {
2306        let event_id = EventId {
2307            chain_id: self.client.admin_id,
2308            stream_id: StreamId::system(stream_name),
2309            index,
2310        };
2311        Ok(self
2312            .client
2313            .storage_client()
2314            .read_event(event_id)
2315            .await?
2316            .is_some())
2317    }
2318
2319    /// Returns the indices and events from the storage
2320    pub async fn events_from_index(
2321        &self,
2322        stream_id: StreamId,
2323        start_index: u32,
2324    ) -> Result<Vec<IndexAndEvent>, Error> {
2325        Ok(self
2326            .client
2327            .storage_client()
2328            .read_events_from_index(&self.chain_id, &stream_id, start_index)
2329            .await?)
2330    }
2331
2332    /// Deprecates all the configurations of voting rights up to the given one (admin chains
2333    /// only). Currently, each individual chain is still entitled to wait before accepting
2334    /// this command. However, it is expected that deprecated validators stop functioning
2335    /// shortly after such command is issued.
2336    #[instrument(level = "trace")]
2337    pub async fn revoke_epochs(
2338        &self,
2339        revoked_epoch: Epoch,
2340    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2341        self.prepare_chain().await?;
2342        let (current_epoch, committees) = self.epoch_and_committees().await?;
2343        ensure!(
2344            revoked_epoch < current_epoch,
2345            Error::CannotRevokeCurrentEpoch(current_epoch)
2346        );
2347        ensure!(
2348            committees.contains_key(&revoked_epoch),
2349            Error::EpochAlreadyRevoked
2350        );
2351        let operations = committees
2352            .keys()
2353            .filter_map(|epoch| {
2354                if *epoch <= revoked_epoch {
2355                    Some(Operation::system(SystemOperation::Admin(
2356                        AdminOperation::RemoveCommittee { epoch: *epoch },
2357                    )))
2358                } else {
2359                    None
2360                }
2361            })
2362            .collect();
2363        self.execute_operations(operations, vec![]).await
2364    }
2365
2366    /// Sends money to a chain.
2367    /// Do not check balance. (This may block the client)
2368    /// Do not confirm the transaction.
2369    #[instrument(level = "trace")]
2370    pub async fn transfer_to_account_unsafe_unconfirmed(
2371        &self,
2372        owner: AccountOwner,
2373        amount: Amount,
2374        recipient: Account,
2375    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2376        self.execute_operation(SystemOperation::Transfer {
2377            owner,
2378            recipient,
2379            amount,
2380        })
2381        .await
2382    }
2383
2384    #[instrument(level = "trace", skip(hash))]
2385    pub async fn read_confirmed_block(&self, hash: CryptoHash) -> Result<ConfirmedBlock, Error> {
2386        let block = self
2387            .client
2388            .storage_client()
2389            .read_confirmed_block(hash)
2390            .await?;
2391        block.ok_or(Error::MissingConfirmedBlock(hash))
2392    }
2393
2394    /// Handles any cross-chain requests for any pending outgoing messages.
2395    #[instrument(level = "trace")]
2396    pub async fn retry_pending_outgoing_messages(&self) -> Result<(), Error> {
2397        self.client
2398            .local_node
2399            .retry_pending_cross_chain_requests(self.chain_id)
2400            .await?;
2401        Ok(())
2402    }
2403
2404    #[instrument(level = "trace", skip(local_node))]
2405    async fn local_chain_info(
2406        &self,
2407        chain_id: ChainId,
2408        local_node: &mut LocalNodeClient<Env::Storage>,
2409    ) -> Result<Option<Box<ChainInfo>>, Error> {
2410        match local_node.chain_info(chain_id).await {
2411            Ok(info) => {
2412                // Useful in case `chain_id` is the same as a local chain.
2413                self.client.update_from_info(&info);
2414                Ok(Some(info))
2415            }
2416            Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
2417            Err(err) => Err(err.into()),
2418        }
2419    }
2420
2421    #[instrument(level = "trace", skip(chain_id, local_node))]
2422    async fn local_next_block_height(
2423        &self,
2424        chain_id: ChainId,
2425        local_node: &mut LocalNodeClient<Env::Storage>,
2426    ) -> Result<BlockHeight, Error> {
2427        Ok(self
2428            .local_chain_info(chain_id, local_node)
2429            .await?
2430            .map_or(BlockHeight::ZERO, |info| info.next_block_height))
2431    }
2432
2433    /// Returns the next height we expect to receive from the given sender chain, according to the
2434    /// local inbox.
2435    #[instrument(level = "trace")]
2436    async fn local_next_height_to_receive(&self, origin: ChainId) -> Result<BlockHeight, Error> {
2437        Ok(self
2438            .client
2439            .local_node
2440            .get_inbox_next_height(self.chain_id, origin)
2441            .await?)
2442    }
2443
2444    #[instrument(level = "trace", skip(remote_node, local_node, notification))]
2445    async fn process_notification(
2446        &self,
2447        remote_node: RemoteNode<Env::ValidatorNode>,
2448        mut local_node: LocalNodeClient<Env::Storage>,
2449        notification: Notification,
2450        listening_mode: &ListeningMode,
2451    ) -> Result<(), Error> {
2452        match notification.reason {
2453            Reason::NewIncomingBundle { origin, height } => {
2454                if self.local_next_height_to_receive(origin).await? > height {
2455                    debug!(
2456                        chain_id = %self.chain_id,
2457                        "Accepting redundant notification for new message"
2458                    );
2459                    return Ok(());
2460                }
2461                self.client
2462                    .download_sender_block_with_sending_ancestors(
2463                        self.chain_id,
2464                        origin,
2465                        height,
2466                        &remote_node,
2467                    )
2468                    .await?;
2469                if self.local_next_height_to_receive(origin).await? <= height {
2470                    info!(
2471                        chain_id = %self.chain_id,
2472                        "NewIncomingBundle: Fail to synchronize new message after notification"
2473                    );
2474                }
2475            }
2476            Reason::NewBlock { height, .. } => {
2477                let chain_id = notification.chain_id;
2478                if self
2479                    .local_next_block_height(chain_id, &mut local_node)
2480                    .await?
2481                    > height
2482                {
2483                    debug!(
2484                        chain_id = %self.chain_id,
2485                        "Accepting redundant notification for new block"
2486                    );
2487                    return Ok(());
2488                }
2489                match listening_mode {
2490                    ListeningMode::FullChain => {
2491                        self.client
2492                            .synchronize_chain_state_from(&remote_node, chain_id)
2493                            .await?;
2494                        if self
2495                            .local_next_block_height(chain_id, &mut local_node)
2496                            .await?
2497                            <= height
2498                        {
2499                            info!("NewBlock: Fail to synchronize new block after notification");
2500                        }
2501                        trace!(
2502                            chain_id = %self.chain_id,
2503                            %height,
2504                            "NewBlock: processed notification",
2505                        );
2506                    }
2507                    ListeningMode::EventsOnly(_) => {
2508                        debug!(
2509                            chain_id = %self.chain_id,
2510                            %height,
2511                            "NewBlock: ignoring notification due to listening in EventsOnly mode"
2512                        );
2513                    }
2514                }
2515            }
2516            Reason::NewEvents {
2517                height,
2518                hash,
2519                event_streams,
2520            } => {
2521                if self
2522                    .local_next_block_height(notification.chain_id, &mut local_node)
2523                    .await?
2524                    > height
2525                {
2526                    debug!(
2527                        chain_id = %self.chain_id,
2528                        "Accepting redundant notification for new block"
2529                    );
2530                    return Ok(());
2531                }
2532                let should_process = match listening_mode {
2533                    ListeningMode::FullChain => true,
2534                    ListeningMode::EventsOnly(relevant_events) => relevant_events
2535                        .intersection(&event_streams)
2536                        .next()
2537                        .is_some(),
2538                };
2539                if !should_process {
2540                    debug!(
2541                        chain_id = %self.chain_id,
2542                        %height,
2543                        "NewEvents: got a notification, but no relevant event streams in it"
2544                    );
2545                    return Ok(());
2546                }
2547                trace!(
2548                    chain_id = %self.chain_id,
2549                    %height,
2550                    "NewEvents: processing notification"
2551                );
2552                let mut certificates = remote_node.node.download_certificates(vec![hash]).await?;
2553                // download_certificates ensures that we will get exactly one
2554                // certificate in the result
2555                let certificate = certificates
2556                    .pop()
2557                    .expect("download_certificates should have returned one certificate");
2558                self.client
2559                    .receive_sender_certificate(
2560                        certificate,
2561                        ReceiveCertificateMode::NeedsCheck,
2562                        None,
2563                    )
2564                    .await?;
2565            }
2566            Reason::NewRound { height, round } => {
2567                if matches!(listening_mode, ListeningMode::EventsOnly(_)) {
2568                    debug!("NewRound: ignoring a notification due to listening mode");
2569                    return Ok(());
2570                }
2571                let chain_id = notification.chain_id;
2572                if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? {
2573                    if (info.next_block_height, info.manager.current_round) >= (height, round) {
2574                        debug!(
2575                            chain_id = %self.chain_id,
2576                            "Accepting redundant notification for new round"
2577                        );
2578                        return Ok(());
2579                    }
2580                }
2581                self.client
2582                    .synchronize_chain_state_from(&remote_node, chain_id)
2583                    .await?;
2584                let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? else {
2585                    error!(
2586                        chain_id = %self.chain_id,
2587                        "NewRound: Fail to read local chain info for {chain_id}"
2588                    );
2589                    return Ok(());
2590                };
2591                if (info.next_block_height, info.manager.current_round) < (height, round) {
2592                    error!(
2593                        chain_id = %self.chain_id,
2594                        "NewRound: Fail to synchronize new block after notification"
2595                    );
2596                }
2597            }
2598        }
2599        Ok(())
2600    }
2601
2602    /// Returns whether this chain is tracked by the client, i.e. we are updating its inbox.
2603    pub fn is_tracked(&self) -> bool {
2604        self.client
2605            .tracked_chains
2606            .read()
2607            .unwrap()
2608            .contains(&self.chain_id)
2609    }
2610
2611    /// Spawns a task that listens to notifications about the current chain from all validators,
2612    /// and synchronizes the local state accordingly.
2613    #[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
2614    pub async fn listen(
2615        &self,
2616        listening_mode: ListeningMode,
2617    ) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), Error> {
2618        use future::FutureExt as _;
2619
2620        async fn await_while_polling<F: FusedFuture>(
2621            future: F,
2622            background_work: impl FusedStream<Item = ()>,
2623        ) -> F::Output {
2624            tokio::pin!(future);
2625            tokio::pin!(background_work);
2626            loop {
2627                futures::select! {
2628                    _ = background_work.next() => (),
2629                    result = future => return result,
2630                }
2631            }
2632        }
2633
2634        let mut senders = HashMap::new(); // Senders to cancel notification streams.
2635        let notifications = self.subscribe()?;
2636        let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);
2637
2638        // Beware: if this future ceases to make progress, notification processing will
2639        // deadlock, because of the issue described in
2640        // https://github.com/linera-io/linera-protocol/pull/1173.
2641
2642        // TODO(#2013): replace this lock with an asynchronous communication channel
2643
2644        let mut process_notifications = FuturesUnordered::new();
2645
2646        match self
2647            .update_notification_streams(&mut senders, &listening_mode)
2648            .await
2649        {
2650            Ok(handler) => process_notifications.push(handler),
2651            Err(error) => error!("Failed to update committee: {error}"),
2652        };
2653
2654        let this = self.clone();
2655        let update_streams = async move {
2656            let mut abortable_notifications = abortable_notifications.fuse();
2657
2658            while let Some(notification) =
2659                await_while_polling(abortable_notifications.next(), &mut process_notifications)
2660                    .await
2661            {
2662                if let Reason::NewBlock { .. } = notification.reason {
2663                    match Box::pin(await_while_polling(
2664                        this.update_notification_streams(&mut senders, &listening_mode)
2665                            .fuse(),
2666                        &mut process_notifications,
2667                    ))
2668                    .await
2669                    {
2670                        Ok(handler) => process_notifications.push(handler),
2671                        Err(error) => error!("Failed to update committee: {error}"),
2672                    }
2673                }
2674            }
2675
2676            for abort in senders.into_values() {
2677                abort.abort();
2678            }
2679
2680            let () = process_notifications.collect().await;
2681        }
2682        .in_current_span();
2683
2684        Ok((update_streams, AbortOnDrop(abort), notifications))
2685    }
2686
2687    #[instrument(level = "trace", skip(senders))]
2688    async fn update_notification_streams(
2689        &self,
2690        senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
2691        listening_mode: &ListeningMode,
2692    ) -> Result<impl Future<Output = ()>, Error> {
2693        let (nodes, local_node) = {
2694            let committee = self.local_committee().await?;
2695            let nodes: HashMap<_, _> = self
2696                .client
2697                .validator_node_provider()
2698                .make_nodes(&committee)?
2699                .collect();
2700            (nodes, self.client.local_node.clone())
2701        };
2702        // Drop removed validators.
2703        senders.retain(|validator, abort| {
2704            if !nodes.contains_key(validator) {
2705                abort.abort();
2706            }
2707            !abort.is_aborted()
2708        });
2709        // Add tasks for new validators.
2710        let validator_tasks = FuturesUnordered::new();
2711        for (public_key, node) in nodes {
2712            let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
2713                continue;
2714            };
2715            let this = self.clone();
2716            let stream = stream::once({
2717                let node = node.clone();
2718                async move {
2719                    let stream = node.subscribe(vec![this.chain_id]).await?;
2720                    // Only now the notification stream is established. We may have missed
2721                    // notifications since the last time we synchronized.
2722                    let remote_node = RemoteNode { public_key, node };
2723                    this.client
2724                        .synchronize_chain_state_from(&remote_node, this.chain_id)
2725                        .await?;
2726                    Ok::<_, Error>(stream)
2727                }
2728            })
2729            .filter_map(move |result| async move {
2730                if let Err(error) = &result {
2731                    info!(?error, "Could not connect to validator {public_key}");
2732                } else {
2733                    debug!("Connected to validator {public_key}");
2734                }
2735                result.ok()
2736            })
2737            .flatten();
2738            let (stream, abort) = stream::abortable(stream);
2739            let mut stream = Box::pin(stream);
2740            let this = self.clone();
2741            let local_node = local_node.clone();
2742            let remote_node = RemoteNode { public_key, node };
2743            let listening_mode_cloned = listening_mode.clone();
2744            validator_tasks.push(async move {
2745                while let Some(notification) = stream.next().await {
2746                    if let Err(err) = this
2747                        .process_notification(
2748                            remote_node.clone(),
2749                            local_node.clone(),
2750                            notification.clone(),
2751                            &listening_mode_cloned,
2752                        )
2753                        .await
2754                    {
2755                        tracing::info!(
2756                            chain_id = %this.chain_id,
2757                            validator_public_key = ?remote_node.public_key,
2758                            ?notification,
2759                            "Failed to process notification: {err}",
2760                        );
2761                    }
2762                }
2763            });
2764            entry.insert(abort);
2765        }
2766        Ok(validator_tasks.collect())
2767    }
2768
2769    /// Attempts to update a validator with the local information.
2770    #[instrument(level = "trace", skip(remote_node))]
2771    pub async fn sync_validator(&self, remote_node: Env::ValidatorNode) -> Result<(), Error> {
2772        let validator_next_block_height = match remote_node
2773            .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
2774            .await
2775        {
2776            Ok(info) => info.info.next_block_height.0,
2777            Err(NodeError::BlobsNotFound(_)) => 0,
2778            Err(err) => return Err(err.into()),
2779        };
2780        let local_chain_state = self.chain_info().await?;
2781
2782        let Some(missing_certificate_count) = local_chain_state
2783            .next_block_height
2784            .0
2785            .checked_sub(validator_next_block_height)
2786            .filter(|count| *count > 0)
2787        else {
2788            debug!("Validator is up-to-date with local state");
2789            return Ok(());
2790        };
2791
2792        let missing_certificates_end = usize::try_from(local_chain_state.next_block_height.0)
2793            .expect("`usize` should be at least `u64`");
2794        let missing_certificates_start = missing_certificates_end
2795            - usize::try_from(missing_certificate_count).expect("`usize` should be at least `u64`");
2796
2797        let missing_certificate_hashes = self
2798            .chain_state_view()
2799            .await?
2800            .confirmed_log
2801            .read(missing_certificates_start..missing_certificates_end)
2802            .await?;
2803
2804        let certificates = self
2805            .client
2806            .storage_client()
2807            .read_certificates(missing_certificate_hashes.clone())
2808            .await?;
2809        let certificates =
2810            match ResultReadCertificates::new(certificates, missing_certificate_hashes) {
2811                ResultReadCertificates::Certificates(certificates) => certificates,
2812                ResultReadCertificates::InvalidHashes(hashes) => {
2813                    return Err(Error::ReadCertificatesError(hashes))
2814                }
2815            };
2816        for certificate in certificates {
2817            match remote_node
2818                .handle_confirmed_certificate(
2819                    certificate.clone(),
2820                    CrossChainMessageDelivery::NonBlocking,
2821                )
2822                .await
2823            {
2824                Ok(_) => (),
2825                Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
2826                    // Upload the missing blobs we have and retry.
2827                    let missing_blobs: Vec<_> = self
2828                        .client
2829                        .storage_client()
2830                        .read_blobs(&missing_blob_ids)
2831                        .await?
2832                        .into_iter()
2833                        .flatten()
2834                        .collect();
2835                    remote_node.upload_blobs(missing_blobs).await?;
2836                    remote_node
2837                        .handle_confirmed_certificate(
2838                            certificate,
2839                            CrossChainMessageDelivery::NonBlocking,
2840                        )
2841                        .await?;
2842                }
2843                Err(err) => return Err(err.into()),
2844            }
2845        }
2846
2847        Ok(())
2848    }
2849}
2850
#[cfg(with_testing)]
impl<Env: Environment> ChainClient<Env> {
    /// Test helper: processes `notification` in `FullChain` listening mode as if it had been
    /// received from the given validator (public key and address).
    ///
    /// # Panics
    ///
    /// Panics if the validator node cannot be created or if processing the notification
    /// fails.
    pub async fn process_notification_from(
        &self,
        notification: Notification,
        validator: (ValidatorPublicKey, &str),
    ) {
        let (public_key, node) = self
            .client
            .validator_node_provider()
            .make_nodes_from_list(vec![validator])
            .unwrap()
            .next()
            .unwrap();
        let remote_node = RemoteNode { node, public_key };
        let local_node = self.client.local_node.clone();
        let outcome = self
            .process_notification(
                remote_node,
                local_node,
                notification,
                &ListeningMode::FullChain,
            )
            .await;
        outcome.unwrap();
    }
}