linera_core/client/chain_client/
mod.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4mod state;
5use std::{
6    collections::{hash_map, BTreeMap, BTreeSet, HashMap},
7    convert::Infallible,
8    iter,
9    sync::Arc,
10};
11
12use custom_debug_derive::Debug;
13use futures::{
14    future::{self, Either, FusedFuture, Future, FutureExt},
15    select,
16    stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt, TryStreamExt},
17};
18#[cfg(with_metrics)]
19use linera_base::prometheus_util::MeasureLatency as _;
20use linera_base::{
21    abi::Abi,
22    crypto::{signer, AccountPublicKey, CryptoHash, Signer, ValidatorPublicKey},
23    data_types::{
24        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
25        ChainDescription, Epoch, Round, Timestamp,
26    },
27    ensure,
28    identifiers::{
29        Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
30        ModuleId, StreamId,
31    },
32    ownership::{ChainOwnership, TimeoutConfig},
33    time::{Duration, Instant},
34};
35#[cfg(not(target_arch = "wasm32"))]
36use linera_base::{data_types::Bytecode, vm::VmRuntime};
37use linera_chain::{
38    data_types::{BlockProposal, ChainAndHeight, IncomingBundle, ProposedBlock, Transaction},
39    manager::LockingBlock,
40    types::{
41        Block, ConfirmedBlock, ConfirmedBlockCertificate, Timeout, TimeoutCertificate,
42        ValidatedBlock,
43    },
44    ChainError, ChainExecutionContext, ChainStateView,
45};
46use linera_execution::{
47    committee::Committee,
48    system::{
49        AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
50        REMOVED_EPOCH_STREAM_NAME,
51    },
52    ExecutionError, Operation, Query, QueryOutcome, QueryResponse, SystemQuery, SystemResponse,
53};
54use linera_storage::{Clock as _, ResultReadCertificates, Storage as _};
55use linera_views::ViewError;
56use rand::seq::SliceRandom;
57use serde::Serialize;
58pub use state::State;
59use thiserror::Error;
60use tokio::sync::{mpsc, OwnedRwLockReadGuard};
61use tokio_stream::wrappers::UnboundedReceiverStream;
62use tokio_util::sync::CancellationToken;
63use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};
64
65use super::{
66    received_log::ReceivedLogs, validator_trackers::ValidatorTrackers, AbortOnDrop, Client,
67    ExecuteBlockOutcome, ListeningMode, MessagePolicy, PendingProposal, ReceiveCertificateMode,
68    TimingType,
69};
70use crate::{
71    data_types::{ChainInfo, ChainInfoQuery, ClientOutcome, RoundTimeout},
72    environment::Environment,
73    local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
74    node::{
75        CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
76        ValidatorNodeProvider as _,
77    },
78    remote_node::RemoteNode,
79    updater::{communicate_with_quorum, CommunicateAction, CommunicationError},
80    worker::{Notification, Reason, WorkerError},
81};
82
83#[derive(Debug, Clone)]
84pub struct Options {
85    /// Maximum number of pending message bundles processed at a time in a block.
86    pub max_pending_message_bundles: usize,
87    /// The policy for automatically handling incoming messages.
88    pub message_policy: MessagePolicy,
89    /// Whether to block on cross-chain message delivery.
90    pub cross_chain_message_delivery: CrossChainMessageDelivery,
91    /// An additional delay, after reaching a quorum, to wait for additional validator signatures,
92    /// as a fraction of time taken to reach quorum.
93    pub quorum_grace_period: f64,
94    /// The delay when downloading a blob, after which we try a second validator.
95    pub blob_download_timeout: Duration,
96    /// The delay when downloading a batch of certificates, after which we try a second validator.
97    pub certificate_batch_download_timeout: Duration,
98    /// Maximum number of certificates that we download at a time from one validator when
99    /// synchronizing one of our chains.
100    pub certificate_download_batch_size: u64,
101    /// Maximum number of sender certificates we try to download and receive in one go
102    /// when syncing sender chains.
103    pub sender_certificate_download_batch_size: usize,
104    /// Maximum number of tasks that can be joined concurrently using buffer_unordered.
105    pub max_joined_tasks: usize,
106}
107
108#[cfg(with_testing)]
109impl Options {
110    pub fn test_default() -> Self {
111        use super::{
112            DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE, DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
113        };
114        use crate::DEFAULT_QUORUM_GRACE_PERIOD;
115
116        Options {
117            max_pending_message_bundles: 10,
118            message_policy: MessagePolicy::new_accept_all(),
119            cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
120            quorum_grace_period: DEFAULT_QUORUM_GRACE_PERIOD,
121            blob_download_timeout: Duration::from_secs(1),
122            certificate_batch_download_timeout: Duration::from_secs(1),
123            certificate_download_batch_size: DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
124            sender_certificate_download_batch_size: DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
125            max_joined_tasks: 100,
126        }
127    }
128}
129
130/// Client to operate a chain by interacting with validators and the given local storage
131/// implementation.
132/// * The chain being operated is called the "local chain" or just the "chain".
133/// * As a rule, operations are considered successful (and communication may stop) when
134///   they succeeded in gathering a quorum of responses.
135#[derive(Debug)]
136pub struct ChainClient<Env: Environment> {
137    /// The Linera [`Client`] that manages operations for this chain client.
138    #[debug(skip)]
139    pub(crate) client: Arc<Client<Env>>,
140    /// The off-chain chain ID.
141    chain_id: ChainId,
142    /// The client options.
143    #[debug(skip)]
144    options: Options,
145    /// The preferred owner of the chain used to sign proposals.
146    /// `None` if we cannot propose on this chain.
147    preferred_owner: Option<AccountOwner>,
148    /// The next block height as read from the wallet.
149    initial_next_block_height: BlockHeight,
150    /// The last block hash as read from the wallet.
151    initial_block_hash: Option<CryptoHash>,
152    /// Optional timing sender for benchmarking.
153    timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
154}
155
156impl<Env: Environment> Clone for ChainClient<Env> {
157    fn clone(&self) -> Self {
158        Self {
159            client: self.client.clone(),
160            chain_id: self.chain_id,
161            options: self.options.clone(),
162            preferred_owner: self.preferred_owner,
163            initial_next_block_height: self.initial_next_block_height,
164            initial_block_hash: self.initial_block_hash,
165            timing_sender: self.timing_sender.clone(),
166        }
167    }
168}
169
170/// Error type for [`ChainClient`].
171#[derive(Debug, Error)]
172pub enum Error {
173    #[error("Local node operation failed: {0}")]
174    LocalNodeError(#[from] LocalNodeError),
175
176    #[error("Remote node operation failed: {0}")]
177    RemoteNodeError(#[from] NodeError),
178
179    #[error(transparent)]
180    ArithmeticError(#[from] ArithmeticError),
181
182    #[error("Missing certificates: {0:?}")]
183    ReadCertificatesError(Vec<CryptoHash>),
184
185    #[error("Missing confirmed block: {0:?}")]
186    MissingConfirmedBlock(CryptoHash),
187
188    #[error("JSON (de)serialization error: {0}")]
189    JsonError(#[from] serde_json::Error),
190
191    #[error("Chain operation failed: {0}")]
192    ChainError(#[from] ChainError),
193
194    #[error(transparent)]
195    CommunicationError(#[from] CommunicationError<NodeError>),
196
197    #[error("Internal error within chain client: {0}")]
198    InternalError(&'static str),
199
200    #[error(
201        "Cannot accept a certificate from an unknown committee in the future. \
202         Please synchronize the local view of the admin chain"
203    )]
204    CommitteeSynchronizationError,
205
206    #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
207    WalletSynchronizationError,
208
209    #[error("The state of the client is incompatible with the proposed block: {0}")]
210    BlockProposalError(&'static str),
211
212    #[error(
213        "Cannot accept a certificate from a committee that was retired. \
214         Try a newer certificate from the same origin"
215    )]
216    CommitteeDeprecationError,
217
218    #[error("Protocol error within chain client: {0}")]
219    ProtocolError(&'static str),
220
221    #[error("Signer doesn't have key to sign for chain {0}")]
222    CannotFindKeyForChain(ChainId),
223
224    #[error("client is not configured to propose on chain {0}")]
225    NoAccountKeyConfigured(ChainId),
226
227    #[error("The chain client isn't owner on chain {0}")]
228    NotAnOwner(ChainId),
229
230    #[error(transparent)]
231    ViewError(#[from] ViewError),
232
233    #[error(
234        "Failed to download certificates and update local node to the next height \
235         {target_next_block_height} of chain {chain_id}"
236    )]
237    CannotDownloadCertificates {
238        chain_id: ChainId,
239        target_next_block_height: BlockHeight,
240    },
241
242    #[error(transparent)]
243    BcsError(#[from] bcs::Error),
244
245    #[error(
246        "Unexpected quorum: validators voted for block hash {hash} in {round}, \
247         expected block hash {expected_hash} in {expected_round}"
248    )]
249    UnexpectedQuorum {
250        hash: CryptoHash,
251        round: Round,
252        expected_hash: CryptoHash,
253        expected_round: Round,
254    },
255
256    #[error("signer error: {0:?}")]
257    Signer(#[source] Box<dyn signer::Error>),
258
259    #[error("Cannot revoke the current epoch {0}")]
260    CannotRevokeCurrentEpoch(Epoch),
261
262    #[error("Epoch is already revoked")]
263    EpochAlreadyRevoked,
264
265    #[error("Failed to download missing sender blocks from chain {chain_id} at height {height}")]
266    CannotDownloadMissingSenderBlock {
267        chain_id: ChainId,
268        height: BlockHeight,
269    },
270}
271
272impl From<Infallible> for Error {
273    fn from(infallible: Infallible) -> Self {
274        match infallible {}
275    }
276}
277
278impl Error {
279    pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
280        Self::Signer(Box::new(err))
281    }
282}
283
284impl<Env: Environment> ChainClient<Env> {
285    pub fn new(
286        client: Arc<Client<Env>>,
287        chain_id: ChainId,
288        options: Options,
289        initial_block_hash: Option<CryptoHash>,
290        initial_next_block_height: BlockHeight,
291        preferred_owner: Option<AccountOwner>,
292        timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
293    ) -> Self {
294        ChainClient {
295            client,
296            chain_id,
297            options,
298            preferred_owner,
299            initial_block_hash,
300            initial_next_block_height,
301            timing_sender,
302        }
303    }
304
305    /// Returns whether this chain is in follow-only mode.
306    pub fn is_follow_only(&self) -> bool {
307        self.client.is_chain_follow_only(self.chain_id)
308    }
309
310    /// Gets the client mutex from the chain's state.
311    #[instrument(level = "trace", skip(self))]
312    fn client_mutex(&self) -> Arc<tokio::sync::Mutex<()>> {
313        self.client
314            .chains
315            .pin()
316            .get(&self.chain_id)
317            .expect("Chain client constructed for invalid chain")
318            .client_mutex()
319    }
320
321    /// Gets the next pending block.
322    #[instrument(level = "trace", skip(self))]
323    pub fn pending_proposal(&self) -> Option<PendingProposal> {
324        self.client
325            .chains
326            .pin()
327            .get(&self.chain_id)
328            .expect("Chain client constructed for invalid chain")
329            .pending_proposal()
330            .clone()
331    }
332
333    /// Updates the chain's state using a closure.
334    #[instrument(level = "trace", skip(self, f))]
335    fn update_state<F>(&self, f: F)
336    where
337        F: Fn(&mut State),
338    {
339        let chains = self.client.chains.pin();
340        chains
341            .update(self.chain_id, |state| {
342                let mut state = state.clone_for_update_unchecked();
343                f(&mut state);
344                state
345            })
346            .expect("Chain client constructed for invalid chain");
347    }
348
349    /// Gets a reference to the client's signer instance.
350    #[instrument(level = "trace", skip(self))]
351    pub fn signer(&self) -> &impl Signer {
352        self.client.signer()
353    }
354
355    /// Gets a mutable reference to the per-`ChainClient` options.
356    #[instrument(level = "trace", skip(self))]
357    pub fn options_mut(&mut self) -> &mut Options {
358        &mut self.options
359    }
360
361    /// Gets a reference to the per-`ChainClient` options.
362    #[instrument(level = "trace", skip(self))]
363    pub fn options(&self) -> &Options {
364        &self.options
365    }
366
367    /// Gets the ID of the associated chain.
368    #[instrument(level = "trace", skip(self))]
369    pub fn chain_id(&self) -> ChainId {
370        self.chain_id
371    }
372
373    /// Gets a clone of the timing sender for benchmarking.
374    pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
375        self.timing_sender.clone()
376    }
377
378    /// Gets the ID of the admin chain.
379    #[instrument(level = "trace", skip(self))]
380    pub fn admin_id(&self) -> ChainId {
381        self.client.admin_id
382    }
383
384    /// Gets the currently preferred owner for signing the blocks.
385    #[instrument(level = "trace", skip(self))]
386    pub fn preferred_owner(&self) -> Option<AccountOwner> {
387        self.preferred_owner
388    }
389
390    /// Sets the new, preferred owner for signing the blocks.
391    #[instrument(level = "trace", skip(self))]
392    pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
393        self.preferred_owner = Some(preferred_owner);
394    }
395
396    /// Unsets the preferred owner for signing the blocks.
397    #[instrument(level = "trace", skip(self))]
398    pub fn unset_preferred_owner(&mut self) {
399        self.preferred_owner = None;
400    }
401
402    /// Obtains a `ChainStateView` for this client's chain.
403    #[instrument(level = "trace")]
404    pub async fn chain_state_view(
405        &self,
406    ) -> Result<OwnedRwLockReadGuard<ChainStateView<Env::StorageContext>>, LocalNodeError> {
407        self.client.local_node.chain_state_view(self.chain_id).await
408    }
409
410    /// Returns chain IDs that this chain subscribes to.
411    #[instrument(level = "trace", skip(self))]
412    pub async fn event_stream_publishers(
413        &self,
414    ) -> Result<BTreeMap<ChainId, BTreeSet<StreamId>>, LocalNodeError> {
415        let subscriptions = self
416            .client
417            .local_node
418            .get_event_subscriptions(self.chain_id)
419            .await?;
420        let mut publishers = subscriptions.into_iter().fold(
421            BTreeMap::<ChainId, BTreeSet<StreamId>>::new(),
422            |mut map, ((chain_id, stream_id), _)| {
423                map.entry(chain_id).or_default().insert(stream_id);
424                map
425            },
426        );
427        if self.chain_id != self.client.admin_id {
428            publishers.insert(
429                self.client.admin_id,
430                vec![
431                    StreamId::system(EPOCH_STREAM_NAME),
432                    StreamId::system(REMOVED_EPOCH_STREAM_NAME),
433                ]
434                .into_iter()
435                .collect(),
436            );
437        }
438        Ok(publishers)
439    }
440
441    /// Subscribes to notifications from this client's chain.
442    #[instrument(level = "trace")]
443    pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
444        self.subscribe_to(self.chain_id)
445    }
446
447    /// Subscribes to notifications from the specified chain.
448    #[instrument(level = "trace")]
449    pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
450        Ok(Box::pin(UnboundedReceiverStream::new(
451            self.client.notifier.subscribe(vec![chain_id]),
452        )))
453    }
454
455    /// Returns the storage client used by this client's local node.
456    #[instrument(level = "trace")]
457    pub fn storage_client(&self) -> &Env::Storage {
458        self.client.storage_client()
459    }
460
461    /// Obtains the basic `ChainInfo` data for the local chain.
462    #[instrument(level = "trace")]
463    pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
464        let query = ChainInfoQuery::new(self.chain_id);
465        let response = self
466            .client
467            .local_node
468            .handle_chain_info_query(query)
469            .await?;
470        self.client.update_from_info(&response.info);
471        Ok(response.info)
472    }
473
474    /// Obtains the basic `ChainInfo` data for the local chain, with chain manager values.
475    #[instrument(level = "trace")]
476    pub async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
477        let query = ChainInfoQuery::new(self.chain_id)
478            .with_manager_values()
479            .with_committees();
480        let response = self
481            .client
482            .local_node
483            .handle_chain_info_query(query)
484            .await?;
485        self.client.update_from_info(&response.info);
486        Ok(response.info)
487    }
488
489    /// Returns the chain's description. Fetches it from the validators if necessary.
490    pub async fn get_chain_description(&self) -> Result<ChainDescription, Error> {
491        self.client.get_chain_description(self.chain_id).await
492    }
493
494    /// Obtains up to `self.options.max_pending_message_bundles` pending message bundles for the
495    /// local chain.
496    #[instrument(level = "trace")]
497    async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, Error> {
498        if self.options.message_policy.is_ignore() {
499            // Ignore all messages.
500            return Ok(Vec::new());
501        }
502
503        let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
504        let info = self
505            .client
506            .local_node
507            .handle_chain_info_query(query)
508            .await?
509            .info;
510        if self.preferred_owner.is_some_and(|owner| {
511            info.manager
512                .ownership
513                .is_super_owner_no_regular_owners(&owner)
514        }) {
515            // There are only super owners; they are expected to sync manually.
516            ensure!(
517                info.next_block_height >= self.initial_next_block_height,
518                Error::WalletSynchronizationError
519            );
520        }
521
522        Ok(info
523            .requested_pending_message_bundles
524            .into_iter()
525            .filter_map(|bundle| self.options.message_policy.apply(bundle))
526            .take(self.options.max_pending_message_bundles)
527            .collect())
528    }
529
530    /// Returns an `UpdateStreams` operation that updates this client's chain about new events
531    /// in any of the streams its applications are subscribing to. Returns `None` if there are no
532    /// new events.
533    #[instrument(level = "trace")]
534    async fn collect_stream_updates(&self) -> Result<Option<Operation>, Error> {
535        // Load all our subscriptions.
536        let subscription_map = self
537            .client
538            .local_node
539            .get_event_subscriptions(self.chain_id)
540            .await?;
541        // Collect the indices of all new events.
542        let futures = subscription_map
543            .into_iter()
544            .filter(|((chain_id, _), _)| {
545                self.options
546                    .message_policy
547                    .restrict_chain_ids_to
548                    .as_ref()
549                    .is_none_or(|chain_set| chain_set.contains(chain_id))
550            })
551            .map(|((chain_id, stream_id), subscriptions)| {
552                let client = self.client.clone();
553                async move {
554                    let next_expected_index = client
555                        .local_node
556                        .get_next_expected_event(chain_id, stream_id.clone())
557                        .await?;
558                    if let Some(next_index) = next_expected_index
559                        .filter(|next_index| *next_index > subscriptions.next_index)
560                    {
561                        Ok(Some((chain_id, stream_id, next_index)))
562                    } else {
563                        Ok::<_, Error>(None)
564                    }
565                }
566            });
567        let updates = futures::stream::iter(futures)
568            .buffer_unordered(self.options.max_joined_tasks)
569            .try_collect::<Vec<_>>()
570            .await?
571            .into_iter()
572            .flatten()
573            .collect::<Vec<_>>();
574        if updates.is_empty() {
575            return Ok(None);
576        }
577        Ok(Some(SystemOperation::UpdateStreams(updates).into()))
578    }
579
580    #[instrument(level = "trace")]
581    async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
582        self.client.chain_info_with_committees(self.chain_id).await
583    }
584
585    /// Obtains the current epoch of the local chain as well as its set of trusted committees.
586    #[instrument(level = "trace")]
587    async fn epoch_and_committees(
588        &self,
589    ) -> Result<(Epoch, BTreeMap<Epoch, Committee>), LocalNodeError> {
590        let info = self.chain_info_with_committees().await?;
591        let epoch = info.epoch;
592        let committees = info.into_committees()?;
593        Ok((epoch, committees))
594    }
595
596    /// Obtains the committee for the current epoch of the local chain.
597    #[instrument(level = "trace")]
598    pub async fn local_committee(&self) -> Result<Committee, Error> {
599        let info = match self.chain_info_with_committees().await {
600            Ok(info) => info,
601            Err(LocalNodeError::BlobsNotFound(_)) => {
602                self.synchronize_chain_state(self.chain_id).await?;
603                self.chain_info_with_committees().await?
604            }
605            Err(err) => return Err(err.into()),
606        };
607        Ok(info.into_current_committee()?)
608    }
609
610    /// Obtains the committee for the latest epoch on the admin chain.
611    #[instrument(level = "trace")]
612    pub async fn admin_committee(&self) -> Result<(Epoch, Committee), LocalNodeError> {
613        self.client.admin_committee().await
614    }
615
616    /// Obtains the identity of the current owner of the chain.
617    ///
618    /// Returns an error if we don't have the private key for the identity.
619    #[instrument(level = "trace")]
620    pub async fn identity(&self) -> Result<AccountOwner, Error> {
621        let Some(preferred_owner) = self.preferred_owner else {
622            return Err(Error::NoAccountKeyConfigured(self.chain_id));
623        };
624        let manager = self.chain_info().await?.manager;
625        ensure!(
626            manager.ownership.is_active(),
627            LocalNodeError::InactiveChain(self.chain_id)
628        );
629        let fallback_owners = if manager.ownership.has_fallback() {
630            self.local_committee()
631                .await?
632                .account_keys_and_weights()
633                .map(|(key, _)| AccountOwner::from(key))
634                .collect()
635        } else {
636            BTreeSet::new()
637        };
638
639        let is_owner = manager.ownership.is_owner(&preferred_owner)
640            || fallback_owners.contains(&preferred_owner);
641
642        if !is_owner {
643            warn!(
644                chain_id = %self.chain_id,
645                ownership = ?manager.ownership,
646                ?fallback_owners,
647                ?preferred_owner,
648                "The preferred owner is not configured as an owner of this chain",
649            );
650            return Err(Error::NotAnOwner(self.chain_id));
651        }
652
653        let has_signer = self
654            .signer()
655            .contains_key(&preferred_owner)
656            .await
657            .map_err(Error::signer_failure)?;
658
659        if !has_signer {
660            warn!(%self.chain_id, ?preferred_owner,
661                "Chain is one of the owners but its Signer instance doesn't contain the key",
662            );
663            return Err(Error::CannotFindKeyForChain(self.chain_id));
664        }
665
666        Ok(preferred_owner)
667    }
668
669    /// Prepares the chain for the next operation, i.e. makes sure we have synchronized it up to
670    /// its current height.
671    #[instrument(level = "trace")]
672    pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, Error> {
673        #[cfg(with_metrics)]
674        let _latency = super::metrics::PREPARE_CHAIN_LATENCY.measure_latency();
675
676        let mut info = self.synchronize_to_known_height().await?;
677
678        if self.preferred_owner.is_none_or(|owner| {
679            !info
680                .manager
681                .ownership
682                .is_super_owner_no_regular_owners(&owner)
683        }) {
684            // If we are not a super owner or there are regular owners, we could be missing recent
685            // certificates created by other clients. Further synchronize blocks from the network.
686            // This is a best-effort that depends on network conditions.
687            info = self.client.synchronize_chain_state(self.chain_id).await?;
688        }
689
690        if info.epoch > self.client.admin_committees().await?.0 {
691            self.client
692                .synchronize_chain_state(self.client.admin_id)
693                .await?;
694        }
695
696        self.client.update_from_info(&info);
697        Ok(info)
698    }
699
700    // Verifies that our local storage contains enough history compared to the
701    // known block height. Otherwise, downloads the missing history from the
702    // network.
703    // The known height only differs if the wallet is ahead of storage.
704    async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, Error> {
705        let info = self
706            .client
707            .download_certificates(self.chain_id, self.initial_next_block_height)
708            .await?;
709        if info.next_block_height == self.initial_next_block_height {
710            // Check that our local node has the expected block hash.
711            ensure!(
712                self.initial_block_hash == info.block_hash,
713                Error::InternalError("Invalid chain of blocks in local node")
714            );
715        }
716        Ok(info)
717    }
718
719    /// Attempts to update all validators about the local chain.
720    #[instrument(level = "trace", skip(old_committee, latest_certificate))]
721    pub async fn update_validators(
722        &self,
723        old_committee: Option<&Committee>,
724        latest_certificate: Option<ConfirmedBlockCertificate>,
725    ) -> Result<(), Error> {
726        let update_validators_start = linera_base::time::Instant::now();
727        // Communicate the new certificate now.
728        if let Some(old_committee) = old_committee {
729            self.communicate_chain_updates(old_committee, latest_certificate.clone())
730                .await?
731        };
732        if let Ok(new_committee) = self.local_committee().await {
733            if Some(&new_committee) != old_committee {
734                // If the configuration just changed, communicate to the new committee as well.
735                // (This is actually more important that updating the previous committee.)
736                self.communicate_chain_updates(&new_committee, latest_certificate)
737                    .await?;
738            }
739        }
740        self.send_timing(update_validators_start, TimingType::UpdateValidators);
741        Ok(())
742    }
743
744    /// Broadcasts certified blocks to validators.
745    #[instrument(level = "trace", skip(committee))]
746    pub async fn communicate_chain_updates(
747        &self,
748        committee: &Committee,
749        latest_certificate: Option<ConfirmedBlockCertificate>,
750    ) -> Result<(), Error> {
751        let delivery = self.options.cross_chain_message_delivery;
752        let height = self.chain_info().await?.next_block_height;
753        self.client
754            .communicate_chain_updates(
755                committee,
756                self.chain_id,
757                height,
758                delivery,
759                latest_certificate,
760            )
761            .await
762    }
763
764    /// Synchronizes all chains that any application on this chain subscribes to.
765    /// We always consider the admin chain a relevant publishing chain, for new epochs.
766    async fn synchronize_publisher_chains(&self) -> Result<(), Error> {
767        let subscriptions = self
768            .client
769            .local_node
770            .get_event_subscriptions(self.chain_id)
771            .await?;
772        let chain_ids = subscriptions
773            .iter()
774            .map(|((chain_id, _), _)| *chain_id)
775            .chain(iter::once(self.client.admin_id))
776            .filter(|chain_id| *chain_id != self.chain_id)
777            .collect::<BTreeSet<_>>();
778        stream::iter(
779            chain_ids
780                .into_iter()
781                .map(|chain_id| self.client.synchronize_chain_state(chain_id)),
782        )
783        .buffer_unordered(self.options.max_joined_tasks)
784        .collect::<Vec<_>>()
785        .await
786        .into_iter()
787        .collect::<Result<Vec<_>, _>>()?;
788        Ok(())
789    }
790
791    /// Attempts to download new received certificates.
792    ///
793    /// This is a best effort: it will only find certificates that have been confirmed
794    /// amongst sufficiently many validators of the current committee of the target
795    /// chain.
796    ///
797    /// However, this should be the case whenever a sender's chain is still in use and
798    /// is regularly upgraded to new committees.
799    #[instrument(level = "trace")]
800    pub async fn find_received_certificates(
801        &self,
802        cancellation_token: Option<CancellationToken>,
803    ) -> Result<(), Error> {
804        debug!(chain_id = %self.chain_id, "starting find_received_certificates");
805        #[cfg(with_metrics)]
806        let _latency = super::metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
807        // Use network information from the local chain.
808        let chain_id = self.chain_id;
809        let (_, committee) = self.admin_committee().await?;
810        let nodes = self.client.make_nodes(&committee)?;
811
812        let trackers = self
813            .client
814            .local_node
815            .get_received_certificate_trackers(chain_id)
816            .await?;
817
818        trace!("find_received_certificates: read trackers");
819
820        let received_log_batches = Arc::new(std::sync::Mutex::new(Vec::new()));
821        // Proceed to downloading received logs.
822        let result = communicate_with_quorum(
823            &nodes,
824            &committee,
825            |_| (),
826            |remote_node| {
827                let client = &self.client;
828                let tracker = trackers.get(&remote_node.public_key).copied().unwrap_or(0);
829                let received_log_batches = Arc::clone(&received_log_batches);
830                Box::pin(async move {
831                    let batch = client
832                        .get_received_log_from_validator(chain_id, &remote_node, tracker)
833                        .await?;
834                    let mut batches = received_log_batches.lock().unwrap();
835                    batches.push((remote_node.public_key, batch));
836                    Ok(())
837                })
838            },
839            self.options.quorum_grace_period,
840        )
841        .await;
842
843        if let Err(error) = result {
844            error!(
845                %error,
846                "Failed to synchronize received_logs from at least a quorum of validators",
847            );
848        }
849
850        let received_logs: Vec<_> = {
851            let mut received_log_batches = received_log_batches.lock().unwrap();
852            std::mem::take(received_log_batches.as_mut())
853        };
854
855        debug!(
856            received_logs_len = %received_logs.len(),
857            received_logs_total = %received_logs.iter().map(|x| x.1.len()).sum::<usize>(),
858            "collected received logs"
859        );
860
861        let (received_logs, mut validator_trackers) = {
862            (
863                ReceivedLogs::from_received_result(received_logs.clone()),
864                ValidatorTrackers::new(received_logs, &trackers),
865            )
866        };
867
868        debug!(
869            num_chains = %received_logs.num_chains(),
870            num_certs = %received_logs.num_certs(),
871            "find_received_certificates: total number of chains and certificates to sync",
872        );
873
874        let max_blocks_per_chain =
875            self.options.sender_certificate_download_batch_size / self.options.max_joined_tasks * 2;
876        for received_log in received_logs.into_batches(
877            self.options.sender_certificate_download_batch_size,
878            max_blocks_per_chain,
879        ) {
880            validator_trackers = self
881                .receive_sender_certificates(
882                    received_log,
883                    validator_trackers,
884                    &nodes,
885                    cancellation_token.clone(),
886                )
887                .await?;
888
889            self.update_received_certificate_trackers(&validator_trackers)
890                .await;
891        }
892
893        info!("find_received_certificates finished");
894
895        Ok(())
896    }
897
898    async fn update_received_certificate_trackers(&self, trackers: &ValidatorTrackers) {
899        let updated_trackers = trackers.to_map();
900        trace!(?updated_trackers, "updated tracker values");
901
902        // Update the trackers.
903        if let Err(error) = self
904            .client
905            .local_node
906            .update_received_certificate_trackers(self.chain_id, updated_trackers)
907            .await
908        {
909            error!(
910                chain_id = %self.chain_id,
911                %error,
912                "Failed to update the certificate trackers for chain",
913            );
914        }
915    }
916
917    /// Downloads and processes or preprocesses the certificates for blocks sending messages to
918    /// this chain that we are still missing.
919    async fn receive_sender_certificates(
920        &self,
921        mut received_logs: ReceivedLogs,
922        mut validator_trackers: ValidatorTrackers,
923        nodes: &[RemoteNode<Env::ValidatorNode>],
924        cancellation_token: Option<CancellationToken>,
925    ) -> Result<ValidatorTrackers, Error> {
926        debug!(
927            num_chains = %received_logs.num_chains(),
928            num_certs = %received_logs.num_certs(),
929            "receive_sender_certificates: number of chains and certificates to sync",
930        );
931
932        // Obtain the next block height we need in the local node, for each chain.
933        let local_next_heights = self
934            .client
935            .local_node
936            .next_outbox_heights(received_logs.chains(), self.chain_id)
937            .await?;
938
939        validator_trackers.filter_out_already_known(&mut received_logs, local_next_heights);
940
941        debug!(
942            remaining_total_certificates = %received_logs.num_certs(),
943            "receive_sender_certificates: computed remote_heights"
944        );
945
946        let mut other_sender_chains = Vec::new();
947        let (sender, mut receiver) = mpsc::unbounded_channel::<ChainAndHeight>();
948
949        let cert_futures = received_logs.heights_per_chain().into_iter().filter_map(
950            |(sender_chain_id, remote_heights)| {
951                if remote_heights.is_empty() {
952                    // Our highest, locally executed block is higher than any block height
953                    // from the current batch. Skip this batch, but remember to wait for
954                    // the messages to be delivered to the inboxes.
955                    other_sender_chains.push(sender_chain_id);
956                    return None;
957                };
958                let remote_heights = remote_heights.into_iter().collect::<Vec<_>>();
959                let sender = sender.clone();
960                let client = self.client.clone();
961                let mut nodes = nodes.to_vec();
962                nodes.shuffle(&mut rand::thread_rng());
963                let received_logs_ref = &received_logs;
964                Some(async move {
965                    client
966                        .download_and_process_sender_chain(
967                            sender_chain_id,
968                            &nodes,
969                            received_logs_ref,
970                            remote_heights,
971                            sender,
972                        )
973                        .await
974                })
975            },
976        );
977
978        let update_trackers = linera_base::task::spawn(async move {
979            while let Some(chain_and_height) = receiver.recv().await {
980                validator_trackers.downloaded_cert(chain_and_height);
981            }
982            validator_trackers
983        });
984
985        let mut cancellation_future = Box::pin(
986            async move {
987                if let Some(token) = cancellation_token {
988                    token.cancelled().await
989                } else {
990                    future::pending().await
991                }
992            }
993            .fuse(),
994        );
995
996        select! {
997            _ = stream::iter(cert_futures)
998            .buffer_unordered(self.options.max_joined_tasks)
999            .for_each(future::ready)
1000            => (),
1001            _ = cancellation_future => ()
1002        };
1003
1004        drop(sender);
1005
1006        let validator_trackers = update_trackers.await;
1007
1008        debug!(
1009            num_other_chains = %other_sender_chains.len(),
1010            "receive_sender_certificates: processing certificates finished"
1011        );
1012
1013        // Certificates for these chains were omitted from `certificates` because they were
1014        // already processed locally. If they were processed in a concurrent task, it is not
1015        // guaranteed that their cross-chain messages were already handled.
1016        self.retry_pending_cross_chain_requests(nodes, other_sender_chains)
1017            .await;
1018
1019        debug!("receive_sender_certificates: finished processing other_sender_chains");
1020
1021        Ok(validator_trackers)
1022    }
1023
1024    /// Retries cross chain requests on the chains which may have been processed on
1025    /// another task without the messages being correctly handled.
1026    async fn retry_pending_cross_chain_requests(
1027        &self,
1028        nodes: &[RemoteNode<Env::ValidatorNode>],
1029        other_sender_chains: Vec<ChainId>,
1030    ) {
1031        let stream = FuturesUnordered::from_iter(other_sender_chains.into_iter().map(|chain_id| {
1032            let local_node = self.client.local_node.clone();
1033            async move {
1034                if let Err(error) = match local_node
1035                    .retry_pending_cross_chain_requests(chain_id)
1036                    .await
1037                {
1038                    Ok(()) => Ok(()),
1039                    Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1040                        if let Err(error) = self
1041                            .client
1042                            .update_local_node_with_blobs_from(blob_ids.clone(), nodes)
1043                            .await
1044                        {
1045                            error!(
1046                                ?blob_ids,
1047                                %error,
1048                                "Error while attempting to download blobs during retrying outgoing \
1049                                messages"
1050                            );
1051                        }
1052                        local_node
1053                            .retry_pending_cross_chain_requests(chain_id)
1054                            .await
1055                    }
1056                    err => err,
1057                } {
1058                    error!(
1059                        %chain_id,
1060                        %error,
1061                        "Failed to retry outgoing messages from chain"
1062                    );
1063                }
1064            }
1065        }));
1066        stream.for_each(future::ready).await;
1067    }
1068
1069    /// Sends money.
1070    #[instrument(level = "trace")]
1071    pub async fn transfer(
1072        &self,
1073        owner: AccountOwner,
1074        amount: Amount,
1075        recipient: Account,
1076    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1077        // TODO(#467): check the balance of `owner` before signing any block proposal.
1078        self.execute_operation(SystemOperation::Transfer {
1079            owner,
1080            recipient,
1081            amount,
1082        })
1083        .await
1084    }
1085
1086    /// Verify if a data blob is readable from storage.
1087    // TODO(#2490): Consider removing or renaming this.
1088    #[instrument(level = "trace")]
1089    pub async fn read_data_blob(
1090        &self,
1091        hash: CryptoHash,
1092    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1093        let blob_id = BlobId {
1094            hash,
1095            blob_type: BlobType::Data,
1096        };
1097        self.execute_operation(SystemOperation::VerifyBlob { blob_id })
1098            .await
1099    }
1100
1101    /// Claims money in a remote chain.
1102    #[instrument(level = "trace")]
1103    pub async fn claim(
1104        &self,
1105        owner: AccountOwner,
1106        target_id: ChainId,
1107        recipient: Account,
1108        amount: Amount,
1109    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1110        self.execute_operation(SystemOperation::Claim {
1111            owner,
1112            target_id,
1113            recipient,
1114            amount,
1115        })
1116        .await
1117    }
1118
1119    /// Requests a leader timeout vote from all validators. If a quorum signs it, creates a
1120    /// certificate and sends it to all validators, to make them enter the next round.
1121    #[instrument(level = "trace")]
1122    pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, Error> {
1123        let chain_id = self.chain_id;
1124        let info = self.chain_info_with_committees().await?;
1125        let committee = info.current_committee()?;
1126        let height = info.next_block_height;
1127        let round = info.manager.current_round;
1128        let action = CommunicateAction::RequestTimeout {
1129            height,
1130            round,
1131            chain_id,
1132        };
1133        let value = Timeout::new(chain_id, height, info.epoch);
1134        let certificate = Box::new(
1135            self.client
1136                .communicate_chain_action(committee, action, value)
1137                .await?,
1138        );
1139        self.client.process_certificate(certificate.clone()).await?;
1140        // The block height didn't increase, but this will communicate the timeout as well.
1141        self.client
1142            .communicate_chain_updates(
1143                committee,
1144                chain_id,
1145                height,
1146                CrossChainMessageDelivery::NonBlocking,
1147                None,
1148            )
1149            .await?;
1150        Ok(*certificate)
1151    }
1152
1153    /// Downloads and processes any certificates we are missing for the given chain.
1154    #[instrument(level = "trace", skip_all)]
1155    pub async fn synchronize_chain_state(
1156        &self,
1157        chain_id: ChainId,
1158    ) -> Result<Box<ChainInfo>, Error> {
1159        self.client.synchronize_chain_state(chain_id).await
1160    }
1161
1162    /// Downloads and processes any certificates we are missing for this chain, from the given
1163    /// committee.
1164    #[instrument(level = "trace", skip_all)]
1165    pub async fn synchronize_chain_state_from_committee(
1166        &self,
1167        committee: Committee,
1168    ) -> Result<Box<ChainInfo>, Error> {
1169        self.client
1170            .synchronize_chain_from_committee(self.chain_id, committee)
1171            .await
1172    }
1173
1174    /// Executes a list of operations.
1175    #[instrument(level = "trace", skip(operations, blobs))]
1176    pub async fn execute_operations(
1177        &self,
1178        operations: Vec<Operation>,
1179        blobs: Vec<Blob>,
1180    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1181        let timing_start = linera_base::time::Instant::now();
1182
1183        let result = loop {
1184            let execute_block_start = linera_base::time::Instant::now();
1185            // TODO(#2066): Remove boxing once the call-stack is shallower
1186            match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
1187                Ok(ExecuteBlockOutcome::Executed(certificate)) => {
1188                    self.send_timing(execute_block_start, TimingType::ExecuteBlock);
1189                    break Ok(ClientOutcome::Committed(certificate));
1190                }
1191                Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
1192                    break Ok(ClientOutcome::WaitForTimeout(timeout));
1193                }
1194                Ok(ExecuteBlockOutcome::Conflict(certificate)) => {
1195                    info!(
1196                        height = %certificate.block().header.height,
1197                        "Another block was committed; retrying."
1198                    );
1199                }
1200                Err(Error::CommunicationError(CommunicationError::Trusted(
1201                    NodeError::UnexpectedBlockHeight {
1202                        expected_block_height,
1203                        found_block_height,
1204                    },
1205                ))) if expected_block_height > found_block_height => {
1206                    tracing::info!(
1207                        "Local state is outdated; synchronizing chain {:.8}",
1208                        self.chain_id
1209                    );
1210                    self.synchronize_chain_state(self.chain_id).await?;
1211                }
1212                Err(err) => return Err(err),
1213            };
1214        };
1215
1216        self.send_timing(timing_start, TimingType::ExecuteOperations);
1217
1218        result
1219    }
1220
1221    /// Executes an operation.
1222    pub async fn execute_operation(
1223        &self,
1224        operation: impl Into<Operation>,
1225    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1226        self.execute_operations(vec![operation.into()], vec![])
1227            .await
1228    }
1229
1230    /// Executes a new block.
1231    ///
1232    /// This must be preceded by a call to `prepare_chain()`.
1233    #[instrument(level = "trace", skip(operations, blobs))]
1234    async fn execute_block(
1235        &self,
1236        operations: Vec<Operation>,
1237        blobs: Vec<Blob>,
1238    ) -> Result<ExecuteBlockOutcome, Error> {
1239        #[cfg(with_metrics)]
1240        let _latency = super::metrics::EXECUTE_BLOCK_LATENCY.measure_latency();
1241
1242        let mutex = self.client_mutex();
1243        let _guard = mutex.lock_owned().await;
1244        // TODO(#5092): We shouldn't need to call this explicitly.
1245        match self.process_pending_block_without_prepare().await? {
1246            ClientOutcome::Committed(Some(certificate)) => {
1247                return Ok(ExecuteBlockOutcome::Conflict(certificate))
1248            }
1249            ClientOutcome::WaitForTimeout(timeout) => {
1250                return Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
1251            }
1252            ClientOutcome::Committed(None) => {}
1253        }
1254
1255        // Collect pending messages and epoch changes after acquiring the lock to avoid
1256        // race conditions where messages valid for one block height are proposed at a
1257        // different height.
1258        let transactions = self.prepend_epochs_messages_and_events(operations).await?;
1259
1260        if transactions.is_empty() {
1261            return Err(Error::LocalNodeError(LocalNodeError::WorkerError(
1262                WorkerError::ChainError(Box::new(ChainError::EmptyBlock)),
1263            )));
1264        }
1265
1266        let block = self.new_pending_block(transactions, blobs).await?;
1267
1268        match self.process_pending_block_without_prepare().await? {
1269            ClientOutcome::Committed(Some(certificate)) if certificate.block() == &block => {
1270                Ok(ExecuteBlockOutcome::Executed(certificate))
1271            }
1272            ClientOutcome::Committed(Some(certificate)) => {
1273                Ok(ExecuteBlockOutcome::Conflict(certificate))
1274            }
1275            // Should be unreachable: We did set a pending block.
1276            ClientOutcome::Committed(None) => {
1277                Err(Error::BlockProposalError("Unexpected block proposal error"))
1278            }
1279            ClientOutcome::WaitForTimeout(timeout) => {
1280                Ok(ExecuteBlockOutcome::WaitForTimeout(timeout))
1281            }
1282        }
1283    }
1284
1285    /// Creates a vector of transactions which, in addition to the provided operations,
1286    /// also contains epoch changes, receiving message bundles and event stream updates
1287    /// (if there are any to be processed).
1288    /// This should be called when executing a block, in order to make sure that any pending
1289    /// messages or events are included in it.
1290    #[instrument(level = "trace", skip(operations))]
1291    async fn prepend_epochs_messages_and_events(
1292        &self,
1293        operations: Vec<Operation>,
1294    ) -> Result<Vec<Transaction>, Error> {
1295        let incoming_bundles = self.pending_message_bundles().await?;
1296        let stream_updates = self.collect_stream_updates().await?;
1297        Ok(self
1298            .collect_epoch_changes()
1299            .await?
1300            .into_iter()
1301            .map(Transaction::ExecuteOperation)
1302            .chain(
1303                incoming_bundles
1304                    .into_iter()
1305                    .map(Transaction::ReceiveMessages),
1306            )
1307            .chain(
1308                stream_updates
1309                    .into_iter()
1310                    .map(Transaction::ExecuteOperation),
1311            )
1312            .chain(operations.into_iter().map(Transaction::ExecuteOperation))
1313            .collect::<Vec<_>>())
1314    }
1315
1316    /// Creates a new pending block and handles the proposal in the local node.
1317    /// Next time `process_pending_block_without_prepare` is called, this block will be proposed
1318    /// to the validators.
1319    #[instrument(level = "trace", skip(transactions, blobs))]
1320    async fn new_pending_block(
1321        &self,
1322        transactions: Vec<Transaction>,
1323        blobs: Vec<Blob>,
1324    ) -> Result<Block, Error> {
1325        let identity = self.identity().await?;
1326
1327        ensure!(
1328            self.pending_proposal().is_none(),
1329            Error::BlockProposalError(
1330                "Client state already has a pending block; \
1331                use the `linera retry-pending-block` command to commit that first"
1332            )
1333        );
1334        let info = self.chain_info_with_committees().await?;
1335        let timestamp = self.next_timestamp(&transactions, info.timestamp);
1336        let proposed_block = ProposedBlock {
1337            epoch: info.epoch,
1338            chain_id: self.chain_id,
1339            transactions,
1340            previous_block_hash: info.block_hash,
1341            height: info.next_block_height,
1342            authenticated_owner: Some(identity),
1343            timestamp,
1344        };
1345
1346        let round = self.round_for_oracle(&info, &identity).await?;
1347        // Make sure every incoming message succeeds and otherwise remove them.
1348        // Also, compute the final certified hash while we're at it.
1349        let (block, _) = self
1350            .client
1351            .stage_block_execution_and_discard_failing_messages(
1352                proposed_block,
1353                round,
1354                blobs.clone(),
1355            )
1356            .await?;
1357        let (proposed_block, _) = block.clone().into_proposal();
1358        self.update_state(|state| {
1359            state.set_pending_proposal(proposed_block.clone(), blobs.clone())
1360        });
1361        Ok(block)
1362    }
1363
1364    /// Returns a suitable timestamp for the next block.
1365    ///
1366    /// This will usually be the current time according to the local clock, but may be slightly
1367    /// ahead to make sure it's not earlier than the incoming messages or the previous block.
1368    #[instrument(level = "trace", skip(transactions))]
1369    fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
1370        let local_time = self.storage_client().clock().current_time();
1371        transactions
1372            .iter()
1373            .filter_map(Transaction::incoming_bundle)
1374            .map(|msg| msg.bundle.timestamp)
1375            .max()
1376            .map_or(local_time, |timestamp| timestamp.max(local_time))
1377            .max(block_time)
1378    }
1379
1380    /// Queries an application.
1381    #[instrument(level = "trace", skip(query))]
1382    pub async fn query_application(
1383        &self,
1384        query: Query,
1385        block_hash: Option<CryptoHash>,
1386    ) -> Result<QueryOutcome, Error> {
1387        loop {
1388            let result = self
1389                .client
1390                .local_node
1391                .query_application(self.chain_id, query.clone(), block_hash)
1392                .await;
1393            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1394                let validators = self.client.validator_nodes().await?;
1395                self.client
1396                    .update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1397                    .await?;
1398                continue; // We found the missing blob: retry.
1399            }
1400            return Ok(result?);
1401        }
1402    }
1403
1404    /// Queries a system application.
1405    #[instrument(level = "trace", skip(query))]
1406    pub async fn query_system_application(
1407        &self,
1408        query: SystemQuery,
1409    ) -> Result<QueryOutcome<SystemResponse>, Error> {
1410        let QueryOutcome {
1411            response,
1412            operations,
1413        } = self.query_application(Query::System(query), None).await?;
1414        match response {
1415            QueryResponse::System(response) => Ok(QueryOutcome {
1416                response,
1417                operations,
1418            }),
1419            _ => Err(Error::InternalError("Unexpected response for system query")),
1420        }
1421    }
1422
1423    /// Queries a user application.
1424    #[instrument(level = "trace", skip(application_id, query))]
1425    #[cfg(with_testing)]
1426    pub async fn query_user_application<A: Abi>(
1427        &self,
1428        application_id: ApplicationId<A>,
1429        query: &A::Query,
1430    ) -> Result<QueryOutcome<A::QueryResponse>, Error> {
1431        let query = Query::user(application_id, query)?;
1432        let QueryOutcome {
1433            response,
1434            operations,
1435        } = self.query_application(query, None).await?;
1436        match response {
1437            QueryResponse::User(response_bytes) => {
1438                let response = serde_json::from_slice(&response_bytes)?;
1439                Ok(QueryOutcome {
1440                    response,
1441                    operations,
1442                })
1443            }
1444            _ => Err(Error::InternalError("Unexpected response for user query")),
1445        }
1446    }
1447
1448    /// Obtains the local balance of the chain account after staging the execution of
1449    /// incoming messages in a new block.
1450    ///
1451    /// Does not attempt to synchronize with validators. The result will reflect up to
1452    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
1453    /// block.
1454    #[instrument(level = "trace")]
1455    pub async fn query_balance(&self) -> Result<Amount, Error> {
1456        let (balance, _) = self.query_balances_with_owner(AccountOwner::CHAIN).await?;
1457        Ok(balance)
1458    }
1459
1460    /// Obtains the local balance of an account after staging the execution of incoming messages in
1461    /// a new block.
1462    ///
1463    /// Does not attempt to synchronize with validators. The result will reflect up to
1464    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
1465    /// block.
1466    #[instrument(level = "trace", skip(owner))]
1467    pub async fn query_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1468        if owner.is_chain() {
1469            self.query_balance().await
1470        } else {
1471            Ok(self
1472                .query_balances_with_owner(owner)
1473                .await?
1474                .1
1475                .unwrap_or(Amount::ZERO))
1476        }
1477    }
1478
1479    /// Obtains the local balance of an account and optionally another user after staging the
1480    /// execution of incoming messages in a new block.
1481    ///
1482    /// Does not attempt to synchronize with validators. The result will reflect up to
1483    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
1484    /// block.
1485    #[instrument(level = "trace", skip(owner))]
1486    pub(crate) async fn query_balances_with_owner(
1487        &self,
1488        owner: AccountOwner,
1489    ) -> Result<(Amount, Option<Amount>), Error> {
1490        let incoming_bundles = self.pending_message_bundles().await?;
1491        // Since we disallow empty blocks, and there is no incoming messages,
1492        // that could change it, we query for the balance immediately.
1493        if incoming_bundles.is_empty() {
1494            let chain_balance = self.local_balance().await?;
1495            let owner_balance = self.local_owner_balance(owner).await?;
1496            return Ok((chain_balance, Some(owner_balance)));
1497        }
1498        let info = self.chain_info().await?;
1499        let transactions = incoming_bundles
1500            .into_iter()
1501            .map(Transaction::ReceiveMessages)
1502            .collect::<Vec<_>>();
1503        let timestamp = self.next_timestamp(&transactions, info.timestamp);
1504        let block = ProposedBlock {
1505            epoch: info.epoch,
1506            chain_id: self.chain_id,
1507            transactions,
1508            previous_block_hash: info.block_hash,
1509            height: info.next_block_height,
1510            authenticated_owner: if owner == AccountOwner::CHAIN {
1511                None
1512            } else {
1513                Some(owner)
1514            },
1515            timestamp,
1516        };
1517        match self
1518            .client
1519            .stage_block_execution_and_discard_failing_messages(block, None, Vec::new())
1520            .await
1521        {
1522            Ok((_, response)) => Ok((
1523                response.info.chain_balance,
1524                response.info.requested_owner_balance,
1525            )),
1526            Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
1527                error,
1528            )))) if matches!(
1529                &*error,
1530                ChainError::ExecutionError(
1531                    execution_error,
1532                    ChainExecutionContext::Block
1533                ) if matches!(
1534                    **execution_error,
1535                    ExecutionError::FeesExceedFunding { .. }
1536                )
1537            ) =>
1538            {
1539                // We can't even pay for the execution of one empty block. Let's return zero.
1540                Ok((Amount::ZERO, Some(Amount::ZERO)))
1541            }
1542            Err(error) => Err(error),
1543        }
1544    }
1545
1546    /// Reads the local balance of the chain account.
1547    ///
1548    /// Does not process the inbox or attempt to synchronize with validators.
1549    #[instrument(level = "trace")]
1550    pub async fn local_balance(&self) -> Result<Amount, Error> {
1551        let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
1552        Ok(balance)
1553    }
1554
1555    /// Reads the local balance of a user account.
1556    ///
1557    /// Does not process the inbox or attempt to synchronize with validators.
1558    #[instrument(level = "trace", skip(owner))]
1559    pub async fn local_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1560        if owner.is_chain() {
1561            self.local_balance().await
1562        } else {
1563            Ok(self
1564                .local_balances_with_owner(owner)
1565                .await?
1566                .1
1567                .unwrap_or(Amount::ZERO))
1568        }
1569    }
1570
1571    /// Reads the local balance of the chain account and optionally another user.
1572    ///
1573    /// Does not process the inbox or attempt to synchronize with validators.
1574    #[instrument(level = "trace", skip(owner))]
1575    pub(crate) async fn local_balances_with_owner(
1576        &self,
1577        owner: AccountOwner,
1578    ) -> Result<(Amount, Option<Amount>), Error> {
1579        ensure!(
1580            self.chain_info().await?.next_block_height >= self.initial_next_block_height,
1581            Error::WalletSynchronizationError
1582        );
1583        let mut query = ChainInfoQuery::new(self.chain_id);
1584        query.request_owner_balance = owner;
1585        let response = self
1586            .client
1587            .local_node
1588            .handle_chain_info_query(query)
1589            .await?;
1590        Ok((
1591            response.info.chain_balance,
1592            response.info.requested_owner_balance,
1593        ))
1594    }
1595
1596    /// Sends tokens to a chain.
1597    #[instrument(level = "trace")]
1598    pub async fn transfer_to_account(
1599        &self,
1600        from: AccountOwner,
1601        amount: Amount,
1602        account: Account,
1603    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1604        self.transfer(from, amount, account).await
1605    }
1606
1607    /// Burns tokens (transfer to a special address).
1608    #[cfg(with_testing)]
1609    #[instrument(level = "trace")]
1610    pub async fn burn(
1611        &self,
1612        owner: AccountOwner,
1613        amount: Amount,
1614    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1615        let recipient = Account::burn_address(self.chain_id);
1616        self.transfer(owner, amount, recipient).await
1617    }
1618
1619    #[instrument(level = "trace")]
1620    pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, Error> {
1621        let validators = self.client.validator_nodes().await?;
1622        self.client
1623            .fetch_chain_info(self.chain_id, &validators)
1624            .await
1625    }
1626
1627    /// Attempts to synchronize chains that have sent us messages and populate our local
1628    /// inbox.
1629    ///
1630    /// To create a block that actually executes the messages in the inbox,
1631    /// `process_inbox` must be called separately.
1632    ///
1633    /// If the chain is in follow-only mode, this only downloads blocks for this chain without
1634    /// fetching manager values or sender/publisher chains.
1635    #[instrument(level = "trace")]
1636    pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, Error> {
1637        if self.is_follow_only() {
1638            return self.client.synchronize_chain_state(self.chain_id).await;
1639        }
1640        let info = self.prepare_chain().await?;
1641        self.synchronize_publisher_chains().await?;
1642        self.find_received_certificates(None).await?;
1643        Ok(info)
1644    }
1645
1646    /// Processes the last pending block
1647    #[instrument(level = "trace")]
1648    pub async fn process_pending_block(
1649        &self,
1650    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1651        self.prepare_chain().await?;
1652        self.process_pending_block_without_prepare().await
1653    }
1654
1655    /// Processes the last pending block. Assumes that the local chain is up to date.
1656    #[instrument(level = "trace")]
1657    async fn process_pending_block_without_prepare(
1658        &self,
1659    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1660        let info = self.request_leader_timeout_if_needed().await?;
1661
1662        // If there is a validated block in the current round, finalize it.
1663        if info.manager.has_locking_block_in_current_round()
1664            && !info.manager.current_round.is_fast()
1665        {
1666            return self.finalize_locking_block(info).await;
1667        }
1668        let owner = self.identity().await?;
1669
1670        let local_node = &self.client.local_node;
1671        // Otherwise we have to re-propose the highest validated block, if there is one.
1672        let pending_proposal = self.pending_proposal();
1673        let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
1674            match &**locking {
1675                LockingBlock::Regular(certificate) => {
1676                    let blob_ids = certificate.block().required_blob_ids();
1677                    let blobs = local_node
1678                        .get_locking_blobs(&blob_ids, self.chain_id)
1679                        .await?
1680                        .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
1681                    debug!("Retrying locking block from round {}", certificate.round);
1682                    (certificate.block().clone(), blobs)
1683                }
1684                LockingBlock::Fast(proposal) => {
1685                    let proposed_block = proposal.content.block.clone();
1686                    let blob_ids = proposed_block.published_blob_ids();
1687                    let blobs = local_node
1688                        .get_locking_blobs(&blob_ids, self.chain_id)
1689                        .await?
1690                        .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
1691                    let block = self
1692                        .client
1693                        .stage_block_execution(proposed_block, None, blobs.clone())
1694                        .await?
1695                        .0;
1696                    debug!("Retrying locking block from fast round.");
1697                    (block, blobs)
1698                }
1699            }
1700        } else if let Some(pending_proposal) = pending_proposal {
1701            // Otherwise we are free to propose our own pending block.
1702            let proposed_block = pending_proposal.block;
1703            let round = self.round_for_oracle(&info, &owner).await?;
1704            let (block, _) = self
1705                .client
1706                .stage_block_execution(proposed_block, round, pending_proposal.blobs.clone())
1707                .await?;
1708            debug!("Proposing the local pending block.");
1709            (block, pending_proposal.blobs)
1710        } else {
1711            return Ok(ClientOutcome::Committed(None)); // Nothing to do.
1712        };
1713
1714        let has_oracle_responses = block.has_oracle_responses();
1715        let (proposed_block, outcome) = block.into_proposal();
1716        let round = match self
1717            .round_for_new_proposal(&info, &owner, has_oracle_responses)
1718            .await?
1719        {
1720            Either::Left(round) => round,
1721            Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
1722        };
1723        debug!("Proposing block for round {}", round);
1724
1725        let already_handled_locally = info
1726            .manager
1727            .already_handled_proposal(round, &proposed_block);
1728        // Create the final block proposal.
1729        let proposal = if let Some(locking) = info.manager.requested_locking {
1730            Box::new(match *locking {
1731                LockingBlock::Regular(cert) => {
1732                    BlockProposal::new_retry_regular(owner, round, cert, self.signer())
1733                        .await
1734                        .map_err(Error::signer_failure)?
1735                }
1736                LockingBlock::Fast(proposal) => {
1737                    BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
1738                        .await
1739                        .map_err(Error::signer_failure)?
1740                }
1741            })
1742        } else {
1743            Box::new(
1744                BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
1745                    .await
1746                    .map_err(Error::signer_failure)?,
1747            )
1748        };
1749        if !already_handled_locally {
1750            // Check the final block proposal. This will be cheaper after #1401.
1751            if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
1752                match err {
1753                    LocalNodeError::BlobsNotFound(_) => {
1754                        local_node
1755                            .handle_pending_blobs(self.chain_id, blobs)
1756                            .await?;
1757                        local_node.handle_block_proposal(*proposal.clone()).await?;
1758                    }
1759                    err => return Err(err.into()),
1760                }
1761            }
1762        }
1763        let committee = self.local_committee().await?;
1764        let block = Block::new(proposed_block, outcome);
1765        // Send the query to validators.
1766        let submit_block_proposal_start = linera_base::time::Instant::now();
1767        let certificate = if round.is_fast() {
1768            let hashed_value = ConfirmedBlock::new(block);
1769            self.client
1770                .submit_block_proposal(&committee, proposal, hashed_value)
1771                .await?
1772        } else {
1773            let hashed_value = ValidatedBlock::new(block);
1774            let certificate = self
1775                .client
1776                .submit_block_proposal(&committee, proposal, hashed_value.clone())
1777                .await?;
1778            self.client.finalize_block(&committee, certificate).await?
1779        };
1780        self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
1781        debug!(round = %certificate.round, "Sending confirmed block to validators");
1782        self.update_validators(Some(&committee), Some(certificate.clone()))
1783            .await?;
1784        Ok(ClientOutcome::Committed(Some(certificate)))
1785    }
1786
1787    fn send_timing(&self, start: Instant, timing_type: TimingType) {
1788        let Some(sender) = &self.timing_sender else {
1789            return;
1790        };
1791        if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
1792            tracing::warn!(%err, "Failed to send timing info");
1793        }
1794    }
1795
1796    /// Requests a leader timeout certificate if the current round has timed out. Returns the
1797    /// chain info for the (possibly new) current round.
1798    async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, Error> {
1799        let mut info = self.chain_info_with_manager_values().await?;
1800        // If the current round has timed out, we request a timeout certificate and retry in
1801        // the next round.
1802        if let Some(round_timeout) = info.manager.round_timeout {
1803            if round_timeout <= self.storage_client().clock().current_time() {
1804                if let Err(e) = self.request_leader_timeout().await {
1805                    info!("Failed to obtain a timeout certificate: {}", e);
1806                } else {
1807                    info = self.chain_info_with_manager_values().await?;
1808                }
1809            }
1810        }
1811        Ok(info)
1812    }
1813
1814    /// Finalizes the locking block.
1815    ///
1816    /// Panics if there is no locking block; fails if the locking block is not in the current round.
1817    async fn finalize_locking_block(
1818        &self,
1819        info: Box<ChainInfo>,
1820    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1821        let locking = info
1822            .manager
1823            .requested_locking
1824            .expect("Should have a locking block");
1825        let LockingBlock::Regular(certificate) = *locking else {
1826            panic!("Should have a locking validated block");
1827        };
1828        debug!(
1829            round = %certificate.round,
1830            "Finalizing locking block"
1831        );
1832        let committee = self.local_committee().await?;
1833        let certificate = self
1834            .client
1835            .finalize_block(&committee, certificate.clone())
1836            .await?;
1837        self.update_validators(Some(&committee), Some(certificate.clone()))
1838            .await?;
1839        Ok(ClientOutcome::Committed(Some(certificate)))
1840    }
1841
1842    /// Returns the number for the round number oracle to use when staging a block proposal.
1843    async fn round_for_oracle(
1844        &self,
1845        info: &ChainInfo,
1846        identity: &AccountOwner,
1847    ) -> Result<Option<u32>, Error> {
1848        // Pretend we do use oracles: If we don't, the round number is never read anyway.
1849        match self.round_for_new_proposal(info, identity, true).await {
1850            // If it is a multi-leader round, use its number for the oracle.
1851            Ok(Either::Left(round)) => Ok(round.multi_leader()),
1852            // If there is no suitable round with oracles, use None: If it works without oracles,
1853            // the block won't read the value. If it returns a timeout, it will be a single-leader
1854            // round, in which the oracle returns None.
1855            Err(Error::BlockProposalError(_)) | Ok(Either::Right(_)) => Ok(None),
1856            Err(err) => Err(err),
1857        }
1858    }
1859
1860    /// Returns a round in which we can propose a new block or the given one, if possible.
1861    async fn round_for_new_proposal(
1862        &self,
1863        info: &ChainInfo,
1864        identity: &AccountOwner,
1865        has_oracle_responses: bool,
1866    ) -> Result<Either<Round, RoundTimeout>, Error> {
1867        let manager = &info.manager;
1868        let seed = manager.seed;
1869        // If there is a conflicting proposal in the current round, we can only propose if the
1870        // next round can be started without a timeout, i.e. if we are in a multi-leader round.
1871        // Similarly, we cannot propose a block that uses oracles in the fast round.
1872        let conflict = manager
1873            .requested_signed_proposal
1874            .as_ref()
1875            .into_iter()
1876            .chain(&manager.requested_proposed)
1877            .any(|proposal| proposal.content.round == manager.current_round)
1878            || (manager.current_round.is_fast() && has_oracle_responses);
1879        let round = if !conflict {
1880            manager.current_round
1881        } else if let Some(round) = manager
1882            .ownership
1883            .next_round(manager.current_round)
1884            .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
1885        {
1886            round
1887        } else if let Some(timeout) = info.round_timeout() {
1888            return Ok(Either::Right(timeout));
1889        } else {
1890            return Err(Error::BlockProposalError(
1891                "Conflicting proposal in the current round",
1892            ));
1893        };
1894        let current_committee = info
1895            .current_committee()?
1896            .validators
1897            .values()
1898            .map(|v| (AccountOwner::from(v.account_public_key), v.votes))
1899            .collect();
1900        if manager.should_propose(identity, round, seed, &current_committee) {
1901            return Ok(Either::Left(round));
1902        }
1903        if let Some(timeout) = info.round_timeout() {
1904            return Ok(Either::Right(timeout));
1905        }
1906        Err(Error::BlockProposalError(
1907            "Not a leader in the current round",
1908        ))
1909    }
1910
1911    /// Clears the information on any operation that previously failed.
1912    #[cfg(with_testing)]
1913    #[instrument(level = "trace")]
1914    pub fn clear_pending_proposal(&self) {
1915        self.update_state(|state| state.clear_pending_proposal());
1916    }
1917
1918    /// Rotates the key of the chain.
1919    ///
1920    /// Replaces current owners of the chain with the new key pair.
1921    #[instrument(level = "trace")]
1922    pub async fn rotate_key_pair(
1923        &self,
1924        public_key: AccountPublicKey,
1925    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1926        self.transfer_ownership(public_key.into()).await
1927    }
1928
1929    /// Transfers ownership of the chain to a single super owner.
1930    #[instrument(level = "trace")]
1931    pub async fn transfer_ownership(
1932        &self,
1933        new_owner: AccountOwner,
1934    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1935        self.execute_operation(SystemOperation::ChangeOwnership {
1936            super_owners: vec![new_owner],
1937            owners: Vec::new(),
1938            first_leader: None,
1939            multi_leader_rounds: 2,
1940            open_multi_leader_rounds: false,
1941            timeout_config: TimeoutConfig::default(),
1942        })
1943        .await
1944    }
1945
1946    /// Adds another owner to the chain, and turns existing super owners into regular owners.
1947    #[instrument(level = "trace")]
1948    pub async fn share_ownership(
1949        &self,
1950        new_owner: AccountOwner,
1951        new_weight: u64,
1952    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1953        loop {
1954            let ownership = self.prepare_chain().await?.manager.ownership;
1955            ensure!(
1956                ownership.is_active(),
1957                ChainError::InactiveChain(self.chain_id)
1958            );
1959            let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
1960            owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
1961            owners.push((new_owner, new_weight));
1962            let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
1963                super_owners: Vec::new(),
1964                owners,
1965                first_leader: ownership.first_leader,
1966                multi_leader_rounds: ownership.multi_leader_rounds,
1967                open_multi_leader_rounds: ownership.open_multi_leader_rounds,
1968                timeout_config: ownership.timeout_config,
1969            })];
1970            match self.execute_block(operations, vec![]).await? {
1971                ExecuteBlockOutcome::Executed(certificate) => {
1972                    return Ok(ClientOutcome::Committed(certificate));
1973                }
1974                ExecuteBlockOutcome::Conflict(certificate) => {
1975                    info!(
1976                        height = %certificate.block().header.height,
1977                        "Another block was committed; retrying."
1978                    );
1979                }
1980                ExecuteBlockOutcome::WaitForTimeout(timeout) => {
1981                    return Ok(ClientOutcome::WaitForTimeout(timeout));
1982                }
1983            };
1984        }
1985    }
1986
1987    /// Changes the ownership of this chain. Fails if it would remove existing owners, unless
1988    /// `remove_owners` is `true`.
1989    #[instrument(level = "trace")]
1990    pub async fn change_ownership(
1991        &self,
1992        ownership: ChainOwnership,
1993    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1994        self.execute_operation(SystemOperation::ChangeOwnership {
1995            super_owners: ownership.super_owners.into_iter().collect(),
1996            owners: ownership.owners.into_iter().collect(),
1997            first_leader: ownership.first_leader,
1998            multi_leader_rounds: ownership.multi_leader_rounds,
1999            open_multi_leader_rounds: ownership.open_multi_leader_rounds,
2000            timeout_config: ownership.timeout_config.clone(),
2001        })
2002        .await
2003    }
2004
2005    /// Changes the application permissions configuration on this chain.
2006    #[instrument(level = "trace", skip(application_permissions))]
2007    pub async fn change_application_permissions(
2008        &self,
2009        application_permissions: ApplicationPermissions,
2010    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2011        self.execute_operation(SystemOperation::ChangeApplicationPermissions(
2012            application_permissions,
2013        ))
2014        .await
2015    }
2016
2017    /// Opens a new chain with a derived UID.
2018    #[instrument(level = "trace", skip(self))]
2019    pub async fn open_chain(
2020        &self,
2021        ownership: ChainOwnership,
2022        application_permissions: ApplicationPermissions,
2023        balance: Amount,
2024    ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, Error> {
2025        loop {
2026            let config = OpenChainConfig {
2027                ownership: ownership.clone(),
2028                balance,
2029                application_permissions: application_permissions.clone(),
2030            };
2031            let operation = Operation::system(SystemOperation::OpenChain(config));
2032            let certificate = match self.execute_block(vec![operation], vec![]).await? {
2033                ExecuteBlockOutcome::Executed(certificate) => certificate,
2034                ExecuteBlockOutcome::Conflict(_) => continue,
2035                ExecuteBlockOutcome::WaitForTimeout(timeout) => {
2036                    return Ok(ClientOutcome::WaitForTimeout(timeout));
2037                }
2038            };
2039            // The only operation, i.e. the last transaction, created the new chain.
2040            let chain_blob = certificate
2041                .block()
2042                .body
2043                .blobs
2044                .last()
2045                .and_then(|blobs| blobs.last())
2046                .ok_or_else(|| Error::InternalError("Failed to create a new chain"))?;
2047            let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
2048            // Add the new chain to the list of tracked chains
2049            self.client.track_chain(description.id());
2050            self.client
2051                .local_node
2052                .retry_pending_cross_chain_requests(self.chain_id)
2053                .await?;
2054            return Ok(ClientOutcome::Committed((description, certificate)));
2055        }
2056    }
2057
2058    /// Closes the chain (and loses everything in it!!).
2059    /// Returns `None` if the chain was already closed.
2060    #[instrument(level = "trace")]
2061    pub async fn close_chain(
2062        &self,
2063    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
2064        match self.execute_operation(SystemOperation::CloseChain).await {
2065            Ok(outcome) => Ok(outcome.map(Some)),
2066            Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
2067                chain_error,
2068            )))) if matches!(*chain_error, ChainError::ClosedChain) => {
2069                Ok(ClientOutcome::Committed(None)) // Chain is already closed.
2070            }
2071            Err(error) => Err(error),
2072        }
2073    }
2074
2075    /// Publishes some module.
2076    #[cfg(not(target_arch = "wasm32"))]
2077    #[instrument(level = "trace", skip(contract, service))]
2078    pub async fn publish_module(
2079        &self,
2080        contract: Bytecode,
2081        service: Bytecode,
2082        vm_runtime: VmRuntime,
2083    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2084        let (blobs, module_id) = super::create_bytecode_blobs(contract, service, vm_runtime).await;
2085        self.publish_module_blobs(blobs, module_id).await
2086    }
2087
2088    /// Publishes some module.
2089    #[cfg(not(target_arch = "wasm32"))]
2090    #[instrument(level = "trace", skip(blobs, module_id))]
2091    pub async fn publish_module_blobs(
2092        &self,
2093        blobs: Vec<Blob>,
2094        module_id: ModuleId,
2095    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2096        self.execute_operations(
2097            vec![Operation::system(SystemOperation::PublishModule {
2098                module_id,
2099            })],
2100            blobs,
2101        )
2102        .await?
2103        .try_map(|certificate| Ok((module_id, certificate)))
2104    }
2105
2106    /// Publishes some data blobs.
2107    #[instrument(level = "trace", skip(bytes))]
2108    pub async fn publish_data_blobs(
2109        &self,
2110        bytes: Vec<Vec<u8>>,
2111    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2112        let blobs = bytes.into_iter().map(Blob::new_data);
2113        let publish_blob_operations = blobs
2114            .clone()
2115            .map(|blob| {
2116                Operation::system(SystemOperation::PublishDataBlob {
2117                    blob_hash: blob.id().hash,
2118                })
2119            })
2120            .collect();
2121        self.execute_operations(publish_blob_operations, blobs.collect())
2122            .await
2123    }
2124
2125    /// Publishes some data blob.
2126    #[instrument(level = "trace", skip(bytes))]
2127    pub async fn publish_data_blob(
2128        &self,
2129        bytes: Vec<u8>,
2130    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2131        self.publish_data_blobs(vec![bytes]).await
2132    }
2133
2134    /// Creates an application by instantiating some bytecode.
2135    #[instrument(
2136        level = "trace",
2137        skip(self, parameters, instantiation_argument, required_application_ids)
2138    )]
2139    pub async fn create_application<
2140        A: Abi,
2141        Parameters: Serialize,
2142        InstantiationArgument: Serialize,
2143    >(
2144        &self,
2145        module_id: ModuleId<A, Parameters, InstantiationArgument>,
2146        parameters: &Parameters,
2147        instantiation_argument: &InstantiationArgument,
2148        required_application_ids: Vec<ApplicationId>,
2149    ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, Error> {
2150        let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
2151        let parameters = serde_json::to_vec(parameters)?;
2152        Ok(self
2153            .create_application_untyped(
2154                module_id.forget_abi(),
2155                parameters,
2156                instantiation_argument,
2157                required_application_ids,
2158            )
2159            .await?
2160            .map(|(app_id, cert)| (app_id.with_abi(), cert)))
2161    }
2162
2163    /// Creates an application by instantiating some bytecode.
2164    #[instrument(
2165        level = "trace",
2166        skip(
2167            self,
2168            module_id,
2169            parameters,
2170            instantiation_argument,
2171            required_application_ids
2172        )
2173    )]
2174    pub async fn create_application_untyped(
2175        &self,
2176        module_id: ModuleId,
2177        parameters: Vec<u8>,
2178        instantiation_argument: Vec<u8>,
2179        required_application_ids: Vec<ApplicationId>,
2180    ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, Error> {
2181        self.execute_operation(SystemOperation::CreateApplication {
2182            module_id,
2183            parameters,
2184            instantiation_argument,
2185            required_application_ids,
2186        })
2187        .await?
2188        .try_map(|certificate| {
2189            // The first message of the only operation created the application.
2190            let mut creation: Vec<_> = certificate
2191                .block()
2192                .created_blob_ids()
2193                .into_iter()
2194                .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
2195                .collect();
2196            if creation.len() > 1 {
2197                return Err(Error::InternalError(
2198                    "Unexpected number of application descriptions published",
2199                ));
2200            }
2201            let blob_id = creation.pop().ok_or(Error::InternalError(
2202                "ApplicationDescription blob not found.",
2203            ))?;
2204            let id = ApplicationId::new(blob_id.hash);
2205            Ok((id, certificate))
2206        })
2207    }
2208
2209    /// Creates a new committee and starts using it (admin chains only).
2210    #[instrument(level = "trace", skip(committee))]
2211    pub async fn stage_new_committee(
2212        &self,
2213        committee: Committee,
2214    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2215        let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
2216        let blob_hash = blob.id().hash;
2217        match self
2218            .execute_operations(
2219                vec![Operation::system(SystemOperation::Admin(
2220                    AdminOperation::PublishCommitteeBlob { blob_hash },
2221                ))],
2222                vec![blob],
2223            )
2224            .await?
2225        {
2226            ClientOutcome::Committed(_) => {}
2227            outcome @ ClientOutcome::WaitForTimeout(_) => return Ok(outcome),
2228        }
2229        let epoch = self.chain_info().await?.epoch.try_add_one()?;
2230        self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
2231            epoch,
2232            blob_hash,
2233        }))
2234        .await
2235    }
2236
2237    /// Synchronizes the chain with the validators and creates blocks without any operations to
2238    /// process all incoming messages. This may require several blocks.
2239    ///
2240    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
2241    /// is returned, too.
2242    #[instrument(level = "trace")]
2243    pub async fn process_inbox(
2244        &self,
2245    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2246        self.prepare_chain().await?;
2247        self.process_inbox_without_prepare().await
2248    }
2249
2250    /// Creates blocks without any operations to process all incoming messages. This may require
2251    /// several blocks.
2252    ///
2253    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
2254    /// is returned, too.
2255    #[instrument(level = "trace")]
2256    pub async fn process_inbox_without_prepare(
2257        &self,
2258    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2259        #[cfg(with_metrics)]
2260        let _latency = super::metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
2261
2262        let mut certificates = Vec::new();
2263        loop {
2264            // We provide no operations - this means that the only operations executed
2265            // will be epoch changes, receiving messages and processing event stream
2266            // updates, if any are pending.
2267            match self.execute_block(vec![], vec![]).await {
2268                Ok(ExecuteBlockOutcome::Executed(certificate))
2269                | Ok(ExecuteBlockOutcome::Conflict(certificate)) => certificates.push(certificate),
2270                Ok(ExecuteBlockOutcome::WaitForTimeout(timeout)) => {
2271                    return Ok((certificates, Some(timeout)));
2272                }
2273                // Nothing in the inbox and no stream updates to be processed.
2274                Err(Error::LocalNodeError(LocalNodeError::WorkerError(
2275                    WorkerError::ChainError(chain_error),
2276                ))) if matches!(*chain_error, ChainError::EmptyBlock) => {
2277                    return Ok((certificates, None));
2278                }
2279                Err(error) => return Err(error),
2280            };
2281        }
2282    }
2283
2284    /// Returns operations to process all pending epoch changes: first the new epochs, in order,
2285    /// then the removed epochs, in order.
2286    async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, Error> {
2287        let (mut min_epoch, mut next_epoch) = {
2288            let (epoch, committees) = self.epoch_and_committees().await?;
2289            let min_epoch = *committees.keys().next().unwrap_or(&Epoch::ZERO);
2290            (min_epoch, epoch.try_add_one()?)
2291        };
2292        let mut epoch_change_ops = Vec::new();
2293        while self
2294            .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
2295            .await?
2296        {
2297            epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
2298                next_epoch,
2299            )));
2300            next_epoch.try_add_assign_one()?;
2301        }
2302        while self
2303            .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
2304            .await?
2305        {
2306            epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
2307                min_epoch,
2308            )));
2309            min_epoch.try_add_assign_one()?;
2310        }
2311        Ok(epoch_change_ops)
2312    }
2313
2314    /// Returns whether the system event on the admin chain with the given stream name and key
2315    /// exists in storage.
2316    async fn has_admin_event(&self, stream_name: &[u8], index: u32) -> Result<bool, Error> {
2317        let event_id = EventId {
2318            chain_id: self.client.admin_id,
2319            stream_id: StreamId::system(stream_name),
2320            index,
2321        };
2322        Ok(self
2323            .client
2324            .storage_client()
2325            .read_event(event_id)
2326            .await?
2327            .is_some())
2328    }
2329
2330    /// Returns the indices and events from the storage
2331    pub async fn events_from_index(
2332        &self,
2333        stream_id: StreamId,
2334        start_index: u32,
2335    ) -> Result<Vec<IndexAndEvent>, Error> {
2336        Ok(self
2337            .client
2338            .storage_client()
2339            .read_events_from_index(&self.chain_id, &stream_id, start_index)
2340            .await?)
2341    }
2342
2343    /// Deprecates all the configurations of voting rights up to the given one (admin chains
2344    /// only). Currently, each individual chain is still entitled to wait before accepting
2345    /// this command. However, it is expected that deprecated validators stop functioning
2346    /// shortly after such command is issued.
2347    #[instrument(level = "trace")]
2348    pub async fn revoke_epochs(
2349        &self,
2350        revoked_epoch: Epoch,
2351    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2352        self.prepare_chain().await?;
2353        let (current_epoch, committees) = self.epoch_and_committees().await?;
2354        ensure!(
2355            revoked_epoch < current_epoch,
2356            Error::CannotRevokeCurrentEpoch(current_epoch)
2357        );
2358        ensure!(
2359            committees.contains_key(&revoked_epoch),
2360            Error::EpochAlreadyRevoked
2361        );
2362        let operations = committees
2363            .keys()
2364            .filter_map(|epoch| {
2365                if *epoch <= revoked_epoch {
2366                    Some(Operation::system(SystemOperation::Admin(
2367                        AdminOperation::RemoveCommittee { epoch: *epoch },
2368                    )))
2369                } else {
2370                    None
2371                }
2372            })
2373            .collect();
2374        self.execute_operations(operations, vec![]).await
2375    }
2376
2377    /// Sends money to a chain.
2378    /// Do not check balance. (This may block the client)
2379    /// Do not confirm the transaction.
2380    #[instrument(level = "trace")]
2381    pub async fn transfer_to_account_unsafe_unconfirmed(
2382        &self,
2383        owner: AccountOwner,
2384        amount: Amount,
2385        recipient: Account,
2386    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2387        self.execute_operation(SystemOperation::Transfer {
2388            owner,
2389            recipient,
2390            amount,
2391        })
2392        .await
2393    }
2394
2395    #[instrument(level = "trace", skip(hash))]
2396    pub async fn read_confirmed_block(&self, hash: CryptoHash) -> Result<ConfirmedBlock, Error> {
2397        let block = self
2398            .client
2399            .storage_client()
2400            .read_confirmed_block(hash)
2401            .await?;
2402        block.ok_or(Error::MissingConfirmedBlock(hash))
2403    }
2404
2405    #[instrument(level = "trace", skip(hash))]
2406    pub async fn read_certificate(
2407        &self,
2408        hash: CryptoHash,
2409    ) -> Result<ConfirmedBlockCertificate, Error> {
2410        let certificate = self.client.storage_client().read_certificate(hash).await?;
2411        certificate.ok_or(Error::ReadCertificatesError(vec![hash]))
2412    }
2413
2414    /// Handles any cross-chain requests for any pending outgoing messages.
2415    #[instrument(level = "trace")]
2416    pub async fn retry_pending_outgoing_messages(&self) -> Result<(), Error> {
2417        self.client
2418            .local_node
2419            .retry_pending_cross_chain_requests(self.chain_id)
2420            .await?;
2421        Ok(())
2422    }
2423
2424    #[instrument(level = "trace", skip(local_node))]
2425    async fn local_chain_info(
2426        &self,
2427        chain_id: ChainId,
2428        local_node: &mut LocalNodeClient<Env::Storage>,
2429    ) -> Result<Option<Box<ChainInfo>>, Error> {
2430        match local_node.chain_info(chain_id).await {
2431            Ok(info) => {
2432                // Useful in case `chain_id` is the same as a local chain.
2433                self.client.update_from_info(&info);
2434                Ok(Some(info))
2435            }
2436            Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
2437            Err(err) => Err(err.into()),
2438        }
2439    }
2440
2441    #[instrument(level = "trace", skip(chain_id, local_node))]
2442    async fn local_next_block_height(
2443        &self,
2444        chain_id: ChainId,
2445        local_node: &mut LocalNodeClient<Env::Storage>,
2446    ) -> Result<BlockHeight, Error> {
2447        Ok(self
2448            .local_chain_info(chain_id, local_node)
2449            .await?
2450            .map_or(BlockHeight::ZERO, |info| info.next_block_height))
2451    }
2452
2453    /// Returns the next height we expect to receive from the given sender chain, according to the
2454    /// local inbox.
2455    #[instrument(level = "trace")]
2456    async fn local_next_height_to_receive(&self, origin: ChainId) -> Result<BlockHeight, Error> {
2457        Ok(self
2458            .client
2459            .local_node
2460            .get_inbox_next_height(self.chain_id, origin)
2461            .await?)
2462    }
2463
2464    #[instrument(level = "trace", skip(remote_node, local_node, notification))]
2465    async fn process_notification(
2466        &self,
2467        remote_node: RemoteNode<Env::ValidatorNode>,
2468        mut local_node: LocalNodeClient<Env::Storage>,
2469        notification: Notification,
2470        listening_mode: &ListeningMode,
2471    ) -> Result<(), Error> {
2472        if !listening_mode.is_relevant(&notification.reason) {
2473            debug!(
2474                chain_id = %self.chain_id,
2475                reason = ?notification.reason,
2476                "Ignoring notification due to listening mode"
2477            );
2478            return Ok(());
2479        }
2480        match notification.reason {
2481            Reason::NewIncomingBundle { origin, height } => {
2482                if self.local_next_height_to_receive(origin).await? > height {
2483                    debug!(
2484                        chain_id = %self.chain_id,
2485                        "Accepting redundant notification for new message"
2486                    );
2487                    return Ok(());
2488                }
2489                self.client
2490                    .download_sender_block_with_sending_ancestors(
2491                        self.chain_id,
2492                        origin,
2493                        height,
2494                        &remote_node,
2495                    )
2496                    .await?;
2497                if self.local_next_height_to_receive(origin).await? <= height {
2498                    info!(
2499                        chain_id = %self.chain_id,
2500                        "NewIncomingBundle: Fail to synchronize new message after notification"
2501                    );
2502                }
2503            }
2504            Reason::NewBlock { height, .. } => {
2505                let chain_id = notification.chain_id;
2506                if self
2507                    .local_next_block_height(chain_id, &mut local_node)
2508                    .await?
2509                    > height
2510                {
2511                    debug!(
2512                        chain_id = %self.chain_id,
2513                        "Accepting redundant notification for new block"
2514                    );
2515                    return Ok(());
2516                }
2517                self.client
2518                    .synchronize_chain_state_from(&remote_node, chain_id)
2519                    .await?;
2520                if self
2521                    .local_next_block_height(chain_id, &mut local_node)
2522                    .await?
2523                    <= height
2524                {
2525                    info!("NewBlock: Fail to synchronize new block after notification");
2526                }
2527                trace!(
2528                    chain_id = %self.chain_id,
2529                    %height,
2530                    "NewBlock: processed notification",
2531                );
2532            }
2533            Reason::NewEvents { height, hash, .. } => {
2534                if self
2535                    .local_next_block_height(notification.chain_id, &mut local_node)
2536                    .await?
2537                    > height
2538                {
2539                    debug!(
2540                        chain_id = %self.chain_id,
2541                        "Accepting redundant notification for new block"
2542                    );
2543                    return Ok(());
2544                }
2545                trace!(
2546                    chain_id = %self.chain_id,
2547                    %height,
2548                    "NewEvents: processing notification"
2549                );
2550                let mut certificates = remote_node.node.download_certificates(vec![hash]).await?;
2551                // download_certificates ensures that we will get exactly one
2552                // certificate in the result.
2553                let certificate = certificates
2554                    .pop()
2555                    .expect("download_certificates should have returned one certificate");
2556                self.client
2557                    .receive_sender_certificate(
2558                        certificate,
2559                        ReceiveCertificateMode::NeedsCheck,
2560                        None,
2561                    )
2562                    .await?;
2563            }
2564            Reason::NewRound { height, round } => {
2565                let chain_id = notification.chain_id;
2566                if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? {
2567                    if (info.next_block_height, info.manager.current_round) >= (height, round) {
2568                        debug!(
2569                            chain_id = %self.chain_id,
2570                            "Accepting redundant notification for new round"
2571                        );
2572                        return Ok(());
2573                    }
2574                }
2575                self.client
2576                    .synchronize_chain_state_from(&remote_node, chain_id)
2577                    .await?;
2578                let Some(info) = self.local_chain_info(chain_id, &mut local_node).await? else {
2579                    error!(
2580                        chain_id = %self.chain_id,
2581                        "NewRound: Fail to read local chain info for {chain_id}"
2582                    );
2583                    return Ok(());
2584                };
2585                if (info.next_block_height, info.manager.current_round) < (height, round) {
2586                    error!(
2587                        chain_id = %self.chain_id,
2588                        "NewRound: Fail to synchronize new block after notification"
2589                    );
2590                }
2591            }
2592            Reason::BlockExecuted { .. } => {
2593                // No action needed.
2594            }
2595        }
2596        Ok(())
2597    }
2598
2599    /// Returns whether this chain is tracked by the client, i.e. we are updating its inbox.
2600    pub fn is_tracked(&self) -> bool {
2601        self.client
2602            .tracked_chains
2603            .read()
2604            .unwrap()
2605            .contains(&self.chain_id)
2606    }
2607
2608    /// Spawns a task that listens to notifications about the current chain from all validators,
2609    /// and synchronizes the local state accordingly.
2610    #[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
2611    pub async fn listen(
2612        &self,
2613        listening_mode: ListeningMode,
2614    ) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), Error> {
2615        use future::FutureExt as _;
2616
2617        async fn await_while_polling<F: FusedFuture>(
2618            future: F,
2619            background_work: impl FusedStream<Item = ()>,
2620        ) -> F::Output {
2621            tokio::pin!(future);
2622            tokio::pin!(background_work);
2623            loop {
2624                futures::select! {
2625                    _ = background_work.next() => (),
2626                    result = future => return result,
2627                }
2628            }
2629        }
2630
2631        let mut senders = HashMap::new(); // Senders to cancel notification streams.
2632        let notifications = self.subscribe()?;
2633        let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);
2634
2635        // Beware: if this future ceases to make progress, notification processing will
2636        // deadlock, because of the issue described in
2637        // https://github.com/linera-io/linera-protocol/pull/1173.
2638
2639        // TODO(#2013): replace this lock with an asynchronous communication channel
2640
2641        let mut process_notifications = FuturesUnordered::new();
2642
2643        match self
2644            .update_notification_streams(&mut senders, &listening_mode)
2645            .await
2646        {
2647            Ok(handler) => process_notifications.push(handler),
2648            Err(error) => error!("Failed to update committee: {error}"),
2649        };
2650
2651        let this = self.clone();
2652        let update_streams = async move {
2653            let mut abortable_notifications = abortable_notifications.fuse();
2654
2655            while let Some(notification) =
2656                await_while_polling(abortable_notifications.next(), &mut process_notifications)
2657                    .await
2658            {
2659                if let Reason::NewBlock { .. } = notification.reason {
2660                    match Box::pin(await_while_polling(
2661                        this.update_notification_streams(&mut senders, &listening_mode)
2662                            .fuse(),
2663                        &mut process_notifications,
2664                    ))
2665                    .await
2666                    {
2667                        Ok(handler) => process_notifications.push(handler),
2668                        Err(error) => error!("Failed to update committee: {error}"),
2669                    }
2670                }
2671            }
2672
2673            for abort in senders.into_values() {
2674                abort.abort();
2675            }
2676
2677            let () = process_notifications.collect().await;
2678        }
2679        .in_current_span();
2680
2681        Ok((update_streams, AbortOnDrop(abort), notifications))
2682    }
2683
2684    #[instrument(level = "trace", skip(senders))]
2685    async fn update_notification_streams(
2686        &self,
2687        senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
2688        listening_mode: &ListeningMode,
2689    ) -> Result<impl Future<Output = ()>, Error> {
2690        let (nodes, local_node) = {
2691            let committee = self.local_committee().await?;
2692            let nodes: HashMap<_, _> = self
2693                .client
2694                .validator_node_provider()
2695                .make_nodes(&committee)?
2696                .collect();
2697            (nodes, self.client.local_node.clone())
2698        };
2699        // Drop removed validators.
2700        senders.retain(|validator, abort| {
2701            if !nodes.contains_key(validator) {
2702                abort.abort();
2703            }
2704            !abort.is_aborted()
2705        });
2706        // Add tasks for new validators.
2707        let validator_tasks = FuturesUnordered::new();
2708        for (public_key, node) in nodes {
2709            let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
2710                continue;
2711            };
2712            let address = node.address();
2713            let this = self.clone();
2714            let stream = stream::once({
2715                let node = node.clone();
2716                async move {
2717                    let stream = node.subscribe(vec![this.chain_id]).await?;
2718                    // Only now the notification stream is established. We may have missed
2719                    // notifications since the last time we synchronized.
2720                    let remote_node = RemoteNode { public_key, node };
2721                    this.client
2722                        .synchronize_chain_state_from(&remote_node, this.chain_id)
2723                        .await?;
2724                    Ok::<_, Error>(stream)
2725                }
2726            })
2727            .filter_map(move |result| {
2728                let address = address.clone();
2729                async move {
2730                    if let Err(error) = &result {
2731                        info!(?error, address, "could not connect to validator");
2732                    } else {
2733                        debug!(address, "connected to validator");
2734                    }
2735                    result.ok()
2736                }
2737            })
2738            .flatten();
2739            let (stream, abort) = stream::abortable(stream);
2740            let mut stream = Box::pin(stream);
2741            let this = self.clone();
2742            let local_node = local_node.clone();
2743            let remote_node = RemoteNode { public_key, node };
2744            let listening_mode_cloned = listening_mode.clone();
2745            validator_tasks.push(async move {
2746                while let Some(notification) = stream.next().await {
2747                    if let Err(error) = this
2748                        .process_notification(
2749                            remote_node.clone(),
2750                            local_node.clone(),
2751                            notification.clone(),
2752                            &listening_mode_cloned,
2753                        )
2754                        .await
2755                    {
2756                        tracing::info!(
2757                            chain_id = %this.chain_id,
2758                            address = remote_node.address(),
2759                            ?notification,
2760                            %error,
2761                            "failed to process notification",
2762                        );
2763                    }
2764                }
2765            });
2766            entry.insert(abort);
2767        }
2768        Ok(validator_tasks.collect())
2769    }
2770
2771    /// Attempts to update a validator with the local information.
2772    #[instrument(level = "trace", skip(remote_node))]
2773    pub async fn sync_validator(&self, remote_node: Env::ValidatorNode) -> Result<(), Error> {
2774        let validator_next_block_height = match remote_node
2775            .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
2776            .await
2777        {
2778            Ok(info) => info.info.next_block_height,
2779            Err(NodeError::BlobsNotFound(_)) => BlockHeight::ZERO,
2780            Err(err) => return Err(err.into()),
2781        };
2782        let local_next_block_height = self.chain_info().await?.next_block_height;
2783
2784        if validator_next_block_height >= local_next_block_height {
2785            debug!("Validator is up-to-date with local state");
2786            return Ok(());
2787        }
2788
2789        let heights: Vec<_> = (validator_next_block_height.0..local_next_block_height.0)
2790            .map(BlockHeight)
2791            .collect();
2792
2793        let missing_certificate_hashes = self
2794            .client
2795            .local_node
2796            .get_block_hashes(self.chain_id, heights)
2797            .await?;
2798
2799        let certificates = self
2800            .client
2801            .storage_client()
2802            .read_certificates(missing_certificate_hashes.clone())
2803            .await?;
2804        let certificates =
2805            match ResultReadCertificates::new(certificates, missing_certificate_hashes) {
2806                ResultReadCertificates::Certificates(certificates) => certificates,
2807                ResultReadCertificates::InvalidHashes(hashes) => {
2808                    return Err(Error::ReadCertificatesError(hashes))
2809                }
2810            };
2811        for certificate in certificates {
2812            match remote_node
2813                .handle_confirmed_certificate(
2814                    certificate.clone(),
2815                    CrossChainMessageDelivery::NonBlocking,
2816                )
2817                .await
2818            {
2819                Ok(_) => (),
2820                Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
2821                    // Upload the missing blobs we have and retry.
2822                    let missing_blobs: Vec<_> = self
2823                        .client
2824                        .storage_client()
2825                        .read_blobs(&missing_blob_ids)
2826                        .await?
2827                        .into_iter()
2828                        .flatten()
2829                        .collect();
2830                    remote_node.upload_blobs(missing_blobs).await?;
2831                    remote_node
2832                        .handle_confirmed_certificate(
2833                            certificate,
2834                            CrossChainMessageDelivery::NonBlocking,
2835                        )
2836                        .await?;
2837                }
2838                Err(err) => return Err(err.into()),
2839            }
2840        }
2841
2842        Ok(())
2843    }
2844}
2845
2846#[cfg(with_testing)]
2847impl<Env: Environment> ChainClient<Env> {
2848    pub async fn process_notification_from(
2849        &self,
2850        notification: Notification,
2851        validator: (ValidatorPublicKey, &str),
2852    ) {
2853        let mut node_list = self
2854            .client
2855            .validator_node_provider()
2856            .make_nodes_from_list(vec![validator])
2857            .unwrap();
2858        let (public_key, node) = node_list.next().unwrap();
2859        let remote_node = RemoteNode { node, public_key };
2860        let local_node = self.client.local_node.clone();
2861        self.process_notification(
2862            remote_node,
2863            local_node,
2864            notification,
2865            &ListeningMode::FullChain,
2866        )
2867        .await
2868        .unwrap();
2869    }
2870}