linera_core/client/chain_client/
mod.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4mod state;
5use std::{
6    collections::{hash_map, BTreeMap, BTreeSet, HashMap},
7    convert::Infallible,
8    iter,
9    sync::Arc,
10};
11
12use custom_debug_derive::Debug;
13use futures::{
14    future::{self, Either, FusedFuture, Future},
15    stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt, TryStreamExt},
16};
17#[cfg(with_metrics)]
18use linera_base::prometheus_util::MeasureLatency as _;
19use linera_base::{
20    abi::Abi,
21    crypto::{signer, AccountPublicKey, CryptoHash, Signer, ValidatorPublicKey},
22    data_types::{
23        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
24        ChainDescription, Epoch, MessagePolicy, Round, Timestamp,
25    },
26    ensure,
27    identifiers::{
28        Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
29        ModuleId, StreamId,
30    },
31    ownership::{ChainOwnership, TimeoutConfig},
32    time::{Duration, Instant},
33};
34#[cfg(not(target_arch = "wasm32"))]
35use linera_base::{data_types::Bytecode, vm::VmRuntime};
36use linera_chain::{
37    data_types::{
38        BlockProposal, BundleExecutionPolicy, ChainAndHeight, IncomingBundle, ProposedBlock,
39        Transaction,
40    },
41    manager::LockingBlock,
42    types::{
43        Block, ConfirmedBlock, ConfirmedBlockCertificate, Timeout, TimeoutCertificate,
44        ValidatedBlock,
45    },
46    ChainError, ChainExecutionContext, ChainStateView,
47};
48use linera_execution::{
49    committee::Committee,
50    system::{
51        AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
52        REMOVED_EPOCH_STREAM_NAME,
53    },
54    ExecutionError, Operation, Query, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
55};
56use linera_storage::{Clock as _, Storage as _};
57use linera_views::ViewError;
58use rand::seq::SliceRandom;
59use serde::Serialize;
60pub use state::State;
61use thiserror::Error;
62use tokio::sync::{mpsc, OwnedRwLockReadGuard};
63use tokio_stream::wrappers::UnboundedReceiverStream;
64use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};
65
66use super::{
67    received_log::ReceivedLogs, validator_trackers::ValidatorTrackers, AbortOnDrop, Client,
68    ListeningMode, PendingProposal, ReceiveCertificateMode, TimingType,
69};
70use crate::{
71    data_types::{ChainInfo, ChainInfoQuery, ClientOutcome, RoundTimeout},
72    environment::Environment,
73    local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
74    node::{
75        CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
76        ValidatorNodeProvider as _,
77    },
78    remote_node::RemoteNode,
79    updater::{communicate_with_quorum, CommunicateAction, CommunicationError},
80    worker::{Notification, Reason, WorkerError},
81};
82
/// Configuration options controlling how a [`ChainClient`] builds blocks and communicates
/// with validators.
#[derive(Debug, Clone)]
pub struct Options {
    /// Maximum number of pending message bundles processed at a time in a block.
    pub max_pending_message_bundles: usize,
    /// Maximum number of message bundles to discard from a block proposal due to block limit
    /// errors before discarding all remaining bundles.
    ///
    /// Discarded bundles can be retried in the next block.
    pub max_block_limit_errors: u32,
    /// The policy for automatically handling incoming messages.
    pub message_policy: MessagePolicy,
    /// Whether to block on cross-chain message delivery.
    pub cross_chain_message_delivery: CrossChainMessageDelivery,
    /// An additional delay, after reaching a quorum, to wait for additional validator signatures,
    /// as a fraction of time taken to reach quorum.
    pub quorum_grace_period: f64,
    /// The delay when downloading a blob, after which we try a second validator.
    pub blob_download_timeout: Duration,
    /// The delay when downloading a batch of certificates, after which we try a second validator.
    pub certificate_batch_download_timeout: Duration,
    /// Maximum number of certificates that we download at a time from one validator when
    /// synchronizing one of our chains.
    pub certificate_download_batch_size: u64,
    /// Maximum number of sender certificates we try to download and receive in one go
    /// when syncing sender chains.
    pub sender_certificate_download_batch_size: usize,
    /// Maximum number of tasks that can be joined concurrently using buffer_unordered.
    pub max_joined_tasks: usize,
    /// Whether to allow creating blocks in the fast round. Fast blocks have lower latency but
    /// must be used carefully so that there are never any conflicting fast block proposals.
    pub allow_fast_blocks: bool,
}
115
#[cfg(with_testing)]
impl Options {
    /// Returns a set of options with short timeouts and small limits, suitable for tests.
    pub fn test_default() -> Self {
        use super::{
            DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE, DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
        };
        use crate::DEFAULT_QUORUM_GRACE_PERIOD;

        // One-second download timeouts keep tests fast while still exercising the
        // fallback-to-second-validator paths.
        Self {
            max_pending_message_bundles: 10,
            max_block_limit_errors: 3,
            message_policy: MessagePolicy::new_accept_all(),
            cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
            quorum_grace_period: DEFAULT_QUORUM_GRACE_PERIOD,
            blob_download_timeout: Duration::from_secs(1),
            certificate_batch_download_timeout: Duration::from_secs(1),
            certificate_download_batch_size: DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            sender_certificate_download_batch_size: DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            max_joined_tasks: 100,
            allow_fast_blocks: false,
        }
    }
}
139
/// Client to operate a chain by interacting with validators and the given local storage
/// implementation.
/// * The chain being operated is called the "local chain" or just the "chain".
/// * As a rule, operations are considered successful (and communication may stop) when
///   they succeeded in gathering a quorum of responses.
///
/// Cloning a `ChainClient` is cheap: all clones share the same underlying [`Client`]
/// via `Arc`, and only the per-client configuration is copied.
#[derive(Debug)]
pub struct ChainClient<Env: Environment> {
    /// The Linera [`Client`] that manages operations for this chain client.
    #[debug(skip)]
    pub(crate) client: Arc<Client<Env>>,
    /// The off-chain chain ID.
    chain_id: ChainId,
    /// The client options.
    #[debug(skip)]
    options: Options,
    /// The preferred owner of the chain used to sign proposals.
    /// `None` if we cannot propose on this chain.
    preferred_owner: Option<AccountOwner>,
    /// The next block height as read from the wallet.
    initial_next_block_height: BlockHeight,
    /// The last block hash as read from the wallet.
    initial_block_hash: Option<CryptoHash>,
    /// Optional timing sender for benchmarking.
    timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
}
165
166impl<Env: Environment> Clone for ChainClient<Env> {
167    fn clone(&self) -> Self {
168        Self {
169            client: self.client.clone(),
170            chain_id: self.chain_id,
171            options: self.options.clone(),
172            preferred_owner: self.preferred_owner,
173            initial_next_block_height: self.initial_next_block_height,
174            initial_block_hash: self.initial_block_hash,
175            timing_sender: self.timing_sender.clone(),
176        }
177    }
178}
179
/// Error type for [`ChainClient`].
#[derive(Debug, Error)]
pub enum Error {
    #[error("Local node operation failed: {0}")]
    LocalNodeError(#[from] LocalNodeError),

    #[error("Remote node operation failed: {0}")]
    RemoteNodeError(#[from] NodeError),

    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    /// Certificates with the given hashes could not be read from storage.
    #[error("Missing certificates: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),

    #[error("Missing confirmed block: {0:?}")]
    MissingConfirmedBlock(CryptoHash),

    #[error("JSON (de)serialization error: {0}")]
    JsonError(#[from] serde_json::Error),

    #[error("Chain operation failed: {0}")]
    ChainError(#[from] ChainError),

    #[error(transparent)]
    CommunicationError(#[from] CommunicationError<NodeError>),

    /// An invariant of the chain client itself was violated; not caused by remote input.
    #[error("Internal error within chain client: {0}")]
    InternalError(&'static str),

    #[error(
        "Cannot accept a certificate from an unknown committee in the future. \
         Please synchronize the local view of the admin chain"
    )]
    CommitteeSynchronizationError,

    #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
    WalletSynchronizationError,

    #[error("The state of the client is incompatible with the proposed block: {0}")]
    BlockProposalError(&'static str),

    #[error(
        "Cannot accept a certificate from a committee that was retired. \
         Try a newer certificate from the same origin"
    )]
    CommitteeDeprecationError,

    #[error("Protocol error within chain client: {0}")]
    ProtocolError(&'static str),

    #[error("Signer doesn't have key to sign for chain {0}")]
    CannotFindKeyForChain(ChainId),

    #[error("client is not configured to propose on chain {0}")]
    NoAccountKeyConfigured(ChainId),

    #[error("The chain client isn't owner on chain {0}")]
    NotAnOwner(ChainId),

    #[error(transparent)]
    ViewError(#[from] ViewError),

    #[error(
        "Failed to download certificates and update local node to the next height \
         {target_next_block_height} of chain {chain_id}"
    )]
    CannotDownloadCertificates {
        chain_id: ChainId,
        target_next_block_height: BlockHeight,
    },

    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    #[error(
        "Unexpected quorum: validators voted for block hash {hash} in {round}, \
         expected block hash {expected_hash} in {expected_round}"
    )]
    UnexpectedQuorum {
        hash: CryptoHash,
        round: Round,
        expected_hash: CryptoHash,
        expected_round: Round,
    },

    /// An error reported by the [`Signer`] implementation; boxed for type erasure.
    /// Construct via [`Error::signer_failure`].
    #[error("signer error: {0:?}")]
    Signer(#[source] Box<dyn signer::Error>),

    #[error("Cannot revoke the current epoch {0}")]
    CannotRevokeCurrentEpoch(Epoch),

    #[error("Epoch is already revoked")]
    EpochAlreadyRevoked,

    #[error("Failed to download missing sender blocks from chain {chain_id} at height {height}")]
    CannotDownloadMissingSenderBlock {
        chain_id: ChainId,
        height: BlockHeight,
    },

    /// Another block was certified at the height we tried to commit to; carries the
    /// hash of the certificate that won.
    #[error(
        "A different block was already committed at this height. \
         The committed certificate hash is {0}"
    )]
    Conflict(CryptoHash),
}
287
impl From<Infallible> for Error {
    /// `Infallible` has no values, so this conversion can never actually be invoked;
    /// the empty match proves that exhaustively to the compiler.
    fn from(infallible: Infallible) -> Self {
        match infallible {}
    }
}
293
294impl Error {
295    pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
296        Self::Signer(Box::new(err))
297    }
298}
299
300impl<Env: Environment> ChainClient<Env> {
301    pub fn new(
302        client: Arc<Client<Env>>,
303        chain_id: ChainId,
304        options: Options,
305        initial_block_hash: Option<CryptoHash>,
306        initial_next_block_height: BlockHeight,
307        preferred_owner: Option<AccountOwner>,
308        timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
309    ) -> Self {
310        ChainClient {
311            client,
312            chain_id,
313            options,
314            preferred_owner,
315            initial_block_hash,
316            initial_next_block_height,
317            timing_sender,
318        }
319    }
320
    /// Returns whether this chain is in follow-only mode, as recorded by the shared
    /// [`Client`] for this chain ID.
    pub fn is_follow_only(&self) -> bool {
        self.client.is_chain_follow_only(self.chain_id)
    }
325
326    /// Gets the client mutex from the chain's state.
327    #[instrument(level = "trace", skip(self))]
328    fn client_mutex(&self) -> Arc<tokio::sync::Mutex<()>> {
329        self.client
330            .chains
331            .pin()
332            .get(&self.chain_id)
333            .expect("Chain client constructed for invalid chain")
334            .client_mutex()
335    }
336
337    /// Gets the next pending block.
338    #[instrument(level = "trace", skip(self))]
339    pub fn pending_proposal(&self) -> Option<PendingProposal> {
340        self.client
341            .chains
342            .pin()
343            .get(&self.chain_id)
344            .expect("Chain client constructed for invalid chain")
345            .pending_proposal()
346            .clone()
347    }
348
    /// Updates the chain's state using a closure.
    ///
    /// The closure is `Fn`, not `FnOnce`, so it may be invoked more than once by the
    /// underlying map's `update`; it should therefore be idempotent.
    #[instrument(level = "trace", skip(self, f))]
    fn update_state<F>(&self, f: F)
    where
        F: Fn(&mut State),
    {
        let chains = self.client.chains.pin();
        chains
            .update(self.chain_id, |state| {
                // Mutate a clone; `update` installs the returned state as the new value.
                let mut state = state.clone_for_update_unchecked();
                f(&mut state);
                state
            })
            .expect("Chain client constructed for invalid chain");
    }
364
    /// Gets a reference to the client's signer instance, shared with the underlying
    /// [`Client`].
    #[instrument(level = "trace", skip(self))]
    pub fn signer(&self) -> &impl Signer {
        self.client.signer()
    }
370
    /// Returns whether the signer has a key for the given owner. Delegates to the
    /// shared [`Client`].
    pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, Error> {
        self.client.has_key_for(owner).await
    }
375
    /// Gets a mutable reference to the per-`ChainClient` options. Changes affect only
    /// this clone of the client.
    #[instrument(level = "trace", skip(self))]
    pub fn options_mut(&mut self) -> &mut Options {
        &mut self.options
    }
381
    /// Gets a shared reference to the per-`ChainClient` options.
    #[instrument(level = "trace", skip(self))]
    pub fn options(&self) -> &Options {
        &self.options
    }
387
    /// Gets the ID of the chain this client operates.
    #[instrument(level = "trace", skip(self))]
    pub fn chain_id(&self) -> ChainId {
        self.chain_id
    }
393
    /// Gets a clone of the timing sender for benchmarking, if one was configured.
    pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
        self.timing_sender.clone()
    }
398
    /// Gets the ID of the admin chain, as configured on the shared [`Client`].
    #[instrument(level = "trace", skip(self))]
    pub fn admin_chain_id(&self) -> ChainId {
        self.client.admin_chain_id
    }
404
    /// Gets the currently preferred owner for signing the blocks, or `None` if this
    /// client cannot propose on the chain.
    #[instrument(level = "trace", skip(self))]
    pub fn preferred_owner(&self) -> Option<AccountOwner> {
        self.preferred_owner
    }
410
    /// Sets the new, preferred owner for signing the blocks. Affects only this clone
    /// of the client.
    #[instrument(level = "trace", skip(self))]
    pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
        self.preferred_owner = Some(preferred_owner);
    }
416
    /// Unsets the preferred owner for signing the blocks, making this client unable to
    /// propose until a new one is set.
    #[instrument(level = "trace", skip(self))]
    pub fn unset_preferred_owner(&mut self) {
        self.preferred_owner = None;
    }
422
    /// Obtains a read guard on the `ChainStateView` for this client's chain from the
    /// local node.
    #[instrument(level = "trace")]
    pub async fn chain_state_view(
        &self,
    ) -> Result<OwnedRwLockReadGuard<ChainStateView<Env::StorageContext>>, LocalNodeError> {
        self.client.local_node.chain_state_view(self.chain_id).await
    }
430
431    /// Returns chain IDs that this chain subscribes to.
432    #[instrument(level = "trace", skip(self))]
433    pub async fn event_stream_publishers(
434        &self,
435    ) -> Result<BTreeMap<ChainId, BTreeSet<StreamId>>, LocalNodeError> {
436        let subscriptions = self
437            .client
438            .local_node
439            .get_event_subscriptions(self.chain_id)
440            .await?;
441        let mut publishers = subscriptions.into_iter().fold(
442            BTreeMap::<ChainId, BTreeSet<StreamId>>::new(),
443            |mut map, ((chain_id, stream_id), _)| {
444                map.entry(chain_id).or_default().insert(stream_id);
445                map
446            },
447        );
448        if self.chain_id != self.client.admin_chain_id {
449            publishers.insert(
450                self.client.admin_chain_id,
451                vec![
452                    StreamId::system(EPOCH_STREAM_NAME),
453                    StreamId::system(REMOVED_EPOCH_STREAM_NAME),
454                ]
455                .into_iter()
456                .collect(),
457            );
458        }
459        Ok(publishers)
460    }
461
    /// Subscribes to notifications from this client's own chain.
    #[instrument(level = "trace")]
    pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
        self.subscribe_to(self.chain_id)
    }
467
468    /// Subscribes to notifications from the specified chain.
469    #[instrument(level = "trace")]
470    pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
471        Ok(Box::pin(UnboundedReceiverStream::new(
472            self.client.notifier.subscribe(vec![chain_id]),
473        )))
474    }
475
    /// Returns the storage client used by this client's local node.
    #[instrument(level = "trace")]
    pub fn storage_client(&self) -> &Env::Storage {
        self.client.storage_client()
    }
481
482    /// Obtains the basic `ChainInfo` data for the local chain.
483    #[instrument(level = "trace")]
484    pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
485        let query = ChainInfoQuery::new(self.chain_id);
486        let response = self
487            .client
488            .local_node
489            .handle_chain_info_query(query)
490            .await?;
491        self.client.update_from_info(&response.info);
492        Ok(response.info)
493    }
494
495    /// Obtains the basic `ChainInfo` data for the local chain, with chain manager values.
496    #[instrument(level = "trace")]
497    pub async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
498        let query = ChainInfoQuery::new(self.chain_id)
499            .with_manager_values()
500            .with_committees();
501        let response = self
502            .client
503            .local_node
504            .handle_chain_info_query(query)
505            .await?;
506        self.client.update_from_info(&response.info);
507        Ok(response.info)
508    }
509
    /// Returns the chain's description. Fetches it from the validators if necessary.
    /// Delegates to the shared [`Client`].
    pub async fn get_chain_description(&self) -> Result<ChainDescription, Error> {
        self.client.get_chain_description(self.chain_id).await
    }
514
    /// Obtains up to `self.options.max_pending_message_bundles` pending message bundles for the
    /// local chain.
    ///
    /// Returns an empty list immediately if the message policy ignores all messages.
    /// Bundles are filtered through the configured message policy before being returned.
    #[instrument(level = "trace")]
    async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, Error> {
        if self.options.message_policy.is_ignore() {
            // Ignore all messages.
            return Ok(Vec::new());
        }

        let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
        let info = self
            .client
            .local_node
            .handle_chain_info_query(query)
            .await?
            .info;
        if self.preferred_owner.is_some_and(|owner| {
            info.manager
                .ownership
                .is_super_owner_no_regular_owners(&owner)
        }) {
            // There are only super owners; they are expected to sync manually.
            // Reject if the local node lags behind the wallet's recorded height.
            ensure!(
                info.next_block_height >= self.initial_next_block_height,
                Error::WalletSynchronizationError
            );
        }

        Ok(info
            .requested_pending_message_bundles
            .into_iter()
            // Drop any bundle the policy rejects, then cap the number of bundles.
            .filter_map(|bundle| bundle.apply_policy(&self.options.message_policy))
            .take(self.options.max_pending_message_bundles)
            .collect())
    }
550
    /// Returns an `UpdateStreams` operation that updates this client's chain about new events
    /// in any of the streams its applications are subscribing to. Returns `None` if there are no
    /// new events.
    ///
    /// Streams from chains excluded by `message_policy.restrict_chain_ids_to` are skipped.
    /// Lookups for the different streams run concurrently, bounded by `max_joined_tasks`.
    #[instrument(level = "trace")]
    async fn collect_stream_updates(&self) -> Result<Option<Operation>, Error> {
        // Load all our subscriptions.
        let subscription_map = self
            .client
            .local_node
            .get_event_subscriptions(self.chain_id)
            .await?;
        // Collect the indices of all new events.
        let futures = subscription_map
            .into_iter()
            .filter(|((chain_id, _), _)| {
                // If a restriction set is configured, only consider chains in it.
                self.options
                    .message_policy
                    .restrict_chain_ids_to
                    .as_ref()
                    .is_none_or(|chain_set| chain_set.contains(chain_id))
            })
            .map(|((chain_id, stream_id), subscriptions)| {
                let client = self.client.clone();
                async move {
                    let next_expected_index = client
                        .local_node
                        .get_next_expected_event(chain_id, stream_id.clone())
                        .await?;
                    // Only report streams that actually advanced past our recorded index.
                    if let Some(next_index) = next_expected_index
                        .filter(|next_index| *next_index > subscriptions.next_index)
                    {
                        Ok(Some((chain_id, stream_id, next_index)))
                    } else {
                        Ok::<_, Error>(None)
                    }
                }
            });
        let updates = futures::stream::iter(futures)
            .buffer_unordered(self.options.max_joined_tasks)
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();
        if updates.is_empty() {
            return Ok(None);
        }
        Ok(Some(SystemOperation::UpdateStreams(updates).into()))
    }
600
    /// Obtains the `ChainInfo` for this client's chain, including committees; delegates
    /// to the shared [`Client`].
    #[instrument(level = "trace")]
    async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
        self.client.chain_info_with_committees(self.chain_id).await
    }
605
    /// Obtains the current epoch of the local chain as well as its set of trusted committees,
    /// keyed by epoch.
    #[instrument(level = "trace")]
    async fn epoch_and_committees(
        &self,
    ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
        let info = self.chain_info_with_committees().await?;
        // Read the epoch before `into_committees` consumes the info.
        let epoch = info.epoch;
        let committees = info.into_committees()?;
        Ok((epoch, committees))
    }
616
    /// Obtains the committee for the current epoch of the local chain.
    ///
    /// If required blobs are missing locally, synchronizes the chain state from the
    /// validators and retries once.
    #[instrument(level = "trace")]
    pub async fn local_committee(&self) -> Result<Committee, Error> {
        let info = match self.chain_info_with_committees().await {
            Ok(info) => info,
            Err(LocalNodeError::BlobsNotFound(_)) => {
                self.synchronize_chain_state(self.chain_id).await?;
                self.chain_info_with_committees().await?
            }
            // Any other local error is fatal here.
            Err(err) => return Err(err.into()),
        };
        Ok(info.into_current_committee()?)
    }
630
    /// Obtains the committee for the latest epoch on the admin chain; delegates to the
    /// shared [`Client`].
    #[instrument(level = "trace")]
    pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
        self.client.admin_committee().await
    }
636
    /// Obtains the identity of the current owner of the chain.
    ///
    /// Returns an error if no preferred owner is configured, if the preferred owner is
    /// not actually an owner of the chain, or if we don't have the private key for it.
    #[instrument(level = "trace")]
    pub async fn identity(&self) -> Result<AccountOwner, Error> {
        let Some(preferred_owner) = self.preferred_owner else {
            return Err(Error::NoAccountKeyConfigured(self.chain_id));
        };
        let manager = self.chain_info().await?.manager;
        ensure!(
            manager.ownership.is_active(),
            LocalNodeError::InactiveChain(self.chain_id)
        );
        // In fallback mode, the committee's validators also count as potential owners.
        let fallback_owners = if manager.ownership.has_fallback() {
            self.local_committee()
                .await?
                .account_keys_and_weights()
                .map(|(key, _)| AccountOwner::from(key))
                .collect()
        } else {
            BTreeSet::new()
        };

        let is_owner = manager
            .ownership
            .can_propose_in_multi_leader_round(&preferred_owner)
            || fallback_owners.contains(&preferred_owner);

        if !is_owner {
            warn!(
                chain_id = %self.chain_id,
                ownership = ?manager.ownership,
                ?fallback_owners,
                ?preferred_owner,
                "The preferred owner is not configured as an owner of this chain",
            );
            return Err(Error::NotAnOwner(self.chain_id));
        }

        // Being an owner is not enough: we also need the key in our signer.
        let has_signer = self.has_key_for(&preferred_owner).await?;

        if !has_signer {
            warn!(%self.chain_id, ?preferred_owner,
                "Chain is one of the owners but its Signer instance doesn't contain the key",
            );
            return Err(Error::CannotFindKeyForChain(self.chain_id));
        }

        Ok(preferred_owner)
    }
687
    /// Prepares the chain for a new owner by fetching the chain description and validating access.
    ///
    /// This is useful when assigning a chain to a client that may not have the owner key,
    /// e.g. when a faucet creates a chain with `open_multi_leader_rounds`.
    ///
    /// Returns the chain info if the owner can propose on this chain (either because they are
    /// an owner, or because `open_multi_leader_rounds` is enabled).
    #[instrument(level = "trace")]
    pub async fn prepare_for_owner(&self, owner: AccountOwner) -> Result<Box<ChainInfo>, Error> {
        // The signer must hold the key before anything else is worth checking.
        ensure!(
            self.has_key_for(&owner).await?,
            Error::CannotFindKeyForChain(self.chain_id)
        );
        // Ensure we have the chain description blob.
        self.client
            .get_chain_description_blob(self.chain_id)
            .await?;

        // Get chain info.
        let info = self.chain_info().await?;

        // Validate that the owner can propose on this chain.
        ensure!(
            info.manager
                .ownership
                .can_propose_in_multi_leader_round(&owner),
            Error::NotAnOwner(self.chain_id)
        );

        Ok(info)
    }
719
    /// Prepares the chain for the next operation, i.e. makes sure we have synchronized it up to
    /// its current height.
    ///
    /// Also synchronizes the admin chain when the local chain's epoch is ahead of the
    /// latest admin committee we know about.
    #[instrument(level = "trace")]
    pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, Error> {
        #[cfg(with_metrics)]
        let _latency = super::metrics::PREPARE_CHAIN_LATENCY.measure_latency();

        // First catch up to the height recorded in the wallet.
        let mut info = self.synchronize_to_known_height().await?;

        if self.preferred_owner.is_none_or(|owner| {
            !info
                .manager
                .ownership
                .is_super_owner_no_regular_owners(&owner)
        }) {
            // If we are not a super owner or there are regular owners, we could be missing recent
            // certificates created by other clients. Further synchronize blocks from the network.
            // This is a best-effort that depends on network conditions.
            info = self.client.synchronize_chain_state(self.chain_id).await?;
        }

        if info.epoch > self.client.admin_committees().await?.0 {
            self.client
                .synchronize_chain_state(self.client.admin_chain_id)
                .await?;
        }

        self.client.update_from_info(&info);
        Ok(info)
    }
750
    /// Verifies that our local storage contains enough history compared to the
    /// known block height. Otherwise, downloads the missing history from the
    /// network.
    ///
    /// The known height only differs if the wallet is ahead of storage. When the heights
    /// match, also verifies that the local head's block hash agrees with the wallet's.
    async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, Error> {
        let info = self
            .client
            .download_certificates(self.chain_id, self.initial_next_block_height)
            .await?;
        if info.next_block_height == self.initial_next_block_height {
            // Check that our local node has the expected block hash.
            ensure!(
                self.initial_block_hash == info.block_hash,
                Error::InternalError("Invalid chain of blocks in local node")
            );
        }
        Ok(info)
    }
769
    /// Attempts to update all validators about the local chain.
    ///
    /// Updates the old committee first (if given), then the current local committee if
    /// it differs. Timing of the whole operation is reported via the timing sender.
    #[instrument(level = "trace", skip(old_committee, latest_certificate))]
    pub async fn update_validators(
        &self,
        old_committee: Option<&Committee>,
        latest_certificate: Option<ConfirmedBlockCertificate>,
    ) -> Result<(), Error> {
        let update_validators_start = linera_base::time::Instant::now();
        // Communicate the new certificate now.
        if let Some(old_committee) = old_committee {
            self.communicate_chain_updates(old_committee, latest_certificate.clone())
                .await?
        };
        if let Ok(new_committee) = self.local_committee().await {
            if Some(&new_committee) != old_committee {
                // If the configuration just changed, communicate to the new committee as well.
                // (This is actually more important than updating the previous committee.)
                self.communicate_chain_updates(&new_committee, latest_certificate)
                    .await?;
            }
        }
        self.send_timing(update_validators_start, TimingType::UpdateValidators);
        Ok(())
    }
794
    /// Broadcasts certified blocks to validators, up to the local chain's current
    /// next block height, using this client's configured delivery mode.
    #[instrument(level = "trace", skip(committee))]
    pub async fn communicate_chain_updates(
        &self,
        committee: &Committee,
        latest_certificate: Option<ConfirmedBlockCertificate>,
    ) -> Result<(), Error> {
        let delivery = self.options.cross_chain_message_delivery;
        let height = self.chain_info().await?.next_block_height;
        self.client
            .communicate_chain_updates(
                committee,
                self.chain_id,
                height,
                delivery,
                latest_certificate,
            )
            .await
    }
814
    /// Synchronizes all chains that any application on this chain subscribes to.
    /// We always consider the admin chain a relevant publishing chain, for new epochs.
    ///
    /// Synchronizations run concurrently, bounded by `max_joined_tasks`. All of them are
    /// driven to completion before errors are checked (no short-circuit on first failure).
    async fn synchronize_publisher_chains(&self) -> Result<(), Error> {
        let subscriptions = self
            .client
            .local_node
            .get_event_subscriptions(self.chain_id)
            .await?;
        // Deduplicate publishers, always include the admin chain, and skip ourselves.
        let chain_ids = subscriptions
            .iter()
            .map(|((chain_id, _), _)| *chain_id)
            .chain(iter::once(self.client.admin_chain_id))
            .filter(|chain_id| *chain_id != self.chain_id)
            .collect::<BTreeSet<_>>();
        stream::iter(
            chain_ids
                .into_iter()
                .map(|chain_id| self.client.synchronize_chain_state(chain_id)),
        )
        .buffer_unordered(self.options.max_joined_tasks)
        .collect::<Vec<_>>()
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()?;
        Ok(())
    }
841
    /// Attempts to download new received certificates.
    ///
    /// This is a best effort: it will only find certificates that have been confirmed
    /// amongst sufficiently many validators of the current committee of the target
    /// chain.
    ///
    /// However, this should be the case whenever a sender's chain is still in use and
    /// is regularly upgraded to new committees.
    #[instrument(level = "trace")]
    pub async fn find_received_certificates(&self) -> Result<(), Error> {
        debug!(chain_id = %self.chain_id, "starting find_received_certificates");
        #[cfg(with_metrics)]
        let _latency = super::metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
        // Use network information from the local chain.
        let chain_id = self.chain_id;
        let (_, committee) = self.admin_committee().await?;
        let nodes = self.client.make_nodes(&committee)?;

        // Per-validator markers recording how far into each validator's received log
        // we have already synchronized.
        let trackers = self
            .client
            .local_node
            .get_received_certificate_trackers(chain_id)
            .await?;

        trace!("find_received_certificates: read trackers");

        let received_log_batches = Arc::new(std::sync::Mutex::new(Vec::new()));
        // Proceed to downloading received logs. Each validator is asked for the portion
        // of its received log starting at our tracker value for that validator.
        let result = communicate_with_quorum(
            &nodes,
            &committee,
            |_| (),
            |remote_node| {
                let client = &self.client;
                let tracker = trackers.get(&remote_node.public_key).copied().unwrap_or(0);
                let received_log_batches = Arc::clone(&received_log_batches);
                Box::pin(async move {
                    let batch = client
                        .get_received_log_from_validator(chain_id, &remote_node, tracker)
                        .await?;
                    let mut batches = received_log_batches.lock().unwrap();
                    batches.push((remote_node.public_key, batch));
                    Ok(())
                })
            },
            self.options.quorum_grace_period,
        )
        .await;

        // Even without a quorum we continue with whatever logs were collected: this
        // whole function is best-effort.
        if let Err(error) = result {
            error!(
                %error,
                "Failed to synchronize received_logs from at least a quorum of validators",
            );
        }

        let received_logs: Vec<_> = {
            let mut received_log_batches = received_log_batches.lock().unwrap();
            std::mem::take(received_log_batches.as_mut())
        };

        debug!(
            received_logs_len = %received_logs.len(),
            received_logs_total = %received_logs.iter().map(|x| x.1.len()).sum::<usize>(),
            "collected received logs"
        );

        let (received_logs, mut validator_trackers) = {
            (
                ReceivedLogs::from_received_result(received_logs.clone()),
                ValidatorTrackers::new(received_logs, &trackers),
            )
        };

        debug!(
            num_chains = %received_logs.num_chains(),
            num_certs = %received_logs.num_certs(),
            "find_received_certificates: total number of chains and certificates to sync",
        );

        // Download and process the certificates in batches, persisting the advanced
        // trackers after each batch so an interrupted run loses at most one batch.
        let max_blocks_per_chain =
            self.options.sender_certificate_download_batch_size / self.options.max_joined_tasks * 2;
        for received_log in received_logs.into_batches(
            self.options.sender_certificate_download_batch_size,
            max_blocks_per_chain,
        ) {
            validator_trackers = self
                .receive_sender_certificates(received_log, validator_trackers, &nodes)
                .await?;

            self.update_received_certificate_trackers(&validator_trackers)
                .await;
        }

        info!("find_received_certificates finished");

        Ok(())
    }
940
941    async fn update_received_certificate_trackers(&self, trackers: &ValidatorTrackers) {
942        let updated_trackers = trackers.to_map();
943        trace!(?updated_trackers, "updated tracker values");
944
945        // Update the trackers.
946        if let Err(error) = self
947            .client
948            .local_node
949            .update_received_certificate_trackers(self.chain_id, updated_trackers)
950            .await
951        {
952            error!(
953                chain_id = %self.chain_id,
954                %error,
955                "Failed to update the certificate trackers for chain",
956            );
957        }
958    }
959
    /// Downloads and processes or preprocesses the certificates for blocks sending messages to
    /// this chain that we are still missing.
    ///
    /// Returns the validator trackers, advanced past every downloaded certificate.
    async fn receive_sender_certificates(
        &self,
        mut received_logs: ReceivedLogs,
        mut validator_trackers: ValidatorTrackers,
        nodes: &[RemoteNode<Env::ValidatorNode>],
    ) -> Result<ValidatorTrackers, Error> {
        debug!(
            num_chains = %received_logs.num_chains(),
            num_certs = %received_logs.num_certs(),
            "receive_sender_certificates: number of chains and certificates to sync",
        );

        // Obtain the next block height we need in the local node, for each chain.
        let local_next_heights = self
            .client
            .local_node
            .next_outbox_heights(received_logs.chains(), self.chain_id)
            .await?;

        // Drop log entries for blocks that are already known locally.
        validator_trackers.filter_out_already_known(&mut received_logs, local_next_heights);

        debug!(
            remaining_total_certificates = %received_logs.num_certs(),
            "receive_sender_certificates: computed remote_heights"
        );

        // Each processed certificate is reported on this channel as a `ChainAndHeight`,
        // so the trackers can be advanced while downloads are still in progress.
        let mut other_sender_chains = Vec::new();
        let (sender, mut receiver) = mpsc::unbounded_channel::<ChainAndHeight>();

        let cert_futures = received_logs.heights_per_chain().into_iter().filter_map({
            let received_logs = &received_logs;
            let other_sender_chains = &mut other_sender_chains;

            move |(sender_chain_id, remote_heights)| {
                if remote_heights.is_empty() {
                    // Our highest, locally executed block is higher than any block height
                    // from the current batch. Skip this batch, but remember to wait for
                    // the messages to be delivered to the inboxes.
                    other_sender_chains.push(sender_chain_id);
                    return None;
                };
                let remote_heights = remote_heights.into_iter().collect::<Vec<_>>();
                let sender = sender.clone();
                let client = self.client.clone();
                // Shuffle the node order so the download load is spread across validators.
                let mut nodes = nodes.to_vec();
                nodes.shuffle(&mut rand::thread_rng());
                Some(async move {
                    client
                        .download_and_process_sender_chain(
                            sender_chain_id,
                            &nodes,
                            received_logs,
                            remote_heights,
                            sender,
                        )
                        .await
                })
            }
        });

        // Run the bounded-concurrency downloads together with the receiver loop that
        // advances the trackers; the loop ends once all sender handles are dropped.
        future::join(
            stream::iter(cert_futures)
                .buffer_unordered(self.options.max_joined_tasks)
                .collect::<()>(),
            async {
                while let Some(chain_and_height) = receiver.recv().await {
                    validator_trackers.downloaded_cert(chain_and_height);
                }
            },
        )
        .await;

        debug!(
            num_other_chains = %other_sender_chains.len(),
            "receive_sender_certificates: processing certificates finished"
        );

        // Certificates for these chains were omitted from `certificates` because they were
        // already processed locally. If they were processed in a concurrent task, it is not
        // guaranteed that their cross-chain messages were already handled.
        self.retry_pending_cross_chain_requests(nodes, other_sender_chains)
            .await;

        debug!("receive_sender_certificates: finished processing other_sender_chains");

        Ok(validator_trackers)
    }
1049
    /// Retries cross chain requests on the chains which may have been processed on
    /// another task without the messages being correctly handled.
    ///
    /// This is best-effort: all failures are logged and swallowed, so this never
    /// fails the caller.
    async fn retry_pending_cross_chain_requests(
        &self,
        nodes: &[RemoteNode<Env::ValidatorNode>],
        other_sender_chains: Vec<ChainId>,
    ) {
        // Retry each chain concurrently.
        let stream = FuturesUnordered::from_iter(other_sender_chains.into_iter().map(|chain_id| {
            let local_node = self.client.local_node.clone();
            async move {
                if let Err(error) = match local_node
                    .retry_pending_cross_chain_requests(chain_id)
                    .await
                {
                    Ok(()) => Ok(()),
                    // Missing blobs: try to fetch them from the validators, then
                    // retry the cross-chain requests once more.
                    Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                        if let Err(error) = self
                            .client
                            .update_local_node_with_blobs_from(blob_ids.clone(), nodes)
                            .await
                        {
                            error!(
                                ?blob_ids,
                                %error,
                                "Error while attempting to download blobs during retrying outgoing \
                                messages"
                            );
                        }
                        local_node
                            .retry_pending_cross_chain_requests(chain_id)
                            .await
                    }
                    // Any other error is passed through and logged below.
                    err => err,
                } {
                    error!(
                        %chain_id,
                        %error,
                        "Failed to retry outgoing messages from chain"
                    );
                }
            }
        }));
        // Drive all retry futures to completion.
        stream.for_each(future::ready).await;
    }
1094
1095    /// Sends money.
1096    #[instrument(level = "trace")]
1097    pub async fn transfer(
1098        &self,
1099        owner: AccountOwner,
1100        amount: Amount,
1101        recipient: Account,
1102    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1103        // TODO(#467): check the balance of `owner` before signing any block proposal.
1104        self.execute_operation(SystemOperation::Transfer {
1105            owner,
1106            recipient,
1107            amount,
1108        })
1109        .await
1110    }
1111
1112    /// Verify if a data blob is readable from storage.
1113    // TODO(#2490): Consider removing or renaming this.
1114    #[instrument(level = "trace")]
1115    pub async fn read_data_blob(
1116        &self,
1117        hash: CryptoHash,
1118    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1119        let blob_id = BlobId {
1120            hash,
1121            blob_type: BlobType::Data,
1122        };
1123        self.execute_operation(SystemOperation::VerifyBlob { blob_id })
1124            .await
1125    }
1126
1127    /// Claims money in a remote chain.
1128    #[instrument(level = "trace")]
1129    pub async fn claim(
1130        &self,
1131        owner: AccountOwner,
1132        target_id: ChainId,
1133        recipient: Account,
1134        amount: Amount,
1135    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1136        self.execute_operation(SystemOperation::Claim {
1137            owner,
1138            target_id,
1139            recipient,
1140            amount,
1141        })
1142        .await
1143    }
1144
    /// Requests a leader timeout vote from all validators. If a quorum signs it, creates a
    /// certificate and sends it to all validators, to make them enter the next round.
    #[instrument(level = "trace")]
    pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, Error> {
        let chain_id = self.chain_id;
        let info = self.chain_info_with_committees().await?;
        let committee = info.current_committee()?;
        let height = info.next_block_height;
        let round = info.manager.current_round;
        // Ask the validators for timeout votes on the current round at the next height.
        let action = CommunicateAction::RequestTimeout {
            height,
            round,
            chain_id,
        };
        let value = Timeout::new(chain_id, height, info.epoch);
        let certificate = Box::new(
            self.client
                .communicate_chain_action(committee, action, value)
                .await?,
        );
        // Process the resulting timeout certificate locally before telling the
        // validators about it.
        self.client.process_certificate(certificate.clone()).await?;
        // The block height didn't increase, but this will communicate the timeout as well.
        self.client
            .communicate_chain_updates(
                committee,
                chain_id,
                height,
                CrossChainMessageDelivery::NonBlocking,
                None,
            )
            .await?;
        Ok(*certificate)
    }
1178
1179    /// Downloads and processes any certificates we are missing for the given chain.
1180    #[instrument(level = "trace", skip_all)]
1181    pub async fn synchronize_chain_state(
1182        &self,
1183        chain_id: ChainId,
1184    ) -> Result<Box<ChainInfo>, Error> {
1185        self.client.synchronize_chain_state(chain_id).await
1186    }
1187
1188    /// Downloads and processes any certificates we are missing for this chain, from the given
1189    /// committee.
1190    #[instrument(level = "trace", skip_all)]
1191    pub async fn synchronize_chain_state_from_committee(
1192        &self,
1193        committee: Committee,
1194    ) -> Result<Box<ChainInfo>, Error> {
1195        self.client
1196            .synchronize_chain_from_committee(self.chain_id, committee)
1197            .await
1198    }
1199
    /// Executes a list of operations.
    #[instrument(level = "trace", skip(operations, blobs))]
    pub async fn execute_operations(
        &self,
        operations: Vec<Operation>,
        blobs: Vec<Blob>,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
        let timing_start = linera_base::time::Instant::now();

        // Retry block execution as long as the only failure is an outdated local
        // state; every other outcome (success, timeout, conflict, error) ends the loop.
        let result = loop {
            let execute_block_start = linera_base::time::Instant::now();
            // TODO(#2066): Remove boxing once the call-stack is shallower
            match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
                Ok(ClientOutcome::Committed(certificate)) => {
                    self.send_timing(execute_block_start, TimingType::ExecuteBlock);
                    break Ok(ClientOutcome::Committed(certificate));
                }
                Ok(ClientOutcome::WaitForTimeout(timeout)) => {
                    break Ok(ClientOutcome::WaitForTimeout(timeout));
                }
                Ok(ClientOutcome::Conflict(certificate)) => {
                    info!(
                        height = %certificate.block().header.height,
                        "Another block was committed."
                    );
                    break Ok(ClientOutcome::Conflict(certificate));
                }
                // The validators expect a higher block height than our local node has:
                // synchronize the chain and try again.
                Err(Error::CommunicationError(CommunicationError::Trusted(
                    NodeError::UnexpectedBlockHeight {
                        expected_block_height,
                        found_block_height,
                    },
                ))) if expected_block_height > found_block_height => {
                    tracing::info!(
                        "Local state is outdated; synchronizing chain {:.8}",
                        self.chain_id
                    );
                    self.synchronize_chain_state(self.chain_id).await?;
                }
                Err(err) => return Err(err),
            };
        };

        self.send_timing(timing_start, TimingType::ExecuteOperations);

        result
    }
1247
1248    /// Executes an operation.
1249    pub async fn execute_operation(
1250        &self,
1251        operation: impl Into<Operation>,
1252    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1253        self.execute_operations(vec![operation.into()], vec![])
1254            .await
1255    }
1256
    /// Executes a new block.
    ///
    /// This must be preceded by a call to `prepare_chain()`.
    #[instrument(level = "trace", skip(operations, blobs))]
    async fn execute_block(
        &self,
        operations: Vec<Operation>,
        blobs: Vec<Blob>,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
        #[cfg(with_metrics)]
        let _latency = super::metrics::EXECUTE_BLOCK_LATENCY.measure_latency();

        // Serialize proposals: only one task may build and propose a block at a time.
        let mutex = self.client_mutex();
        let _guard = mutex.lock_owned().await;
        // TODO(#5092): We shouldn't need to call this explicitly.
        // First deal with any previously pending block. If one gets committed now, our
        // new operations were not part of it, so that counts as a conflict.
        match self.process_pending_block_without_prepare().await? {
            ClientOutcome::Committed(Some(certificate)) => {
                return Ok(ClientOutcome::Conflict(Box::new(certificate)))
            }
            ClientOutcome::WaitForTimeout(timeout) => {
                return Ok(ClientOutcome::WaitForTimeout(timeout))
            }
            ClientOutcome::Conflict(certificate) => {
                return Ok(ClientOutcome::Conflict(certificate))
            }
            ClientOutcome::Committed(None) => {}
        }

        // Collect pending messages and epoch changes after acquiring the lock to avoid
        // race conditions where messages valid for one block height are proposed at a
        // different height.
        let transactions = self.prepend_epochs_messages_and_events(operations).await?;

        if transactions.is_empty() {
            return Err(Error::LocalNodeError(LocalNodeError::WorkerError(
                WorkerError::ChainError(Box::new(ChainError::EmptyBlock)),
            )));
        }

        // Stage the new block as the pending proposal, then try to commit it.
        let block = self.new_pending_block(transactions, blobs).await?;

        match self.process_pending_block_without_prepare().await? {
            // The committed block is the one we just staged: success.
            ClientOutcome::Committed(Some(certificate)) if certificate.block() == &block => {
                Ok(ClientOutcome::Committed(certificate))
            }
            // A different block got committed instead.
            ClientOutcome::Committed(Some(certificate)) => {
                Ok(ClientOutcome::Conflict(Box::new(certificate)))
            }
            // Should be unreachable: We did set a pending block.
            ClientOutcome::Committed(None) => {
                Err(Error::BlockProposalError("Unexpected block proposal error"))
            }
            ClientOutcome::WaitForTimeout(timeout) => Ok(ClientOutcome::WaitForTimeout(timeout)),
            ClientOutcome::Conflict(certificate) => Ok(ClientOutcome::Conflict(certificate)),
        }
    }
1313
1314    /// Creates a vector of transactions which, in addition to the provided operations,
1315    /// also contains epoch changes, receiving message bundles and event stream updates
1316    /// (if there are any to be processed).
1317    /// This should be called when executing a block, in order to make sure that any pending
1318    /// messages or events are included in it.
1319    #[instrument(level = "trace", skip(operations))]
1320    async fn prepend_epochs_messages_and_events(
1321        &self,
1322        operations: Vec<Operation>,
1323    ) -> Result<Vec<Transaction>, Error> {
1324        let incoming_bundles = self.pending_message_bundles().await?;
1325        let stream_updates = self.collect_stream_updates().await?;
1326        Ok(self
1327            .collect_epoch_changes()
1328            .await?
1329            .into_iter()
1330            .map(Transaction::ExecuteOperation)
1331            .chain(
1332                incoming_bundles
1333                    .into_iter()
1334                    .map(Transaction::ReceiveMessages),
1335            )
1336            .chain(
1337                stream_updates
1338                    .into_iter()
1339                    .map(Transaction::ExecuteOperation),
1340            )
1341            .chain(operations.into_iter().map(Transaction::ExecuteOperation))
1342            .collect::<Vec<_>>())
1343    }
1344
    /// Creates a new pending block and handles the proposal in the local node.
    /// Next time `process_pending_block_without_prepare` is called, this block will be proposed
    /// to the validators.
    #[instrument(level = "trace", skip(transactions, blobs))]
    async fn new_pending_block(
        &self,
        transactions: Vec<Transaction>,
        blobs: Vec<Blob>,
    ) -> Result<Block, Error> {
        let identity = self.identity().await?;

        // Refuse to overwrite an existing pending proposal; it has to be committed or
        // cleared first.
        ensure!(
            self.pending_proposal().is_none(),
            Error::BlockProposalError(
                "Client state already has a pending block; \
                use the `linera retry-pending-block` command to commit that first"
            )
        );
        let info = self.chain_info_with_committees().await?;
        // The timestamp must be at least as late as the previous block's and all
        // incoming bundles' timestamps.
        let timestamp = self.next_timestamp(&transactions, info.timestamp);
        let proposed_block = ProposedBlock {
            epoch: info.epoch,
            chain_id: self.chain_id,
            transactions,
            previous_block_hash: info.block_hash,
            height: info.next_block_height,
            authenticated_owner: Some(identity),
            timestamp,
        };

        let round = self.round_for_oracle(&info, &identity).await?;
        // Make sure every incoming message succeeds and otherwise remove them.
        // Also, compute the final certified hash while we're at it.
        let (block, _) = self
            .client
            .stage_block_execution_with_policy(
                proposed_block,
                round,
                blobs.clone(),
                BundleExecutionPolicy::AutoRetry {
                    max_failures: self.options.max_block_limit_errors,
                },
            )
            .await?;
        // Record the staged block (and its blobs) as the client's pending proposal.
        let (proposed_block, _) = block.clone().into_proposal();
        self.update_state(|state| {
            state.set_pending_proposal(proposed_block.clone(), blobs.clone())
        });
        Ok(block)
    }
1395
1396    /// Returns a suitable timestamp for the next block.
1397    ///
1398    /// This will usually be the current time according to the local clock, but may be slightly
1399    /// ahead to make sure it's not earlier than the incoming messages or the previous block.
1400    #[instrument(level = "trace", skip(transactions))]
1401    fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
1402        let local_time = self.storage_client().clock().current_time();
1403        transactions
1404            .iter()
1405            .filter_map(Transaction::incoming_bundle)
1406            .map(|msg| msg.bundle.timestamp)
1407            .max()
1408            .map_or(local_time, |timestamp| timestamp.max(local_time))
1409            .max(block_time)
1410    }
1411
1412    /// Queries an application.
1413    #[instrument(level = "trace", skip(query))]
1414    pub async fn query_application(
1415        &self,
1416        query: Query,
1417        block_hash: Option<CryptoHash>,
1418    ) -> Result<QueryOutcome, Error> {
1419        loop {
1420            let result = self
1421                .client
1422                .local_node
1423                .query_application(self.chain_id, query.clone(), block_hash)
1424                .await;
1425            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1426                let validators = self.client.validator_nodes().await?;
1427                self.client
1428                    .update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1429                    .await?;
1430                continue; // We found the missing blob: retry.
1431            }
1432            return Ok(result?);
1433        }
1434    }
1435
1436    /// Queries a system application.
1437    #[instrument(level = "trace", skip(query))]
1438    pub async fn query_system_application(
1439        &self,
1440        query: SystemQuery,
1441    ) -> Result<QueryOutcome<SystemResponse>, Error> {
1442        let QueryOutcome {
1443            response,
1444            operations,
1445        } = self.query_application(Query::System(query), None).await?;
1446        match response {
1447            QueryResponse::System(response) => Ok(QueryOutcome {
1448                response,
1449                operations,
1450            }),
1451            _ => Err(Error::InternalError("Unexpected response for system query")),
1452        }
1453    }
1454
1455    /// Queries a user application.
1456    #[instrument(level = "trace", skip(application_id, query))]
1457    #[cfg(with_testing)]
1458    pub async fn query_user_application<A: Abi>(
1459        &self,
1460        application_id: ApplicationId<A>,
1461        query: &A::Query,
1462    ) -> Result<QueryOutcome<A::QueryResponse>, Error> {
1463        let query = Query::user(application_id, query)?;
1464        let QueryOutcome {
1465            response,
1466            operations,
1467        } = self.query_application(query, None).await?;
1468        match response {
1469            QueryResponse::User(response_bytes) => {
1470                let response = serde_json::from_slice(&response_bytes)?;
1471                Ok(QueryOutcome {
1472                    response,
1473                    operations,
1474                })
1475            }
1476            _ => Err(Error::InternalError("Unexpected response for user query")),
1477        }
1478    }
1479
1480    /// Obtains the local balance of the chain account after staging the execution of
1481    /// incoming messages in a new block.
1482    ///
1483    /// Does not attempt to synchronize with validators. The result will reflect up to
1484    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
1485    /// block.
1486    #[instrument(level = "trace")]
1487    pub async fn query_balance(&self) -> Result<Amount, Error> {
1488        let (balance, _) = self.query_balances_with_owner(AccountOwner::CHAIN).await?;
1489        Ok(balance)
1490    }
1491
1492    /// Obtains the local balance of an account after staging the execution of incoming messages in
1493    /// a new block.
1494    ///
1495    /// Does not attempt to synchronize with validators. The result will reflect up to
1496    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
1497    /// block.
1498    #[instrument(level = "trace", skip(owner))]
1499    pub async fn query_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1500        if owner.is_chain() {
1501            self.query_balance().await
1502        } else {
1503            Ok(self
1504                .query_balances_with_owner(owner)
1505                .await?
1506                .1
1507                .unwrap_or(Amount::ZERO))
1508        }
1509    }
1510
    /// Obtains the local balance of the chain account and optionally another user after staging the
    /// execution of incoming messages in a new block.
    ///
    /// Does not attempt to synchronize with validators. The result will reflect up to
    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
    /// block.
    #[instrument(level = "trace", skip(owner))]
    pub(crate) async fn query_balances_with_owner(
        &self,
        owner: AccountOwner,
    ) -> Result<(Amount, Option<Amount>), Error> {
        let incoming_bundles = self.pending_message_bundles().await?;
        // Since we disallow empty blocks and there are no incoming messages that could
        // change the balances, we can query for them immediately.
        if incoming_bundles.is_empty() {
            let chain_balance = self.local_balance().await?;
            let owner_balance = self.local_owner_balance(owner).await?;
            return Ok((chain_balance, Some(owner_balance)));
        }
        // Otherwise, stage (but do not propose) a block that receives the pending
        // bundles, and read the balances from the resulting chain info.
        let info = self.chain_info().await?;
        let transactions = incoming_bundles
            .into_iter()
            .map(Transaction::ReceiveMessages)
            .collect::<Vec<_>>();
        let timestamp = self.next_timestamp(&transactions, info.timestamp);
        let block = ProposedBlock {
            epoch: info.epoch,
            chain_id: self.chain_id,
            transactions,
            previous_block_hash: info.block_hash,
            height: info.next_block_height,
            authenticated_owner: if owner == AccountOwner::CHAIN {
                None
            } else {
                Some(owner)
            },
            timestamp,
        };
        match self
            .client
            .stage_block_execution_with_policy(
                block,
                None,
                Vec::new(),
                BundleExecutionPolicy::AutoRetry {
                    max_failures: self.options.max_block_limit_errors,
                },
            )
            .await
        {
            Ok((_, response)) => Ok((
                response.info.chain_balance,
                response.info.requested_owner_balance,
            )),
            Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
                error,
            )))) if matches!(
                &*error,
                ChainError::ExecutionError(
                    execution_error,
                    ChainExecutionContext::Block
                ) if matches!(
                    **execution_error,
                    ExecutionError::FeesExceedFunding { .. }
                )
            ) =>
            {
                // We can't even pay for the execution of one empty block. Let's return zero.
                Ok((Amount::ZERO, Some(Amount::ZERO)))
            }
            Err(error) => Err(error),
        }
    }
1584
1585    /// Reads the local balance of the chain account.
1586    ///
1587    /// Does not process the inbox or attempt to synchronize with validators.
1588    #[instrument(level = "trace")]
1589    pub async fn local_balance(&self) -> Result<Amount, Error> {
1590        let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
1591        Ok(balance)
1592    }
1593
1594    /// Reads the local balance of a user account.
1595    ///
1596    /// Does not process the inbox or attempt to synchronize with validators.
1597    #[instrument(level = "trace", skip(owner))]
1598    pub async fn local_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1599        if owner.is_chain() {
1600            self.local_balance().await
1601        } else {
1602            Ok(self
1603                .local_balances_with_owner(owner)
1604                .await?
1605                .1
1606                .unwrap_or(Amount::ZERO))
1607        }
1608    }
1609
1610    /// Reads the local balance of the chain account and optionally another user.
1611    ///
1612    /// Does not process the inbox or attempt to synchronize with validators.
1613    #[instrument(level = "trace", skip(owner))]
1614    pub(crate) async fn local_balances_with_owner(
1615        &self,
1616        owner: AccountOwner,
1617    ) -> Result<(Amount, Option<Amount>), Error> {
1618        ensure!(
1619            self.chain_info().await?.next_block_height >= self.initial_next_block_height,
1620            Error::WalletSynchronizationError
1621        );
1622        let mut query = ChainInfoQuery::new(self.chain_id);
1623        query.request_owner_balance = owner;
1624        let response = self
1625            .client
1626            .local_node
1627            .handle_chain_info_query(query)
1628            .await?;
1629        Ok((
1630            response.info.chain_balance,
1631            response.info.requested_owner_balance,
1632        ))
1633    }
1634
1635    /// Sends tokens to a chain.
1636    #[instrument(level = "trace")]
1637    pub async fn transfer_to_account(
1638        &self,
1639        from: AccountOwner,
1640        amount: Amount,
1641        account: Account,
1642    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1643        self.transfer(from, amount, account).await
1644    }
1645
1646    /// Burns tokens (transfer to a special address).
1647    #[cfg(with_testing)]
1648    #[instrument(level = "trace")]
1649    pub async fn burn(
1650        &self,
1651        owner: AccountOwner,
1652        amount: Amount,
1653    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1654        let recipient = Account::burn_address(self.chain_id);
1655        self.transfer(owner, amount, recipient).await
1656    }
1657
1658    #[instrument(level = "trace")]
1659    pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, Error> {
1660        let validators = self.client.validator_nodes().await?;
1661        self.client
1662            .fetch_chain_info(self.chain_id, &validators)
1663            .await
1664    }
1665
1666    /// Attempts to synchronize chains that have sent us messages and populate our local
1667    /// inbox.
1668    ///
1669    /// To create a block that actually executes the messages in the inbox,
1670    /// `process_inbox` must be called separately.
1671    ///
1672    /// If the chain is in follow-only mode, this only downloads blocks for this chain without
1673    /// fetching manager values or sender/publisher chains.
1674    #[instrument(level = "trace")]
1675    pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, Error> {
1676        if self.is_follow_only() {
1677            return self.client.synchronize_chain_state(self.chain_id).await;
1678        }
1679        let info = self.prepare_chain().await?;
1680        self.synchronize_publisher_chains().await?;
1681        self.find_received_certificates().await?;
1682        Ok(info)
1683    }
1684
1685    /// Processes the last pending block
1686    #[instrument(level = "trace")]
1687    pub async fn process_pending_block(
1688        &self,
1689    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1690        self.prepare_chain().await?;
1691        self.process_pending_block_without_prepare().await
1692    }
1693
    /// Processes the last pending block. Assumes that the local chain is up to date.
    ///
    /// In order: requests a leader timeout if the round timed out; finalizes a locking
    /// block if one exists in the current (non-fast) round; otherwise re-proposes the
    /// locking block or, failing that, the locally pending block; submits the proposal to
    /// the validators and broadcasts the resulting certificate.
    #[instrument(level = "trace")]
    async fn process_pending_block_without_prepare(
        &self,
    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
        let info = self.request_leader_timeout_if_needed().await?;

        // If there is a validated block in the current round, finalize it.
        if info.manager.has_locking_block_in_current_round()
            && !info.manager.current_round.is_fast()
        {
            return self.finalize_locking_block(info).await;
        }
        let owner = self.identity().await?;

        let local_node = &self.client.local_node;
        // Otherwise we have to re-propose the highest validated block, if there is one.
        let pending_proposal = self.pending_proposal();
        let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
            match &**locking {
                LockingBlock::Regular(certificate) => {
                    // A validated block: re-propose it together with the blobs it requires.
                    let blob_ids = certificate.block().required_blob_ids();
                    let blobs = local_node
                        .get_locking_blobs(&blob_ids, self.chain_id)
                        .await?
                        .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
                    debug!("Retrying locking block from round {}", certificate.round);
                    (certificate.block().clone(), blobs)
                }
                LockingBlock::Fast(proposal) => {
                    // A fast-round proposal: re-execute it locally to obtain the block.
                    let proposed_block = proposal.content.block.clone();
                    let blob_ids = proposed_block.published_blob_ids();
                    let blobs = local_node
                        .get_locking_blobs(&blob_ids, self.chain_id)
                        .await?
                        .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
                    let block = self
                        .client
                        .stage_block_execution(proposed_block, None, blobs.clone())
                        .await?
                        .0;
                    debug!("Retrying locking block from fast round.");
                    (block, blobs)
                }
            }
        } else if let Some(pending_proposal) = pending_proposal {
            // Otherwise we are free to propose our own pending block.
            let proposed_block = pending_proposal.block;
            let round = self.round_for_oracle(&info, &owner).await?;
            let (block, _) = self
                .client
                .stage_block_execution(proposed_block, round, pending_proposal.blobs.clone())
                .await?;
            debug!("Proposing the local pending block.");
            (block, pending_proposal.blobs)
        } else {
            return Ok(ClientOutcome::Committed(None)); // Nothing to do.
        };

        let has_oracle_responses = block.has_oracle_responses();
        let (proposed_block, outcome) = block.into_proposal();
        // Find a round we may propose in; otherwise wait for the current round to time out.
        let round = match self
            .round_for_new_proposal(&info, &owner, has_oracle_responses)
            .await?
        {
            Either::Left(round) => round,
            Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
        };
        debug!("Proposing block for round {}", round);

        let already_handled_locally = info
            .manager
            .already_handled_proposal(round, &proposed_block);
        // Create the final block proposal.
        let proposal = if let Some(locking) = info.manager.requested_locking {
            Box::new(match *locking {
                LockingBlock::Regular(cert) => {
                    BlockProposal::new_retry_regular(owner, round, cert, self.signer())
                        .await
                        .map_err(Error::signer_failure)?
                }
                LockingBlock::Fast(proposal) => {
                    BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
                        .await
                        .map_err(Error::signer_failure)?
                }
            })
        } else {
            Box::new(
                BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
                    .await
                    .map_err(Error::signer_failure)?,
            )
        };
        if !already_handled_locally {
            // Check the final block proposal. This will be cheaper after #1401.
            if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
                match err {
                    LocalNodeError::BlobsNotFound(_) => {
                        // The local node is missing blobs; supply them and retry once.
                        local_node
                            .handle_pending_blobs(self.chain_id, blobs)
                            .await?;
                        local_node.handle_block_proposal(*proposal.clone()).await?;
                    }
                    err => return Err(err.into()),
                }
            }
        }
        let committee = self.local_committee().await?;
        let block = Block::new(proposed_block, outcome);
        // Send the query to validators.
        let submit_block_proposal_start = linera_base::time::Instant::now();
        let certificate = if round.is_fast() {
            // Fast round: the proposal can be confirmed directly.
            let hashed_value = ConfirmedBlock::new(block);
            self.client
                .submit_block_proposal(&committee, proposal, hashed_value)
                .await?
        } else {
            // Other rounds: get the block validated first, then finalize it.
            let hashed_value = ValidatedBlock::new(block);
            let certificate = self
                .client
                .submit_block_proposal(&committee, proposal, hashed_value.clone())
                .await?;
            self.client.finalize_block(&committee, certificate).await?
        };
        self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
        debug!(round = %certificate.round, "Sending confirmed block to validators");
        self.update_validators(Some(&committee), Some(certificate.clone()))
            .await?;
        Ok(ClientOutcome::Committed(Some(certificate)))
    }
1825
1826    fn send_timing(&self, start: Instant, timing_type: TimingType) {
1827        let Some(sender) = &self.timing_sender else {
1828            return;
1829        };
1830        if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
1831            tracing::warn!(%err, "Failed to send timing info");
1832        }
1833    }
1834
1835    /// Requests a leader timeout certificate if the current round has timed out. Returns the
1836    /// chain info for the (possibly new) current round.
1837    async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, Error> {
1838        let mut info = self.chain_info_with_manager_values().await?;
1839        // If the current round has timed out, we request a timeout certificate and retry in
1840        // the next round.
1841        if let Some(round_timeout) = info.manager.round_timeout {
1842            if round_timeout <= self.storage_client().clock().current_time() {
1843                if let Err(e) = self.request_leader_timeout().await {
1844                    info!("Failed to obtain a timeout certificate: {}", e);
1845                } else {
1846                    info = self.chain_info_with_manager_values().await?;
1847                }
1848            }
1849        }
1850        Ok(info)
1851    }
1852
1853    /// Finalizes the locking block.
1854    ///
1855    /// Panics if there is no locking block; fails if the locking block is not in the current round.
1856    async fn finalize_locking_block(
1857        &self,
1858        info: Box<ChainInfo>,
1859    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1860        let locking = info
1861            .manager
1862            .requested_locking
1863            .expect("Should have a locking block");
1864        let LockingBlock::Regular(certificate) = *locking else {
1865            panic!("Should have a locking validated block");
1866        };
1867        debug!(
1868            round = %certificate.round,
1869            "Finalizing locking block"
1870        );
1871        let committee = self.local_committee().await?;
1872        let certificate = self
1873            .client
1874            .finalize_block(&committee, certificate.clone())
1875            .await?;
1876        self.update_validators(Some(&committee), Some(certificate.clone()))
1877            .await?;
1878        Ok(ClientOutcome::Committed(Some(certificate)))
1879    }
1880
    /// Returns the number for the round number oracle to use when staging a block proposal.
    ///
    /// `Some(n)` is returned only when the proposal would go into a multi-leader round;
    /// otherwise `None`, matching what the oracle would report in a single-leader round.
    async fn round_for_oracle(
        &self,
        info: &ChainInfo,
        identity: &AccountOwner,
    ) -> Result<Option<u32>, Error> {
        // Pretend we do use oracles: If we don't, the round number is never read anyway.
        match self.round_for_new_proposal(info, identity, true).await {
            // If it is a multi-leader round, use its number for the oracle.
            Ok(Either::Left(round)) => Ok(round.multi_leader()),
            // If there is no suitable round with oracles, use None: If it works without oracles,
            // the block won't read the value. If it returns a timeout, it will be a single-leader
            // round, in which the oracle returns None.
            Err(Error::BlockProposalError(_)) | Ok(Either::Right(_)) => Ok(None),
            Err(err) => Err(err),
        }
    }
1898
    /// Returns a round in which we can propose a new block or the given one, if possible.
    ///
    /// Returns `Either::Right` with a timeout to wait for when no suitable round is
    /// currently available, and an error when we cannot propose at all (conflicting
    /// proposal without a timeout, or not being the leader).
    async fn round_for_new_proposal(
        &self,
        info: &ChainInfo,
        identity: &AccountOwner,
        has_oracle_responses: bool,
    ) -> Result<Either<Round, RoundTimeout>, Error> {
        let manager = &info.manager;
        let seed = manager.seed;
        // If there is a conflicting proposal in the current round, we can only propose if the
        // next round can be started without a timeout, i.e. if we are in a multi-leader round.
        // Similarly, we cannot propose a block that uses oracles in the fast round, and also
        // skip the fast round if fast blocks are not allowed.
        let skip_fast = manager.current_round.is_fast()
            && (has_oracle_responses || !self.options.allow_fast_blocks);
        let conflict = manager
            .requested_signed_proposal
            .as_ref()
            .into_iter()
            .chain(&manager.requested_proposed)
            .any(|proposal| proposal.content.round == manager.current_round)
            || skip_fast;
        let round = if !conflict {
            manager.current_round
        } else if let Some(round) = manager
            .ownership
            .next_round(manager.current_round)
            .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
        {
            // The next round can be entered without a timeout certificate.
            round
        } else if let Some(timeout) = info.round_timeout() {
            // We have to wait for the current round to time out.
            return Ok(Either::Right(timeout));
        } else {
            return Err(Error::BlockProposalError(
                "Conflicting proposal in the current round",
            ));
        };
        // Collect the committee's owners and voting weights for `should_propose`.
        let current_committee = info
            .current_committee()?
            .validators
            .values()
            .map(|v| (AccountOwner::from(v.account_public_key), v.votes))
            .collect();
        if manager.should_propose(identity, round, seed, &current_committee) {
            return Ok(Either::Left(round));
        }
        if let Some(timeout) = info.round_timeout() {
            return Ok(Either::Right(timeout));
        }
        Err(Error::BlockProposalError(
            "Not a leader in the current round",
        ))
    }
1952
1953    /// Clears the information on any operation that previously failed.
1954    #[cfg(with_testing)]
1955    #[instrument(level = "trace")]
1956    pub fn clear_pending_proposal(&self) {
1957        self.update_state(|state| state.clear_pending_proposal());
1958    }
1959
1960    /// Rotates the key of the chain.
1961    ///
1962    /// Replaces current owners of the chain with the new key pair.
1963    #[instrument(level = "trace")]
1964    pub async fn rotate_key_pair(
1965        &self,
1966        public_key: AccountPublicKey,
1967    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1968        self.transfer_ownership(public_key.into()).await
1969    }
1970
1971    /// Transfers ownership of the chain to a single super owner.
1972    #[instrument(level = "trace")]
1973    pub async fn transfer_ownership(
1974        &self,
1975        new_owner: AccountOwner,
1976    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1977        self.execute_operation(SystemOperation::ChangeOwnership {
1978            super_owners: vec![new_owner],
1979            owners: Vec::new(),
1980            first_leader: None,
1981            multi_leader_rounds: 2,
1982            open_multi_leader_rounds: false,
1983            timeout_config: TimeoutConfig::default(),
1984        })
1985        .await
1986    }
1987
1988    /// Adds another owner to the chain, and turns existing super owners into regular owners.
1989    #[instrument(level = "trace")]
1990    pub async fn share_ownership(
1991        &self,
1992        new_owner: AccountOwner,
1993        new_weight: u64,
1994    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1995        let ownership = self.prepare_chain().await?.manager.ownership;
1996        ensure!(
1997            ownership.is_active(),
1998            ChainError::InactiveChain(self.chain_id)
1999        );
2000        let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
2001        owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
2002        owners.push((new_owner, new_weight));
2003        let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
2004            super_owners: Vec::new(),
2005            owners,
2006            first_leader: ownership.first_leader,
2007            multi_leader_rounds: ownership.multi_leader_rounds,
2008            open_multi_leader_rounds: ownership.open_multi_leader_rounds,
2009            timeout_config: ownership.timeout_config,
2010        })];
2011        self.execute_block(operations, vec![]).await
2012    }
2013
2014    /// Returns the current ownership settings on this chain.
2015    #[instrument(level = "trace")]
2016    pub async fn query_chain_ownership(&self) -> Result<ChainOwnership, Error> {
2017        Ok(self
2018            .client
2019            .local_node
2020            .chain_state_view(self.chain_id)
2021            .await?
2022            .execution_state
2023            .system
2024            .ownership
2025            .get()
2026            .clone())
2027    }
2028
2029    /// Changes the ownership of this chain. Fails if it would remove existing owners, unless
2030    /// `remove_owners` is `true`.
2031    #[instrument(level = "trace")]
2032    pub async fn change_ownership(
2033        &self,
2034        ownership: ChainOwnership,
2035    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2036        self.execute_operation(SystemOperation::ChangeOwnership {
2037            super_owners: ownership.super_owners.into_iter().collect(),
2038            owners: ownership.owners.into_iter().collect(),
2039            first_leader: ownership.first_leader,
2040            multi_leader_rounds: ownership.multi_leader_rounds,
2041            open_multi_leader_rounds: ownership.open_multi_leader_rounds,
2042            timeout_config: ownership.timeout_config.clone(),
2043        })
2044        .await
2045    }
2046
2047    /// Returns the current application permissions on this chain.
2048    #[instrument(level = "trace")]
2049    pub async fn query_application_permissions(&self) -> Result<ApplicationPermissions, Error> {
2050        Ok(self
2051            .client
2052            .local_node
2053            .chain_state_view(self.chain_id)
2054            .await?
2055            .execution_state
2056            .system
2057            .application_permissions
2058            .get()
2059            .clone())
2060    }
2061
2062    /// Changes the application permissions configuration on this chain.
2063    #[instrument(level = "trace", skip(application_permissions))]
2064    pub async fn change_application_permissions(
2065        &self,
2066        application_permissions: ApplicationPermissions,
2067    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2068        self.execute_operation(SystemOperation::ChangeApplicationPermissions(
2069            application_permissions,
2070        ))
2071        .await
2072    }
2073
    /// Opens a new chain with a derived UID.
    ///
    /// The new chain is created with the given ownership, application permissions and
    /// initial balance. Returns the new chain's description together with the certificate
    /// of the block that created it.
    #[instrument(level = "trace", skip(self))]
    pub async fn open_chain(
        &self,
        ownership: ChainOwnership,
        application_permissions: ApplicationPermissions,
        balance: Amount,
    ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, Error> {
        // Check if we have a key for any owner before consuming ownership.
        let mut has_key = false;
        for owner in ownership.all_owners() {
            if self.has_key_for(owner).await? {
                has_key = true;
                break;
            }
        }
        let config = OpenChainConfig {
            ownership,
            balance,
            application_permissions,
        };
        let operation = Operation::system(SystemOperation::OpenChain(config));
        let certificate = match self.execute_block(vec![operation], vec![]).await? {
            ClientOutcome::Committed(certificate) => certificate,
            ClientOutcome::Conflict(certificate) => {
                return Ok(ClientOutcome::Conflict(certificate));
            }
            ClientOutcome::WaitForTimeout(timeout) => {
                return Ok(ClientOutcome::WaitForTimeout(timeout));
            }
        };
        // The only operation, i.e. the last transaction, created the new chain.
        let chain_blob = certificate
            .block()
            .body
            .blobs
            .last()
            .and_then(|blobs| blobs.last())
            .ok_or_else(|| Error::InternalError("Failed to create a new chain"))?;
        // The blob carries the serialized description of the newly created chain.
        let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
        // If we have a key for any owner, add it to the list of tracked chains.
        if has_key {
            self.client
                .extend_chain_mode(description.id(), ListeningMode::FullChain);
            // Retry any cross-chain requests that may have been waiting for the new chain.
            self.client
                .local_node
                .retry_pending_cross_chain_requests(self.chain_id)
                .await?;
        }
        Ok(ClientOutcome::Committed((description, certificate)))
    }
2125
    /// Closes the chain (and loses everything in it!!).
    /// Returns `None` if the chain was already closed.
    #[instrument(level = "trace")]
    pub async fn close_chain(
        &self,
    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
        match self.execute_operation(SystemOperation::CloseChain).await {
            Ok(outcome) => Ok(outcome.map(Some)),
            // Treat "already closed" as success without a new certificate.
            Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
                chain_error,
            )))) if matches!(*chain_error, ChainError::ClosedChain) => {
                Ok(ClientOutcome::Committed(None)) // Chain is already closed.
            }
            Err(error) => Err(error),
        }
    }
2142
2143    /// Publishes some module.
2144    #[cfg(not(target_arch = "wasm32"))]
2145    #[instrument(level = "trace", skip(contract, service))]
2146    pub async fn publish_module(
2147        &self,
2148        contract: Bytecode,
2149        service: Bytecode,
2150        vm_runtime: VmRuntime,
2151    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2152        let (blobs, module_id) = super::create_bytecode_blobs(contract, service, vm_runtime).await;
2153        self.publish_module_blobs(blobs, module_id).await
2154    }
2155
2156    /// Publishes some module.
2157    #[cfg(not(target_arch = "wasm32"))]
2158    #[instrument(level = "trace", skip(blobs, module_id))]
2159    pub async fn publish_module_blobs(
2160        &self,
2161        blobs: Vec<Blob>,
2162        module_id: ModuleId,
2163    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2164        self.execute_operations(
2165            vec![Operation::system(SystemOperation::PublishModule {
2166                module_id,
2167            })],
2168            blobs,
2169        )
2170        .await?
2171        .try_map(|certificate| Ok((module_id, certificate)))
2172    }
2173
2174    /// Publishes some data blobs.
2175    #[instrument(level = "trace", skip(bytes))]
2176    pub async fn publish_data_blobs(
2177        &self,
2178        bytes: Vec<Vec<u8>>,
2179    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2180        let blobs = bytes.into_iter().map(Blob::new_data);
2181        let publish_blob_operations = blobs
2182            .clone()
2183            .map(|blob| {
2184                Operation::system(SystemOperation::PublishDataBlob {
2185                    blob_hash: blob.id().hash,
2186                })
2187            })
2188            .collect();
2189        self.execute_operations(publish_blob_operations, blobs.collect())
2190            .await
2191    }
2192
2193    /// Publishes some data blob.
2194    #[instrument(level = "trace", skip(bytes))]
2195    pub async fn publish_data_blob(
2196        &self,
2197        bytes: Vec<u8>,
2198    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2199        self.publish_data_blobs(vec![bytes]).await
2200    }
2201
2202    /// Creates an application by instantiating some bytecode.
2203    #[instrument(
2204        level = "trace",
2205        skip(self, parameters, instantiation_argument, required_application_ids)
2206    )]
2207    pub async fn create_application<
2208        A: Abi,
2209        Parameters: Serialize,
2210        InstantiationArgument: Serialize,
2211    >(
2212        &self,
2213        module_id: ModuleId<A, Parameters, InstantiationArgument>,
2214        parameters: &Parameters,
2215        instantiation_argument: &InstantiationArgument,
2216        required_application_ids: Vec<ApplicationId>,
2217    ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, Error> {
2218        let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
2219        let parameters = serde_json::to_vec(parameters)?;
2220        Ok(self
2221            .create_application_untyped(
2222                module_id.forget_abi(),
2223                parameters,
2224                instantiation_argument,
2225                required_application_ids,
2226            )
2227            .await?
2228            .map(|(app_id, cert)| (app_id.with_abi(), cert)))
2229    }
2230
2231    /// Creates an application by instantiating some bytecode.
2232    #[instrument(
2233        level = "trace",
2234        skip(
2235            self,
2236            module_id,
2237            parameters,
2238            instantiation_argument,
2239            required_application_ids
2240        )
2241    )]
2242    pub async fn create_application_untyped(
2243        &self,
2244        module_id: ModuleId,
2245        parameters: Vec<u8>,
2246        instantiation_argument: Vec<u8>,
2247        required_application_ids: Vec<ApplicationId>,
2248    ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, Error> {
2249        self.execute_operation(SystemOperation::CreateApplication {
2250            module_id,
2251            parameters,
2252            instantiation_argument,
2253            required_application_ids,
2254        })
2255        .await?
2256        .try_map(|certificate| {
2257            // The first message of the only operation created the application.
2258            let mut creation: Vec<_> = certificate
2259                .block()
2260                .created_blob_ids()
2261                .into_iter()
2262                .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
2263                .collect();
2264            if creation.len() > 1 {
2265                return Err(Error::InternalError(
2266                    "Unexpected number of application descriptions published",
2267                ));
2268            }
2269            let blob_id = creation.pop().ok_or(Error::InternalError(
2270                "ApplicationDescription blob not found.",
2271            ))?;
2272            let id = ApplicationId::new(blob_id.hash);
2273            Ok((id, certificate))
2274        })
2275    }
2276
    /// Creates a new committee and starts using it (admin chains only).
    ///
    /// This takes two blocks: the first publishes the serialized committee as a blob, the
    /// second creates the committee for the next epoch from that blob.
    #[instrument(level = "trace", skip(committee))]
    pub async fn stage_new_committee(
        &self,
        committee: Committee,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
        let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
        let blob_hash = blob.id().hash;
        // First block: publish the committee blob.
        match self
            .execute_operations(
                vec![Operation::system(SystemOperation::Admin(
                    AdminOperation::PublishCommitteeBlob { blob_hash },
                ))],
                vec![blob],
            )
            .await?
        {
            ClientOutcome::Committed(_) => {}
            // Propagate timeouts and conflicts to the caller unchanged.
            outcome @ ClientOutcome::WaitForTimeout(_) | outcome @ ClientOutcome::Conflict(_) => {
                return Ok(outcome)
            }
        }
        // Second block: create the committee for the next epoch, referencing the blob.
        let epoch = self.chain_info().await?.epoch.try_add_one()?;
        self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
            epoch,
            blob_hash,
        }))
        .await
    }
2306
2307    /// Synchronizes the chain with the validators and creates blocks without any operations to
2308    /// process all incoming messages. This may require several blocks.
2309    ///
2310    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
2311    /// is returned, too.
2312    #[instrument(level = "trace")]
2313    pub async fn process_inbox(
2314        &self,
2315    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2316        self.prepare_chain().await?;
2317        self.process_inbox_without_prepare().await
2318    }
2319
2320    /// Creates blocks without any operations to process all incoming messages. This may require
2321    /// several blocks.
2322    ///
2323    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
2324    /// is returned, too.
2325    #[instrument(level = "trace")]
2326    pub async fn process_inbox_without_prepare(
2327        &self,
2328    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2329        #[cfg(with_metrics)]
2330        let _latency = super::metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
2331
2332        let mut certificates = Vec::new();
2333        loop {
2334            // We provide no operations - this means that the only operations executed
2335            // will be epoch changes, receiving messages and processing event stream
2336            // updates, if any are pending.
2337            match self.execute_block(vec![], vec![]).await {
2338                Ok(ClientOutcome::Committed(certificate)) => certificates.push(certificate),
2339                Ok(ClientOutcome::Conflict(certificate)) => certificates.push(*certificate),
2340                Ok(ClientOutcome::WaitForTimeout(timeout)) => {
2341                    return Ok((certificates, Some(timeout)));
2342                }
2343                // Nothing in the inbox and no stream updates to be processed.
2344                Err(Error::LocalNodeError(LocalNodeError::WorkerError(
2345                    WorkerError::ChainError(chain_error),
2346                ))) if matches!(*chain_error, ChainError::EmptyBlock) => {
2347                    return Ok((certificates, None));
2348                }
2349                Err(error) => return Err(error),
2350            };
2351        }
2352    }
2353
2354    /// Returns operations to process all pending epoch changes: first the new epochs, in order,
2355    /// then the removed epochs, in order.
2356    async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, Error> {
2357        let (mut min_epoch, mut next_epoch) = {
2358            let (epoch, committees) = self.epoch_and_committees().await?;
2359            let min_epoch = *committees.keys().next().unwrap_or(&Epoch::ZERO);
2360            (min_epoch, epoch.try_add_one()?)
2361        };
2362        let mut epoch_change_ops = Vec::new();
2363        while self
2364            .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
2365            .await?
2366        {
2367            epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
2368                next_epoch,
2369            )));
2370            next_epoch.try_add_assign_one()?;
2371        }
2372        while self
2373            .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
2374            .await?
2375        {
2376            epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
2377                min_epoch,
2378            )));
2379            min_epoch.try_add_assign_one()?;
2380        }
2381        Ok(epoch_change_ops)
2382    }
2383
2384    /// Returns whether the system event on the admin chain with the given stream name and key
2385    /// exists in storage.
2386    async fn has_admin_event(&self, stream_name: &[u8], index: u32) -> Result<bool, Error> {
2387        let event_id = EventId {
2388            chain_id: self.client.admin_chain_id,
2389            stream_id: StreamId::system(stream_name),
2390            index,
2391        };
2392        Ok(self
2393            .client
2394            .storage_client()
2395            .read_event(event_id)
2396            .await?
2397            .is_some())
2398    }
2399
2400    /// Returns the indices and events from the storage
2401    pub async fn events_from_index(
2402        &self,
2403        stream_id: StreamId,
2404        start_index: u32,
2405    ) -> Result<Vec<IndexAndEvent>, Error> {
2406        Ok(self
2407            .client
2408            .storage_client()
2409            .read_events_from_index(&self.chain_id, &stream_id, start_index)
2410            .await?)
2411    }
2412
2413    /// Deprecates all the configurations of voting rights up to the given one (admin chains
2414    /// only). Currently, each individual chain is still entitled to wait before accepting
2415    /// this command. However, it is expected that deprecated validators stop functioning
2416    /// shortly after such command is issued.
2417    #[instrument(level = "trace")]
2418    pub async fn revoke_epochs(
2419        &self,
2420        revoked_epoch: Epoch,
2421    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2422        self.prepare_chain().await?;
2423        let (current_epoch, committees) = self.epoch_and_committees().await?;
2424        ensure!(
2425            revoked_epoch < current_epoch,
2426            Error::CannotRevokeCurrentEpoch(current_epoch)
2427        );
2428        ensure!(
2429            committees.contains_key(&revoked_epoch),
2430            Error::EpochAlreadyRevoked
2431        );
2432        let operations = committees
2433            .keys()
2434            .filter_map(|epoch| {
2435                if *epoch <= revoked_epoch {
2436                    Some(Operation::system(SystemOperation::Admin(
2437                        AdminOperation::RemoveCommittee { epoch: *epoch },
2438                    )))
2439                } else {
2440                    None
2441                }
2442            })
2443            .collect();
2444        self.execute_operations(operations, vec![]).await
2445    }
2446
2447    /// Sends money to a chain.
2448    /// Do not check balance. (This may block the client)
2449    /// Do not confirm the transaction.
2450    #[instrument(level = "trace")]
2451    pub async fn transfer_to_account_unsafe_unconfirmed(
2452        &self,
2453        owner: AccountOwner,
2454        amount: Amount,
2455        recipient: Account,
2456    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2457        self.execute_operation(SystemOperation::Transfer {
2458            owner,
2459            recipient,
2460            amount,
2461        })
2462        .await
2463    }
2464
2465    #[instrument(level = "trace", skip(hash))]
2466    pub async fn read_confirmed_block(&self, hash: CryptoHash) -> Result<ConfirmedBlock, Error> {
2467        let block = self
2468            .client
2469            .storage_client()
2470            .read_confirmed_block(hash)
2471            .await?;
2472        block.ok_or(Error::MissingConfirmedBlock(hash))
2473    }
2474
2475    #[instrument(level = "trace", skip(hash))]
2476    pub async fn read_certificate(
2477        &self,
2478        hash: CryptoHash,
2479    ) -> Result<ConfirmedBlockCertificate, Error> {
2480        let certificate = self.client.storage_client().read_certificate(hash).await?;
2481        certificate.ok_or(Error::ReadCertificatesError(vec![hash]))
2482    }
2483
2484    /// Handles any cross-chain requests for any pending outgoing messages.
2485    #[instrument(level = "trace")]
2486    pub async fn retry_pending_outgoing_messages(&self) -> Result<(), Error> {
2487        self.client
2488            .local_node
2489            .retry_pending_cross_chain_requests(self.chain_id)
2490            .await?;
2491        Ok(())
2492    }
2493
2494    #[instrument(level = "trace", skip(local_node))]
2495    async fn local_chain_info(
2496        &self,
2497        chain_id: ChainId,
2498        local_node: &mut LocalNodeClient<Env::Storage>,
2499    ) -> Result<Option<Box<ChainInfo>>, Error> {
2500        match local_node.chain_info(chain_id).await {
2501            Ok(info) => {
2502                // Useful in case `chain_id` is the same as a local chain.
2503                self.client.update_from_info(&info);
2504                Ok(Some(info))
2505            }
2506            Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
2507            Err(err) => Err(err.into()),
2508        }
2509    }
2510
2511    #[instrument(level = "trace", skip(chain_id, local_node))]
2512    async fn local_next_block_height(
2513        &self,
2514        chain_id: ChainId,
2515        local_node: &mut LocalNodeClient<Env::Storage>,
2516    ) -> Result<BlockHeight, Error> {
2517        Ok(self
2518            .local_chain_info(chain_id, local_node)
2519            .await?
2520            .map_or(BlockHeight::ZERO, |info| info.next_block_height))
2521    }
2522
2523    /// Returns the next height we expect to receive from the given sender chain, according to the
2524    /// local inbox.
2525    #[instrument(level = "trace")]
2526    async fn local_next_height_to_receive(&self, origin: ChainId) -> Result<BlockHeight, Error> {
2527        Ok(self
2528            .client
2529            .local_node
2530            .get_inbox_next_height(self.chain_id, origin)
2531            .await?)
2532    }
2533
    /// Reacts to a single `notification` received from `remote_node`, synchronizing the
    /// local node state as needed.
    ///
    /// Notifications that are irrelevant for the chain's current listening mode are
    /// ignored. A failure to catch up after a notification is logged rather than
    /// returned; errors surface only from local reads and downloads.
    #[instrument(level = "trace", skip(remote_node, local_node, notification))]
    async fn process_notification(
        &self,
        remote_node: RemoteNode<Env::ValidatorNode>,
        mut local_node: LocalNodeClient<Env::Storage>,
        notification: Notification,
    ) -> Result<(), Error> {
        // Drop notifications the configured listening mode does not care about.
        let listening_mode = self.listening_mode();
        let relevant = listening_mode
            .as_ref()
            .is_some_and(|mode| mode.is_relevant(&notification.reason));
        if !relevant {
            debug!(
                chain_id = %self.chain_id,
                reason = ?notification.reason,
                ?listening_mode,
                "Ignoring notification due to listening mode"
            );
            return Ok(());
        }
        match notification.reason {
            Reason::NewIncomingBundle { origin, height } => {
                // Already received everything up to `height`? Nothing to do.
                if self.local_next_height_to_receive(origin).await? > height {
                    debug!(
                        chain_id = %self.chain_id,
                        "Accepting redundant notification for new message"
                    );
                    return Ok(());
                }
                self.client
                    .download_sender_block_with_sending_ancestors(
                        self.chain_id,
                        origin,
                        height,
                        &remote_node,
                    )
                    .await?;
                // If the inbox still hasn't advanced past `height`, the download
                // did not catch us up; log and carry on.
                if self.local_next_height_to_receive(origin).await? <= height {
                    info!(
                        chain_id = %self.chain_id,
                        "NewIncomingBundle: Fail to synchronize new message after notification"
                    );
                }
            }
            Reason::NewBlock { height, .. } => {
                let chain_id = notification.chain_id;
                // Skip if we already have a block at or beyond the notified height.
                if self
                    .local_next_block_height(chain_id, &mut local_node)
                    .await?
                    > height
                {
                    debug!(
                        chain_id = %self.chain_id,
                        "Accepting redundant notification for new block"
                    );
                    return Ok(());
                }
                self.client
                    .synchronize_chain_state_from(&remote_node, chain_id)
                    .await?;
                // Check whether synchronization actually advanced past `height`.
                if self
                    .local_next_block_height(chain_id, &mut local_node)
                    .await?
                    <= height
                {
                    info!("NewBlock: Fail to synchronize new block after notification");
                }
                trace!(
                    chain_id = %self.chain_id,
                    %height,
                    "NewBlock: processed notification",
                );
            }
            Reason::NewEvents { height, hash, .. } => {
                // Skip if the local chain already includes the block that emitted
                // these events.
                if self
                    .local_next_block_height(notification.chain_id, &mut local_node)
                    .await?
                    > height
                {
                    debug!(
                        chain_id = %self.chain_id,
                        "Accepting redundant notification for new block"
                    );
                    return Ok(());
                }
                trace!(
                    chain_id = %self.chain_id,
                    %height,
                    "NewEvents: processing notification"
                );
                let mut certificates = remote_node.node.download_certificates(vec![hash]).await?;
                // download_certificates ensures that we will get exactly one
                // certificate in the result.
                let certificate = certificates
                    .pop()
                    .expect("download_certificates should have returned one certificate");
                self.client
                    .receive_sender_certificate(
                        certificate,
                        ReceiveCertificateMode::NeedsCheck,
                        None,
                    )
                    .await?;
            }
            Reason::NewRound { height, round } => {
                let chain_id = notification.chain_id;
                // Skip if we already know this round (or a later one) locally.
                if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? {
                    if (info.next_block_height, info.manager.current_round) >= (height, round) {
                        debug!(
                            chain_id = %self.chain_id,
                            "Accepting redundant notification for new round"
                        );
                        return Ok(());
                    }
                }
                self.client
                    .synchronize_chain_state_from(&remote_node, chain_id)
                    .await?;
                let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? else {
                    error!(
                        chain_id = %self.chain_id,
                        "NewRound: Fail to read local chain info for {chain_id}"
                    );
                    return Ok(());
                };
                // Check whether synchronization reached the notified round.
                if (info.next_block_height, info.manager.current_round) < (height, round) {
                    error!(
                        chain_id = %self.chain_id,
                        "NewRound: Fail to synchronize new block after notification"
                    );
                }
            }
            Reason::BlockExecuted { .. } => {
                // No action needed.
            }
        }
        Ok(())
    }
2672
2673    /// Returns whether this chain is tracked by the client, i.e. we are updating its inbox.
2674    pub fn is_tracked(&self) -> bool {
2675        self.client.is_tracked(self.chain_id)
2676    }
2677
2678    /// Returns the listening mode for this chain, if it is tracked.
2679    pub fn listening_mode(&self) -> Option<ListeningMode> {
2680        self.client.chain_mode(self.chain_id)
2681    }
2682
    /// Spawns a task that listens to notifications about the current chain from all validators,
    /// and synchronizes the local state accordingly.
    ///
    /// The listening mode must be set in `Client::chain_modes` before calling this method.
    ///
    /// Returns the long-running listener future (to be driven by the caller), an
    /// `AbortOnDrop` guard that stops the listener when dropped, and the stream of
    /// notifications for this chain.
    #[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
    pub async fn listen(
        &self,
    ) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), Error> {
        use future::FutureExt as _;

        // Awaits `future` while continuously draining `background_work`, so the
        // background stream keeps making progress until `future` completes.
        async fn await_while_polling<F: FusedFuture>(
            future: F,
            background_work: impl FusedStream<Item = ()>,
        ) -> F::Output {
            tokio::pin!(future);
            tokio::pin!(background_work);
            loop {
                futures::select! {
                    _ = background_work.next() => (),
                    result = future => return result,
                }
            }
        }

        let mut senders = HashMap::new(); // Senders to cancel notification streams.
        let notifications = self.subscribe()?;
        let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);

        // Beware: if this future ceases to make progress, notification processing will
        // deadlock, because of the issue described in
        // https://github.com/linera-io/linera-protocol/pull/1173.

        // TODO(#2013): replace this lock with an asynchronous communication channel

        // Per-validator notification-processing tasks currently in flight.
        let mut process_notifications = FuturesUnordered::new();

        // Establish the initial per-validator subscriptions.
        // NOTE(review): the message says "update committee" but the failure is in
        // updating the notification streams — consider rewording.
        match self.update_notification_streams(&mut senders).await {
            Ok(handler) => process_notifications.push(handler),
            Err(error) => error!("Failed to update committee: {error}"),
        };

        let this = self.clone();
        let update_streams = async move {
            let mut abortable_notifications = abortable_notifications.fuse();

            // On every new block the committee may have changed, so refresh the
            // set of validator notification streams.
            while let Some(notification) =
                await_while_polling(abortable_notifications.next(), &mut process_notifications)
                    .await
            {
                if let Reason::NewBlock { .. } = notification.reason {
                    match Box::pin(await_while_polling(
                        this.update_notification_streams(&mut senders).fuse(),
                        &mut process_notifications,
                    ))
                    .await
                    {
                        Ok(handler) => process_notifications.push(handler),
                        Err(error) => error!("Failed to update committee: {error}"),
                    }
                }
            }

            // The notification stream ended: cancel all per-validator streams ...
            for abort in senders.into_values() {
                abort.abort();
            }

            // ... and wait for the remaining processing tasks to wind down.
            let () = process_notifications.collect().await;
        }
        .in_current_span();

        Ok((update_streams, AbortOnDrop(abort), notifications))
    }
2755
    /// Refreshes the per-validator notification subscriptions in `senders` so that they
    /// match the current committee, and returns a future that drives all newly created
    /// subscription tasks to completion.
    ///
    /// Validators that left the committee have their streams aborted; validators that
    /// joined get a new subscription task, which also resynchronizes the chain state
    /// once its stream is established.
    #[instrument(level = "trace", skip(senders))]
    async fn update_notification_streams(
        &self,
        senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
    ) -> Result<impl Future<Output = ()>, Error> {
        let (nodes, local_node) = {
            let committee = self.local_committee().await?;
            let nodes: HashMap<_, _> = self
                .client
                .validator_node_provider()
                .make_nodes(&committee)?
                .collect();
            (nodes, self.client.local_node.clone())
        };
        // Drop removed validators.
        senders.retain(|validator, abort| {
            if !nodes.contains_key(validator) {
                abort.abort();
            }
            !abort.is_aborted()
        });
        // Add tasks for new validators.
        let validator_tasks = FuturesUnordered::new();
        for (public_key, node) in nodes {
            // Skip validators we are already subscribed to.
            let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
                continue;
            };
            let address = node.address();
            let this = self.clone();
            let stream = stream::once({
                let node = node.clone();
                async move {
                    let stream = node.subscribe(vec![this.chain_id]).await?;
                    // Only now the notification stream is established. We may have missed
                    // notifications since the last time we synchronized.
                    let remote_node = RemoteNode { public_key, node };
                    this.client
                        .synchronize_chain_state_from(&remote_node, this.chain_id)
                        .await?;
                    Ok::<_, Error>(stream)
                }
            })
            .filter_map(move |result| {
                let address = address.clone();
                async move {
                    // Log connection failures but keep going; a failed connection
                    // simply yields an empty stream for this validator.
                    if let Err(error) = &result {
                        info!(?error, address, "could not connect to validator");
                    } else {
                        debug!(address, "connected to validator");
                    }
                    result.ok()
                }
            })
            .flatten();
            // Make the stream abortable so this validator can be dropped later.
            let (stream, abort) = stream::abortable(stream);
            let mut stream = Box::pin(stream);
            let this = self.clone();
            let local_node = local_node.clone();
            let remote_node = RemoteNode { public_key, node };
            validator_tasks.push(async move {
                // Process notifications until the stream ends or is aborted;
                // per-notification errors are logged and do not stop the task.
                while let Some(notification) = stream.next().await {
                    if let Err(error) = this
                        .process_notification(
                            remote_node.clone(),
                            local_node.clone(),
                            notification.clone(),
                        )
                        .await
                    {
                        tracing::info!(
                            chain_id = %this.chain_id,
                            address = remote_node.address(),
                            ?notification,
                            %error,
                            "failed to process notification",
                        );
                    }
                }
            });
            entry.insert(abort);
        }
        Ok(validator_tasks.collect())
    }
2839
2840    /// Attempts to update a validator with the local information.
2841    #[instrument(level = "trace", skip(remote_node))]
2842    pub async fn sync_validator(&self, remote_node: Env::ValidatorNode) -> Result<(), Error> {
2843        let validator_next_block_height = match remote_node
2844            .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
2845            .await
2846        {
2847            Ok(info) => info.info.next_block_height,
2848            Err(NodeError::BlobsNotFound(_)) => BlockHeight::ZERO,
2849            Err(err) => return Err(err.into()),
2850        };
2851        let local_next_block_height = self.chain_info().await?.next_block_height;
2852
2853        if validator_next_block_height >= local_next_block_height {
2854            debug!("Validator is up-to-date with local state");
2855            return Ok(());
2856        }
2857
2858        let heights: Vec<_> = (validator_next_block_height.0..local_next_block_height.0)
2859            .map(BlockHeight)
2860            .collect();
2861
2862        let certificates = self
2863            .client
2864            .storage_client()
2865            .read_certificates_by_heights(self.chain_id, &heights)
2866            .await?
2867            .into_iter()
2868            .flatten()
2869            .collect::<Vec<_>>();
2870
2871        for certificate in certificates {
2872            match remote_node
2873                .handle_confirmed_certificate(
2874                    certificate.clone(),
2875                    CrossChainMessageDelivery::NonBlocking,
2876                )
2877                .await
2878            {
2879                Ok(_) => (),
2880                Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
2881                    // Upload the missing blobs we have and retry.
2882                    let missing_blobs: Vec<_> = self
2883                        .client
2884                        .storage_client()
2885                        .read_blobs(&missing_blob_ids)
2886                        .await?
2887                        .into_iter()
2888                        .flatten()
2889                        .collect();
2890                    remote_node.upload_blobs(missing_blobs).await?;
2891                    remote_node
2892                        .handle_confirmed_certificate(
2893                            certificate,
2894                            CrossChainMessageDelivery::NonBlocking,
2895                        )
2896                        .await?;
2897                }
2898                Err(err) => return Err(err.into()),
2899            }
2900        }
2901
2902        Ok(())
2903    }
2904}
2905
#[cfg(with_testing)]
impl<Env: Environment> ChainClient<Env> {
    /// Processes a single notification as if it came from the given validator.
    ///
    /// Panics if the validator cannot be resolved or the notification fails to process.
    pub async fn process_notification_from(
        &self,
        notification: Notification,
        validator: (ValidatorPublicKey, &str),
    ) {
        let (public_key, node) = self
            .client
            .validator_node_provider()
            .make_nodes_from_list(vec![validator])
            .unwrap()
            .next()
            .unwrap();
        let remote_node = RemoteNode { node, public_key };
        self.process_notification(remote_node, self.client.local_node.clone(), notification)
            .await
            .unwrap();
    }
}