Skip to main content

linera_core/client/chain_client/
mod.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4mod state;
5use std::{
6    collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
7    convert::Infallible,
8    iter,
9    sync::Arc,
10};
11
12use custom_debug_derive::Debug;
13use futures::{
14    future::{self, Either, FusedFuture, Future},
15    stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt, TryStreamExt},
16};
17#[cfg(with_metrics)]
18use linera_base::prometheus_util::MeasureLatency as _;
19use linera_base::{
20    abi::Abi,
21    crypto::{signer, CryptoHash, Signer, ValidatorPublicKey},
22    data_types::{
23        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
24        ChainDescription, Epoch, MessagePolicy, Round, TimeDelta, Timestamp,
25    },
26    ensure,
27    identifiers::{
28        Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
29        ModuleId, StreamId,
30    },
31    ownership::{ChainOwnership, TimeoutConfig},
32    time::{Duration, Instant},
33};
34#[cfg(not(target_arch = "wasm32"))]
35use linera_base::{data_types::Bytecode, vm::VmRuntime};
36use linera_chain::{
37    data_types::{
38        BlockProposal, BundleExecutionPolicy, BundleFailurePolicy, ChainAndHeight, IncomingBundle,
39        ProposedBlock, Transaction,
40    },
41    manager::LockingBlock,
42    types::{
43        Block, ConfirmedBlock, ConfirmedBlockCertificate, Timeout, TimeoutCertificate,
44        ValidatedBlock,
45    },
46    ChainError, ChainExecutionContext,
47};
48use linera_execution::{
49    committee::Committee,
50    system::{
51        AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
52        REMOVED_EPOCH_STREAM_NAME,
53    },
54    ExecutionError, Operation, Query, QueryOutcome,
55};
56use linera_storage::{Arc as CacheArc, Clock as _, Storage as _};
57use linera_views::ViewError;
58use serde::Serialize;
59pub use state::State;
60use thiserror::Error;
61use tokio::sync::mpsc;
62use tokio_stream::wrappers::UnboundedReceiverStream;
63use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};
64
65use super::{
66    received_log::ReceivedLogs, validator_trackers::ValidatorTrackers, AbortOnDrop, Client,
67    ListeningMode, PendingProposal, TimingType,
68};
69use crate::{
70    data_types::{ChainInfo, ChainInfoQuery, ClientOutcome, RoundTimeout},
71    environment::Environment,
72    local_node::{LocalNodeClient, LocalNodeError},
73    node::{
74        CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
75        ValidatorNodeProvider as _,
76    },
77    remote_node::RemoteNode,
78    updater::{communicate_with_quorum, CommunicateAction, CommunicationError},
79    worker::{Notification, Reason, WorkerError},
80};
81
82#[derive(Debug, Clone)]
83pub struct Options {
84    /// Maximum number of pending message bundles processed at a time in a block.
85    pub max_pending_message_bundles: usize,
86    /// Maximum number of message bundles to discard from a block proposal due to block limit
87    /// errors before discarding all remaining bundles.
88    ///
89    /// Discarded bundles can be retried in the next block.
90    pub max_block_limit_errors: u32,
91    /// Time budget for staging message bundles. When set, limits bundle execution by
92    /// wall-clock time, in addition to the count limit from `max_pending_message_bundles`.
93    pub staging_bundles_time_budget: Option<Duration>,
94    /// The policy for automatically handling incoming messages.
95    pub message_policy: MessagePolicy,
96    /// Chain IDs whose incoming bundles should be processed first when proposing a block.
97    pub priority_bundle_origins: HashSet<ChainId>,
98    /// Whether to block on cross-chain message delivery.
99    pub cross_chain_message_delivery: CrossChainMessageDelivery,
100    /// An additional delay, after reaching a quorum, to wait for additional validator signatures,
101    /// as a fraction of time taken to reach quorum.
102    pub quorum_grace_period: f64,
103    /// The delay when downloading a blob, after which we try a second validator.
104    pub blob_download_timeout: Duration,
105    /// The delay when downloading a batch of certificates, after which we try a second validator.
106    pub certificate_batch_download_timeout: Duration,
107    /// Maximum number of certificates that we download at a time from one validator when
108    /// synchronizing one of our chains.
109    pub certificate_download_batch_size: u64,
110    /// Maximum number of certificates read from local storage and uploaded to a validator
111    /// at a time when synchronizing a chain.
112    pub certificate_upload_batch_size: usize,
113    /// Maximum number of sender certificates we try to download and receive in one go
114    /// when syncing sender chains.
115    pub sender_certificate_download_batch_size: usize,
116    /// Maximum number of certificate batches downloaded concurrently during chain sync.
117    pub max_concurrent_batch_downloads: usize,
118    /// Maximum number of tasks that can be joined concurrently using buffer_unordered.
119    pub max_joined_tasks: usize,
120    /// Whether to allow creating blocks in the fast round. Fast blocks have lower latency but
121    /// must be used carefully so that there are never any conflicting fast block proposals.
122    pub allow_fast_blocks: bool,
123    /// Whether to apply the multi-leader jitter delay before proposing in a multi-leader
124    /// round with index `>= 1`, to spread out concurrent proposals across honest clients.
125    /// The owner with the lowest `hash(owner, round)` still proposes immediately. The
126    /// jitter only takes effect when the round has a configured timeout.
127    pub multi_leader_jitter: bool,
128    /// Initial probe interval for the notification circuit breaker. When a validator's
129    /// notification stream exhausts retries, the circuit breaker waits this long before
130    /// probing again. Doubles on each failed probe.
131    pub notification_circuit_breaker_initial_probe_interval: Duration,
132    /// Maximum probe interval for the notification circuit breaker. The probe interval
133    /// doubles on each failure but is capped at this value.
134    pub notification_circuit_breaker_max_probe_interval: Duration,
135    /// Maximum number of event stream IDs to include in a single `PreviousEventBlocks`
136    /// request. Larger sets are split into multiple requests.
137    pub max_event_stream_queries: usize,
138}
139
140struct CircuitBreakerState {
141    next_probe_at: Instant,
142    probe_interval: Duration,
143}
144
145/// A fingerprint of the chain manager's observable consensus state, used to detect
146/// whether a per-validator updater absorbed new info during a proposal attempt.
147#[derive(Clone, Copy, Debug, PartialEq, Eq)]
148struct ConsensusStateSnapshot {
149    next_block_height: BlockHeight,
150    current_round: Round,
151    lock_round: Option<Round>,
152    timeout_round: Option<Round>,
153}
154
#[cfg(with_testing)]
impl Options {
    /// Returns options intended for tests.
    ///
    /// Uses small fixed limits (10 pending bundles, 3 block-limit errors, 100 joined tasks),
    /// one-second download timeouts and non-blocking cross-chain delivery, disables fast
    /// blocks and multi-leader jitter, and takes batch sizes and the quorum grace period
    /// from the crate's default constants.
    pub fn test_default() -> Self {
        use super::{
            DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE, DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE,
            DEFAULT_MAX_CONCURRENT_BATCH_DOWNLOADS, DEFAULT_MAX_EVENT_STREAM_QUERIES,
            DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
        };
        use crate::DEFAULT_QUORUM_GRACE_PERIOD;

        // Blob and certificate-batch downloads share the same short timeout in tests.
        let download_timeout = Duration::from_secs(1);

        Self {
            max_pending_message_bundles: 10,
            max_block_limit_errors: 3,
            staging_bundles_time_budget: None,
            message_policy: MessagePolicy::default(),
            priority_bundle_origins: HashSet::new(),
            cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
            quorum_grace_period: DEFAULT_QUORUM_GRACE_PERIOD,
            blob_download_timeout: download_timeout,
            certificate_batch_download_timeout: download_timeout,
            certificate_download_batch_size: DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            certificate_upload_batch_size: DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE,
            sender_certificate_download_batch_size: DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            max_concurrent_batch_downloads: DEFAULT_MAX_CONCURRENT_BATCH_DOWNLOADS,
            max_joined_tasks: 100,
            allow_fast_blocks: false,
            multi_leader_jitter: false,
            // Five minutes initially, capped at one hour.
            notification_circuit_breaker_initial_probe_interval: Duration::from_secs(5 * 60),
            notification_circuit_breaker_max_probe_interval: Duration::from_secs(60 * 60),
            max_event_stream_queries: DEFAULT_MAX_EVENT_STREAM_QUERIES,
        }
    }
}
188
189impl Options {
190    /// Builds the [`BundleExecutionPolicy`] based on the client options.
191    pub fn bundle_execution_policy(&self) -> BundleExecutionPolicy {
192        BundleExecutionPolicy {
193            on_failure: BundleFailurePolicy::AutoRetry {
194                max_failures: self.max_block_limit_errors,
195                never_reject_application_ids: Arc::new(
196                    self.message_policy.never_reject_application_ids.clone(),
197                ),
198            },
199            time_budget: self.staging_bundles_time_budget,
200        }
201    }
202}
203
204/// Client to operate a chain by interacting with validators and the given local storage
205/// implementation.
206/// * The chain being operated is called the "local chain" or just the "chain".
207/// * As a rule, operations are considered successful (and communication may stop) when
208///   they succeeded in gathering a quorum of responses.
209#[derive(Debug)]
210pub struct ChainClient<Env: Environment> {
211    /// The Linera [`Client`] that manages operations for this chain client.
212    #[debug(skip)]
213    pub(crate) client: Arc<Client<Env>>,
214    /// The off-chain chain ID.
215    chain_id: ChainId,
216    /// The client options.
217    #[debug(skip)]
218    options: Options,
219    /// The preferred owner of the chain used to sign proposals.
220    /// `None` if we cannot propose on this chain.
221    preferred_owner: Option<AccountOwner>,
222    /// The next block height as read from the wallet.
223    initial_next_block_height: BlockHeight,
224    /// The last block hash as read from the wallet.
225    initial_block_hash: Option<CryptoHash>,
226    /// Optional timing sender for benchmarking.
227    timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
228    /// Sender chain IDs whose bundles were discarded due to the never-reject policy.
229    /// These origins are excluded from `process_inbox` until the client is restarted.
230    skipped_origins: Arc<papaya::HashSet<ChainId>>,
231}
232
233impl<Env: Environment> Clone for ChainClient<Env> {
234    fn clone(&self) -> Self {
235        Self {
236            client: self.client.clone(),
237            chain_id: self.chain_id,
238            options: self.options.clone(),
239            preferred_owner: self.preferred_owner,
240            initial_next_block_height: self.initial_next_block_height,
241            initial_block_hash: self.initial_block_hash,
242            timing_sender: self.timing_sender.clone(),
243            skipped_origins: self.skipped_origins.clone(),
244        }
245    }
246}
247
248/// Error type for [`ChainClient`].
249#[derive(Debug, Error)]
250pub enum Error {
251    #[error("Local node operation failed: {0}")]
252    LocalNodeError(#[from] LocalNodeError),
253
254    #[error("Remote node operation failed: {0}")]
255    RemoteNodeError(#[from] NodeError),
256
257    #[error(transparent)]
258    ArithmeticError(#[from] ArithmeticError),
259
260    #[error("Missing certificates: {0:?}")]
261    ReadCertificatesError(Vec<CryptoHash>),
262
263    #[error("Missing confirmed block: {0:?}")]
264    MissingConfirmedBlock(CryptoHash),
265
266    #[error("JSON (de)serialization error: {0}")]
267    JsonError(#[from] serde_json::Error),
268
269    #[error("Chain operation failed: {0}")]
270    ChainError(#[from] ChainError),
271
272    #[error(transparent)]
273    CommunicationError(#[from] CommunicationError<NodeError>),
274
275    #[error("Internal error within chain client: {0}")]
276    InternalError(&'static str),
277
278    #[error(
279        "Cannot accept a certificate from an unknown committee in the future. \
280         Please synchronize the local view of the admin chain"
281    )]
282    CommitteeSynchronizationError,
283
284    #[error("The local node is behind the trusted state in wallet and needs synchronization with validators")]
285    WalletSynchronizationError,
286
287    #[error("The state of the client is incompatible with the proposed block: {0}")]
288    BlockProposalError(&'static str),
289
290    #[error(
291        "Cannot accept a certificate from a committee that was retired. \
292         Try a newer certificate from the same origin"
293    )]
294    CommitteeDeprecationError,
295
296    #[error("Protocol error within chain client: {0}")]
297    ProtocolError(&'static str),
298
299    #[error("Signer doesn't have key to sign for chain {0}")]
300    CannotFindKeyForChain(ChainId),
301
302    #[error("client is not configured to propose on chain {0}")]
303    NoAccountKeyConfigured(ChainId),
304
305    #[error("The chain client isn't owner on chain {0}")]
306    NotAnOwner(ChainId),
307
308    #[error(transparent)]
309    ViewError(#[from] ViewError),
310
311    #[error(
312        "Failed to download certificates and update local node to the next height \
313         {target_next_block_height} of chain {chain_id}"
314    )]
315    CannotDownloadCertificates {
316        chain_id: ChainId,
317        target_next_block_height: BlockHeight,
318    },
319
320    #[error(transparent)]
321    BcsError(#[from] bcs::Error),
322
323    #[error(
324        "Unexpected quorum: validators voted for block hash {hash} in {round}, \
325         expected block hash {expected_hash} in {expected_round}"
326    )]
327    UnexpectedQuorum {
328        hash: CryptoHash,
329        round: Round,
330        expected_hash: CryptoHash,
331        expected_round: Round,
332    },
333
334    #[error("signer error: {0:?}")]
335    Signer(#[source] Box<dyn signer::Error>),
336
337    #[error("Cannot revoke the current epoch {0}")]
338    CannotRevokeCurrentEpoch(Epoch),
339
340    #[error("Epoch is already revoked")]
341    EpochAlreadyRevoked,
342
343    #[error("Failed to download missing sender blocks from chain {chain_id} at height {height}")]
344    CannotDownloadMissingSenderBlock {
345        chain_id: ChainId,
346        height: BlockHeight,
347    },
348
349    #[error(
350        "A different block was already committed at this height. \
351         The committed certificate hash is {0}"
352    )]
353    Conflict(CryptoHash),
354
355    #[error(
356        "Execution outcome mismatch: AutoRetry and committed execution produced \
357         different outcomes for the same block"
358    )]
359    ExecutionOutcomeMismatch,
360}
361
362impl From<Infallible> for Error {
363    fn from(infallible: Infallible) -> Self {
364        match infallible {}
365    }
366}
367
368impl Error {
369    pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
370        Self::Signer(Box::new(err))
371    }
372}
373
374impl<Env: Environment> ChainClient<Env> {
375    pub fn new(
376        client: Arc<Client<Env>>,
377        chain_id: ChainId,
378        options: Options,
379        initial_block_hash: Option<CryptoHash>,
380        initial_next_block_height: BlockHeight,
381        preferred_owner: Option<AccountOwner>,
382        timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
383    ) -> Self {
384        ChainClient {
385            client,
386            chain_id,
387            options,
388            preferred_owner,
389            initial_block_hash,
390            initial_next_block_height,
391            timing_sender,
392            skipped_origins: Arc::new(papaya::HashSet::new()),
393        }
394    }
395
396    /// Returns whether this chain is in follow-only mode.
397    pub fn is_follow_only(&self) -> bool {
398        self.client.is_chain_follow_only(self.chain_id)
399    }
400
401    /// Returns the proposal mutex for this chain.
402    ///
403    /// The mutex serializes block proposals and holds the pending proposal (if any).
404    #[instrument(level = "trace", skip(self))]
405    fn proposal_mutex(&self) -> Arc<tokio::sync::Mutex<Option<PendingProposal>>> {
406        self.client
407            .chains
408            .pin()
409            .get(&self.chain_id)
410            .expect("Chain client constructed for invalid chain")
411            .proposal_mutex()
412    }
413
414    /// Returns the pending proposal, if any.
415    #[instrument(level = "trace", skip(self))]
416    pub async fn pending_proposal(&self) -> Option<PendingProposal> {
417        self.proposal_mutex().lock().await.clone()
418    }
419
420    /// Gets a reference to the client's signer instance.
421    #[instrument(level = "trace", skip(self))]
422    pub fn signer(&self) -> &impl Signer {
423        self.client.signer()
424    }
425
426    /// Returns whether the signer has a key for the given owner.
427    pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, Error> {
428        self.client.has_key_for(owner).await
429    }
430
431    /// Gets a mutable reference to the per-`ChainClient` options.
432    #[instrument(level = "trace", skip(self))]
433    pub fn options_mut(&mut self) -> &mut Options {
434        &mut self.options
435    }
436
437    /// Gets a reference to the per-`ChainClient` options.
438    #[instrument(level = "trace", skip(self))]
439    pub fn options(&self) -> &Options {
440        &self.options
441    }
442
443    /// Gets the ID of the associated chain.
444    #[instrument(level = "trace", skip(self))]
445    pub fn chain_id(&self) -> ChainId {
446        self.chain_id
447    }
448
449    /// Gets a clone of the timing sender for benchmarking.
450    pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
451        self.timing_sender.clone()
452    }
453
454    /// Gets the ID of the admin chain.
455    #[instrument(level = "trace", skip(self))]
456    pub fn admin_chain_id(&self) -> ChainId {
457        self.client.admin_chain_id
458    }
459
460    /// Gets the currently preferred owner for signing the blocks.
461    #[instrument(level = "trace", skip(self))]
462    pub fn preferred_owner(&self) -> Option<AccountOwner> {
463        self.preferred_owner
464    }
465
466    /// Sets the new, preferred owner for signing the blocks.
467    #[instrument(level = "trace", skip(self))]
468    pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
469        self.preferred_owner = Some(preferred_owner);
470    }
471
472    /// Obtains a `ChainStateView` for this client's chain.
473    #[instrument(level = "trace")]
474    pub async fn chain_state_view(
475        &self,
476    ) -> Result<crate::worker::ChainStateViewReadGuard<Env::Storage>, LocalNodeError> {
477        self.client.local_node.chain_state_view(self.chain_id).await
478    }
479
480    /// Returns chain IDs that this chain subscribes to.
481    #[instrument(level = "trace", skip(self))]
482    pub async fn event_stream_publishers(
483        &self,
484    ) -> Result<BTreeMap<ChainId, BTreeSet<StreamId>>, LocalNodeError> {
485        let subscriptions = self
486            .client
487            .local_node
488            .get_event_subscriptions(self.chain_id)
489            .await?;
490        let mut publishers = subscriptions.into_iter().fold(
491            BTreeMap::<ChainId, BTreeSet<StreamId>>::new(),
492            |mut map, ((chain_id, stream_id), _)| {
493                map.entry(chain_id).or_default().insert(stream_id);
494                map
495            },
496        );
497        if self.chain_id != self.client.admin_chain_id {
498            publishers.insert(
499                self.client.admin_chain_id,
500                vec![
501                    StreamId::system(EPOCH_STREAM_NAME),
502                    StreamId::system(REMOVED_EPOCH_STREAM_NAME),
503                ]
504                .into_iter()
505                .collect(),
506            );
507        }
508        Ok(publishers)
509    }
510
511    /// Subscribes to notifications from this client's chain.
512    #[instrument(level = "trace")]
513    pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
514        self.subscribe_to(self.chain_id)
515    }
516
517    /// Subscribes to notifications from the specified chain.
518    #[instrument(level = "trace")]
519    pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
520        Ok(Box::pin(UnboundedReceiverStream::new(
521            self.client.notifier.subscribe(vec![chain_id]),
522        )))
523    }
524
525    /// Returns the storage client used by this client's local node.
526    #[instrument(level = "trace")]
527    pub fn storage_client(&self) -> &Env::Storage {
528        self.client.storage_client()
529    }
530
531    /// Obtains the basic `ChainInfo` data for the local chain.
532    #[instrument(level = "trace")]
533    pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
534        let query = ChainInfoQuery::new(self.chain_id);
535        let response = self
536            .client
537            .local_node
538            .handle_chain_info_query(query)
539            .await?;
540        Ok(response.info)
541    }
542
543    /// Obtains the basic `ChainInfo` data for the local chain, with chain manager values.
544    #[instrument(level = "trace")]
545    pub async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
546        let query = ChainInfoQuery::new(self.chain_id).with_manager_values();
547        let response = self
548            .client
549            .local_node
550            .handle_chain_info_query(query)
551            .await?;
552        Ok(response.info)
553    }
554
555    /// Returns a fingerprint of the chain manager's observable state: height,
556    /// current round, locking block round, and timeout certificate round.
557    ///
558    /// [`Self::process_pending_block_without_prepare`] takes one snapshot just before
559    /// each network call (re-taking it after our own local proposal handling) and
560    /// compares against the post-call state. A change means a per-validator updater
561    /// absorbed something we didn't have. The local node verifies signatures on
562    /// anything it stores, so a single dishonest validator can't fake this.
563    async fn consensus_state_snapshot(&self) -> Result<ConsensusStateSnapshot, Error> {
564        let info = self.chain_info_with_manager_values().await?;
565        let lock_round = info
566            .manager
567            .requested_locking
568            .as_deref()
569            .map(LockingBlock::round);
570        let timeout_round = info.manager.timeout.as_deref().map(|cert| cert.round);
571        Ok(ConsensusStateSnapshot {
572            next_block_height: info.next_block_height,
573            current_round: info.manager.current_round,
574            lock_round,
575            timeout_round,
576        })
577    }
578
579    /// Returns the chain's description. Fetches it from the validators if necessary.
580    pub async fn get_chain_description(&self) -> Result<ChainDescription, Error> {
581        self.client.get_chain_description(self.chain_id).await
582    }
583
584    /// Obtains up to `self.options.max_pending_message_bundles` pending message bundles for the
585    /// local chain.
586    #[instrument(level = "trace")]
587    async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, Error> {
588        if self.options.message_policy.is_ignore() {
589            // Ignore all messages.
590            return Ok(Vec::new());
591        }
592
593        let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
594        let info = self
595            .client
596            .local_node
597            .handle_chain_info_query(query)
598            .await?
599            .info;
600        if self.preferred_owner.is_some_and(|owner| {
601            info.manager
602                .ownership
603                .is_super_owner_no_regular_owners(&owner)
604        }) {
605            // There are only super owners; they are expected to sync manually.
606            ensure!(
607                info.next_block_height >= self.initial_next_block_height,
608                Error::WalletSynchronizationError
609            );
610        }
611
612        let skipped = self.skipped_origins.pin();
613        let mut bundles = info
614            .requested_pending_message_bundles
615            .into_iter()
616            .filter_map(|bundle| bundle.apply_policy(&self.options.message_policy))
617            .filter(|bundle| !skipped.contains(&bundle.origin))
618            .collect::<Vec<_>>();
619        let priority_origins = &self.options.priority_bundle_origins;
620        bundles.sort_by(|a, b| {
621            let a_priority = priority_origins.contains(&a.origin);
622            let b_priority = priority_origins.contains(&b.origin);
623            b_priority
624                .cmp(&a_priority)
625                .then(a.bundle.timestamp.cmp(&b.bundle.timestamp))
626        });
627        bundles.truncate(self.options.max_pending_message_bundles);
628        Ok(bundles)
629    }
630
631    #[instrument(level = "trace")]
632    async fn collect_stream_updates(&self) -> Result<Vec<Operation>, Error> {
633        let subscription_map = self
634            .client
635            .local_node
636            .get_event_subscriptions(self.chain_id)
637            .await?;
638        let futures = subscription_map
639            .into_iter()
640            .filter(|((chain_id, _), _)| {
641                self.options
642                    .message_policy
643                    .restrict_chain_ids_to
644                    .as_ref()
645                    .is_none_or(|chain_set| chain_set.contains(chain_id))
646            })
647            .filter(|((_, stream_id), _)| {
648                self.options
649                    .message_policy
650                    .process_events_from_application_ids
651                    .as_ref()
652                    .is_none_or(|app_set| app_set.contains(&stream_id.application_id))
653            })
654            .map(|((chain_id, stream_id), subscriptions)| {
655                let client = self.client.clone();
656                async move {
657                    let next_expected_index = client
658                        .local_node
659                        .get_next_expected_event(chain_id, stream_id.clone())
660                        .await?;
661                    let Some(next_index) = next_expected_index
662                        .filter(|next_index| *next_index > subscriptions.min_next_index)
663                    else {
664                        return Ok::<_, Error>(Vec::new());
665                    };
666                    Ok(subscriptions
667                        .applications
668                        .into_iter()
669                        .filter(|(_, app_index)| *app_index < next_index)
670                        .map(|(application_id, _)| {
671                            SystemOperation::UpdateStream {
672                                application_id,
673                                chain_id,
674                                stream_id: stream_id.clone(),
675                                next_index,
676                            }
677                            .into()
678                        })
679                        .collect::<Vec<Operation>>())
680                }
681            });
682        Ok(futures::stream::iter(futures)
683            .buffer_unordered(self.options.max_joined_tasks)
684            .try_collect::<Vec<_>>()
685            .await?
686            .into_iter()
687            .flatten()
688            .collect::<Vec<_>>())
689    }
690
691    /// Obtains the committee for the current epoch of the local chain.
692    #[instrument(level = "trace")]
693    pub async fn local_committee(&self) -> Result<Arc<Committee>, Error> {
694        let info = match self.client.local_node.chain_info(self.chain_id).await {
695            Ok(info) => info,
696            Err(LocalNodeError::BlobsNotFound(_)) => {
697                self.synchronize_chain_state(self.chain_id).await?;
698                self.client.local_node.chain_info(self.chain_id).await?
699            }
700            Err(err) => return Err(err.into()),
701        };
702        let hash = info
703            .committee_hash
704            .ok_or(LocalNodeError::InactiveChain(self.chain_id))?;
705        Ok(self
706            .storage_client()
707            .get_or_load_committee_by_hash(hash)
708            .await
709            .map_err(LocalNodeError::from)?)
710    }
711
712    /// Obtains the committee for the latest epoch on the admin chain.
713    #[instrument(level = "trace")]
714    pub async fn admin_committee(&self) -> Result<(Epoch, Arc<Committee>), LocalNodeError> {
715        self.client.admin_committee().await
716    }
717
718    /// Obtains the identity of the current owner of the chain.
719    ///
720    /// Returns an error if we don't have the private key for the identity.
721    #[instrument(level = "trace")]
722    pub async fn identity(&self) -> Result<AccountOwner, Error> {
723        let Some(preferred_owner) = self.preferred_owner else {
724            return Err(Error::NoAccountKeyConfigured(self.chain_id));
725        };
726        let manager = self.chain_info().await?.manager;
727        ensure!(
728            manager.ownership.is_active(),
729            LocalNodeError::InactiveChain(self.chain_id)
730        );
731        let fallback_owners = if manager.ownership.has_fallback() {
732            self.local_committee()
733                .await?
734                .account_keys_and_weights()
735                .map(|(key, _)| AccountOwner::from(key))
736                .collect()
737        } else {
738            BTreeSet::new()
739        };
740
741        let is_owner = manager
742            .ownership
743            .can_propose_in_multi_leader_round(&preferred_owner)
744            || fallback_owners.contains(&preferred_owner);
745
746        if !is_owner {
747            warn!(
748                chain_id = %self.chain_id,
749                ownership = ?manager.ownership,
750                ?fallback_owners,
751                ?preferred_owner,
752                "The preferred owner is not configured as an owner of this chain",
753            );
754            return Err(Error::NotAnOwner(self.chain_id));
755        }
756
757        let has_signer = self.has_key_for(&preferred_owner).await?;
758
759        if !has_signer {
760            warn!(%self.chain_id, ?preferred_owner,
761                "Chain is one of the owners but its Signer instance doesn't contain the key",
762            );
763            return Err(Error::CannotFindKeyForChain(self.chain_id));
764        }
765
766        Ok(preferred_owner)
767    }
768
769    /// Prepares the chain for a new owner by fetching the chain description and validating access.
770    ///
771    /// This is useful when assigning a chain to a client that may not have the owner key,
772    /// e.g. when a faucet creates a chain with `open_multi_leader_rounds`.
773    ///
774    /// Returns the chain info if the owner can propose on this chain (either because they are
775    /// an owner, or because `open_multi_leader_rounds` is enabled).
776    #[instrument(level = "trace")]
777    pub async fn prepare_for_owner(&self, owner: AccountOwner) -> Result<Box<ChainInfo>, Error> {
778        ensure!(
779            self.has_key_for(&owner).await?,
780            Error::CannotFindKeyForChain(self.chain_id)
781        );
782        // Ensure we have the chain description blob.
783        self.client
784            .get_chain_description_blob(self.chain_id)
785            .await?;
786
787        // Get chain info.
788        let info = self.chain_info().await?;
789
790        // Validate that the owner can propose on this chain.
791        ensure!(
792            info.manager
793                .ownership
794                .can_propose_in_multi_leader_round(&owner),
795            Error::NotAnOwner(self.chain_id)
796        );
797
798        Ok(info)
799    }
800
801    /// Prepares the chain for the next operation, i.e. makes sure we have synchronized it up to
802    /// its current height.
803    #[instrument(level = "trace")]
804    pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, Error> {
805        #[cfg(with_metrics)]
806        let _latency = super::metrics::PREPARE_CHAIN_LATENCY.measure_latency();
807
808        let mut info = self.synchronize_to_known_height().await?;
809
810        if self.preferred_owner.is_none_or(|owner| {
811            !info
812                .manager
813                .ownership
814                .is_super_owner_no_regular_owners(&owner)
815        }) {
816            // If we are not a super owner or there are regular owners, we could be missing recent
817            // certificates created by other clients. Further synchronize blocks from the network.
818            // This is a best-effort that depends on network conditions.
819            info = self.client.synchronize_chain_state(self.chain_id).await?;
820        }
821
822        if info.epoch > self.client.admin_committee().await?.0 {
823            self.client
824                .synchronize_chain_state(self.client.admin_chain_id)
825                .await?;
826        }
827
828        Ok(info)
829    }
830
831    // Verifies that our local storage contains enough history compared to the
832    // known block height. Otherwise, downloads the missing history from the
833    // network.
834    // The known height only differs if the wallet is ahead of storage.
835    async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, Error> {
836        let info = self
837            .client
838            .download_certificates(self.chain_id, self.initial_next_block_height)
839            .await?;
840        if info.next_block_height == self.initial_next_block_height {
841            // Check that our local node has the expected block hash.
842            ensure!(
843                self.initial_block_hash == info.block_hash,
844                Error::InternalError("Invalid chain of blocks in local node")
845            );
846        }
847        Ok(info)
848    }
849
850    /// Attempts to update all validators about the local chain.
851    #[instrument(level = "trace", skip(old_committee, latest_certificate))]
852    pub async fn update_validators(
853        &self,
854        old_committee: Option<&Committee>,
855        latest_certificate: Option<CacheArc<ConfirmedBlockCertificate>>,
856    ) -> Result<(), Error> {
857        let update_validators_start = linera_base::time::Instant::now();
858        // Communicate the new certificate now.
859        if let Some(old_committee) = old_committee {
860            self.communicate_chain_updates(old_committee, latest_certificate.clone())
861                .await?
862        };
863        if let Ok(new_committee) = self.local_committee().await {
864            if old_committee.is_none_or(|old| *new_committee != *old) {
865                // If the configuration just changed, communicate to the new committee as well.
866                // (This is actually more important that updating the previous committee.)
867                self.communicate_chain_updates(&new_committee, latest_certificate)
868                    .await?;
869            }
870        }
871        self.send_timing(update_validators_start, TimingType::UpdateValidators);
872        Ok(())
873    }
874
875    /// Broadcasts certified blocks to validators.
876    #[instrument(level = "trace", skip(committee))]
877    pub async fn communicate_chain_updates(
878        &self,
879        committee: &Committee,
880        latest_certificate: Option<CacheArc<ConfirmedBlockCertificate>>,
881    ) -> Result<(), Error> {
882        let delivery = self.options.cross_chain_message_delivery;
883        let height = self.chain_info().await?.next_block_height;
884        self.client
885            .communicate_chain_updates(
886                committee,
887                self.chain_id,
888                height,
889                delivery,
890                latest_certificate,
891            )
892            .await
893    }
894
895    /// Synchronizes all chains that any application on this chain subscribes to.
896    /// We always consider the admin chain a relevant publishing chain, for new epochs.
897    /// For event publisher chains, only event-bearing blocks are downloaded (partial sync).
898    async fn synchronize_publisher_chains(&self) -> Result<(), Error> {
899        let subscriptions = self
900            .client
901            .local_node
902            .get_event_subscriptions(self.chain_id)
903            .await?;
904        // Group subscribed streams by publisher chain.
905        let mut streams_by_chain = BTreeMap::<ChainId, BTreeSet<StreamId>>::new();
906        for ((chain_id, stream_id), _) in &subscriptions {
907            if *chain_id != self.chain_id {
908                streams_by_chain
909                    .entry(*chain_id)
910                    .or_default()
911                    .insert(stream_id.clone());
912            }
913        }
914        // Always fully sync the admin chain for epoch changes.
915        let admin_chain_id = self.client.admin_chain_id;
916        if admin_chain_id != self.chain_id {
917            self.client.synchronize_chain_state(admin_chain_id).await?;
918        }
919        // For event publisher chains, do a partial sync using previous_event_blocks.
920        let (_, committee) = self.admin_committee().await?;
921        let nodes = self.client.make_nodes(&committee)?;
922        let tasks = streams_by_chain
923            .into_iter()
924            .filter(|(chain_id, _)| *chain_id != admin_chain_id)
925            .map(|(chain_id, stream_ids)| {
926                self.sync_publisher_chain_events(chain_id, stream_ids, &nodes, &committee)
927            })
928            .collect::<Vec<_>>();
929        stream::iter(tasks)
930            .buffer_unordered(self.options.max_joined_tasks)
931            .collect::<Vec<_>>()
932            .await
933            .into_iter()
934            .collect::<Result<Vec<_>, _>>()?;
935        Ok(())
936    }
937
938    /// Downloads only event-bearing blocks for the given publisher chain and streams.
939    ///
940    /// Uses `communicate_with_quorum` so that each validator is queried for
941    /// `previous_event_blocks` and the claimed blocks are downloaded from that same
942    /// validator. Waiting for a quorum guarantees that any confirmed event is discovered,
943    /// since at least one honest validator in the quorum must have it.
944    async fn sync_publisher_chain_events(
945        &self,
946        publisher_chain_id: ChainId,
947        stream_ids: BTreeSet<StreamId>,
948        nodes: &[RemoteNode<Env::ValidatorNode>],
949        committee: &Committee,
950    ) -> Result<(), Error> {
951        let stream_ids_ref = &stream_ids;
952        communicate_with_quorum(
953            nodes,
954            committee,
955            |_: &()| (),
956            |remote_node| async move {
957                self.client
958                    .sync_events_from_node(publisher_chain_id, stream_ids_ref, &remote_node)
959                    .await
960            },
961            self.options.quorum_grace_period,
962        )
963        .await?;
964        Ok(())
965    }
966
967    /// Attempts to download new received certificates.
968    ///
969    /// This is a best effort: it will only find certificates that have been confirmed
970    /// amongst sufficiently many validators of the current committee of the target
971    /// chain.
972    ///
973    /// However, this should be the case whenever a sender's chain is still in use and
974    /// is regularly upgraded to new committees.
975    #[instrument(level = "debug", skip(self), fields(chain_id = %self.chain_id))]
976    pub async fn find_received_certificates(&self) -> Result<(), Error> {
977        debug!("starting find_received_certificates");
978        #[cfg(with_metrics)]
979        let _latency = super::metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
980        // Use network information from the local chain.
981        let chain_id = self.chain_id;
982        let (_, committee) = self.admin_committee().await?;
983        let nodes = self.client.make_nodes(&committee)?;
984
985        let trackers = self
986            .client
987            .local_node
988            .get_received_certificate_trackers(chain_id)
989            .await?;
990
991        trace!("find_received_certificates: read trackers");
992
993        let received_log_batches = Arc::new(std::sync::Mutex::new(Vec::new()));
994        // Proceed to downloading received logs.
995        let result = communicate_with_quorum(
996            &nodes,
997            &committee,
998            |_| (),
999            |remote_node| {
1000                let client = &self.client;
1001                let tracker = trackers.get(&remote_node.public_key).copied().unwrap_or(0);
1002                let received_log_batches = Arc::clone(&received_log_batches);
1003                Box::pin(async move {
1004                    let batch = client
1005                        .get_received_log_from_validator(chain_id, &remote_node, tracker)
1006                        .await?;
1007                    let mut batches = received_log_batches.lock().unwrap();
1008                    batches.push((remote_node.public_key, batch));
1009                    Ok(())
1010                })
1011            },
1012            self.options.quorum_grace_period,
1013        )
1014        .await;
1015
1016        if let Err(error) = result {
1017            error!(
1018                %error,
1019                "Failed to synchronize received_logs from at least a quorum of validators",
1020            );
1021        }
1022
1023        let received_logs: Vec<_> = {
1024            let mut received_log_batches = received_log_batches.lock().unwrap();
1025            std::mem::take(received_log_batches.as_mut())
1026        };
1027
1028        debug!(
1029            received_logs_len = %received_logs.len(),
1030            received_logs_total = %received_logs.iter().map(|x| x.1.len()).sum::<usize>(),
1031            "collected received logs"
1032        );
1033
1034        let (received_logs, mut validator_trackers) = {
1035            (
1036                ReceivedLogs::from_received_result(received_logs.clone()),
1037                ValidatorTrackers::new(received_logs, &trackers),
1038            )
1039        };
1040
1041        debug!(
1042            num_chains = %received_logs.num_chains(),
1043            num_certs = %received_logs.num_certs(),
1044            "find_received_certificates: total number of chains and certificates to sync",
1045        );
1046
1047        let max_blocks_per_chain =
1048            self.options.sender_certificate_download_batch_size / self.options.max_joined_tasks * 2;
1049        for received_log in received_logs.into_batches(
1050            self.options.sender_certificate_download_batch_size,
1051            max_blocks_per_chain,
1052        ) {
1053            validator_trackers = self
1054                .receive_sender_certificates(received_log, validator_trackers, &nodes)
1055                .await?;
1056
1057            self.update_received_certificate_trackers(&validator_trackers)
1058                .await;
1059        }
1060
1061        info!("find_received_certificates finished");
1062
1063        Ok(())
1064    }
1065
1066    async fn update_received_certificate_trackers(&self, trackers: &ValidatorTrackers) {
1067        let updated_trackers = trackers.to_map();
1068        trace!(?updated_trackers, "updated tracker values");
1069
1070        // Update the trackers.
1071        if let Err(error) = self
1072            .client
1073            .local_node
1074            .update_received_certificate_trackers(self.chain_id, updated_trackers)
1075            .await
1076        {
1077            error!(
1078                chain_id = %self.chain_id,
1079                %error,
1080                "Failed to update the certificate trackers",
1081            );
1082        }
1083    }
1084
    /// Downloads and processes or preprocesses the certificates for blocks sending messages to
    /// this chain that we are still missing.
    ///
    /// Entries already covered by the local node's outbox heights are filtered out first. The
    /// remaining sender chains are downloaded concurrently (up to `max_joined_tasks` at a time),
    /// and every `ChainAndHeight` reported back over the channel is recorded in
    /// `validator_trackers`. Sender chains with nothing left to download get their pending
    /// cross-chain requests retried instead.
    ///
    /// Returns the updated `validator_trackers`.
    async fn receive_sender_certificates(
        &self,
        mut received_logs: ReceivedLogs,
        mut validator_trackers: ValidatorTrackers,
        nodes: &[RemoteNode<Env::ValidatorNode>],
    ) -> Result<ValidatorTrackers, Error> {
        debug!(
            num_chains = %received_logs.num_chains(),
            num_certs = %received_logs.num_certs(),
            "receive_sender_certificates: number of chains and certificates to sync",
        );

        // Obtain the next block height we need in the local node, for each chain.
        let local_next_heights = self
            .client
            .local_node
            .next_outbox_heights(received_logs.chains(), self.chain_id)
            .await?;

        validator_trackers.filter_out_already_known(&mut received_logs, &local_next_heights);

        debug!(
            remaining_total_certificates = %received_logs.num_certs(),
            "receive_sender_certificates: computed remote_heights"
        );

        // Sender chains with no heights left to download; handled after the downloads.
        let mut other_sender_chains = Vec::new();
        // Download tasks report each `ChainAndHeight` they obtained on this channel.
        let (sender, mut receiver) = mpsc::unbounded_channel::<ChainAndHeight>();

        // NOTE(review): the original `sender` is moved into this `move` closure, which lives
        // inside the download stream below. The receiver loop only ends once every sender has
        // been dropped, so this relies on `future::join` dropping the download future as soon
        // as it completes. Keep that in mind before restructuring this code.
        let cert_futures = received_logs.heights_per_chain().into_iter().filter_map({
            let received_logs = &received_logs;
            let other_sender_chains = &mut other_sender_chains;

            move |(sender_chain_id, remote_heights)| {
                if remote_heights.is_empty() {
                    // Our highest, locally executed block is higher than any block height
                    // from the current batch. Skip this batch, but remember to wait for
                    // the messages to be delivered to the inboxes.
                    other_sender_chains.push(sender_chain_id);
                    return None;
                };
                let remote_heights = remote_heights.into_iter().collect::<Vec<_>>();
                let sender = sender.clone();
                let client = self.client.clone();
                let nodes = nodes.to_vec();
                Some(async move {
                    client
                        .download_and_process_sender_chain(
                            sender_chain_id,
                            &nodes,
                            received_logs,
                            remote_heights,
                            sender,
                        )
                        .await
                })
            }
        });

        // Run the downloads and, concurrently, drain the channel into the trackers.
        future::join(
            stream::iter(cert_futures)
                .buffer_unordered(self.options.max_joined_tasks)
                .collect::<()>(),
            async {
                while let Some(chain_and_height) = receiver.recv().await {
                    validator_trackers.downloaded_cert(chain_and_height);
                }
            },
        )
        .await;

        debug!(
            num_other_chains = %other_sender_chains.len(),
            "receive_sender_certificates: processing certificates finished"
        );

        // Certificates for these chains were omitted from `certificates` because they were
        // already processed locally. If they were processed in a concurrent task, it is not
        // guaranteed that their cross-chain messages were already handled.
        self.retry_pending_cross_chain_requests_from_sender_chains(nodes, other_sender_chains)
            .await;

        debug!("receive_sender_certificates: finished processing other_sender_chains");

        Ok(validator_trackers)
    }
1173
1174    /// Retries cross chain requests on the chains which may have been processed on
1175    /// another task without the messages being correctly handled. Fetches missing blobs from
1176    /// the given nodes if necessary.
1177    async fn retry_pending_cross_chain_requests_from_sender_chains(
1178        &self,
1179        nodes: &[RemoteNode<Env::ValidatorNode>],
1180        other_sender_chains: Vec<ChainId>,
1181    ) {
1182        let stream = other_sender_chains
1183            .into_iter()
1184            .map(|chain_id| async move {
1185                if let Err(error) = match self
1186                    .client
1187                    .retry_pending_cross_chain_requests(chain_id)
1188                    .await
1189                {
1190                    Ok(()) => Ok(()),
1191                    Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
1192                        if let Err(error) = self
1193                            .client
1194                            .update_local_node_with_blobs_from(blob_ids.clone(), nodes)
1195                            .await
1196                        {
1197                            error!(
1198                                ?blob_ids,
1199                                %error,
1200                                "Error while attempting to download blobs during retrying outgoing \
1201                                messages"
1202                            );
1203                        }
1204                        self.client
1205                            .retry_pending_cross_chain_requests(chain_id)
1206                            .await
1207                    }
1208                    err => err,
1209                } {
1210                    error!(
1211                        %chain_id,
1212                        %error,
1213                        "Failed to retry outgoing messages from chain"
1214                    );
1215                }
1216            })
1217            .collect::<FuturesUnordered<_>>();
1218        stream.for_each(future::ready).await;
1219    }
1220
1221    /// Sends money.
1222    #[instrument(level = "trace")]
1223    pub async fn transfer(
1224        &self,
1225        owner: AccountOwner,
1226        amount: Amount,
1227        recipient: Account,
1228    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1229        // TODO(#467): check the balance of `owner` before signing any block proposal.
1230        self.execute_operation(SystemOperation::Transfer {
1231            owner,
1232            recipient,
1233            amount,
1234        })
1235        .await
1236    }
1237
1238    /// Verify if a data blob is readable from storage.
1239    // TODO(#2490): Consider removing or renaming this.
1240    #[instrument(level = "trace")]
1241    pub async fn read_data_blob(
1242        &self,
1243        hash: CryptoHash,
1244    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1245        let blob_id = BlobId {
1246            hash,
1247            blob_type: BlobType::Data,
1248        };
1249        self.execute_operation(SystemOperation::VerifyBlob { blob_id })
1250            .await
1251    }
1252
1253    /// Claims money in a remote chain.
1254    #[instrument(level = "trace")]
1255    pub async fn claim(
1256        &self,
1257        owner: AccountOwner,
1258        target_id: ChainId,
1259        recipient: Account,
1260        amount: Amount,
1261    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1262        self.execute_operation(SystemOperation::Claim {
1263            owner,
1264            target_id,
1265            recipient,
1266            amount,
1267        })
1268        .await
1269    }
1270
1271    /// Requests a leader timeout vote from all validators. If a quorum signs it, creates a
1272    /// certificate and sends it to all validators, to make them enter the next round.
1273    #[instrument(level = "trace")]
1274    pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, Error> {
1275        let chain_id = self.chain_id;
1276        let info = self.chain_info().await?;
1277        let committee = self.local_committee().await?;
1278        let height = info.next_block_height;
1279        let round = info.manager.current_round;
1280        let action = CommunicateAction::RequestTimeout {
1281            height,
1282            round,
1283            chain_id,
1284        };
1285        let value = Timeout::new(chain_id, height, info.epoch);
1286        let certificate = Box::new(
1287            self.client
1288                .communicate_chain_action(&committee, action, value)
1289                .await?,
1290        );
1291        self.client.handle_certificate(*certificate.clone()).await?;
1292        // The block height didn't increase, but this will communicate the timeout as well.
1293        self.client
1294            .communicate_chain_updates(
1295                &committee,
1296                chain_id,
1297                height,
1298                CrossChainMessageDelivery::NonBlocking,
1299                None,
1300            )
1301            .await?;
1302        Ok(*certificate)
1303    }
1304
1305    /// Downloads and processes any certificates we are missing for the given chain.
1306    #[instrument(level = "trace", skip_all)]
1307    pub async fn synchronize_chain_state(
1308        &self,
1309        chain_id: ChainId,
1310    ) -> Result<Box<ChainInfo>, Error> {
1311        self.client.synchronize_chain_state(chain_id).await
1312    }
1313
1314    /// Downloads and processes any certificates we are missing for this chain, from the given
1315    /// committee.
1316    #[instrument(level = "trace", skip_all)]
1317    pub async fn synchronize_chain_state_from_committee(
1318        &self,
1319        committee: Arc<Committee>,
1320    ) -> Result<Box<ChainInfo>, Error> {
1321        self.client
1322            .synchronize_chain_from_committee(self.chain_id, committee)
1323            .await
1324    }
1325
1326    /// Executes a list of operations.
1327    #[instrument(level = "trace", skip(operations, blobs))]
1328    pub async fn execute_operations(
1329        &self,
1330        operations: Vec<Operation>,
1331        blobs: Vec<Blob>,
1332    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1333        let timing_start = linera_base::time::Instant::now();
1334
1335        let result = loop {
1336            let execute_block_start = linera_base::time::Instant::now();
1337            // TODO(#2066): Remove boxing once the call-stack is shallower
1338            match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
1339                Ok(ClientOutcome::Committed(certificate)) => {
1340                    self.send_timing(execute_block_start, TimingType::ExecuteBlock);
1341                    break Ok(ClientOutcome::Committed(certificate));
1342                }
1343                Ok(ClientOutcome::WaitForTimeout(timeout)) => {
1344                    break Ok(ClientOutcome::WaitForTimeout(timeout));
1345                }
1346                Ok(ClientOutcome::Conflict(certificate)) => {
1347                    info!(
1348                        height = %certificate.block().header.height,
1349                        "Another block was committed."
1350                    );
1351                    break Ok(ClientOutcome::Conflict(certificate));
1352                }
1353                Err(Error::CommunicationError(CommunicationError::Trusted(
1354                    NodeError::UnexpectedBlockHeight {
1355                        expected_block_height,
1356                        found_block_height,
1357                    },
1358                ))) if expected_block_height > found_block_height => {
1359                    tracing::info!(
1360                        chain_id = %self.chain_id,
1361                        "Local state is outdated; synchronizing chain"
1362                    );
1363                    self.synchronize_chain_state(self.chain_id).await?;
1364                }
1365                Err(err) => return Err(err),
1366            };
1367        };
1368
1369        self.send_timing(timing_start, TimingType::ExecuteOperations);
1370
1371        result
1372    }
1373
1374    /// Executes an operation.
1375    pub async fn execute_operation(
1376        &self,
1377        operation: impl Into<Operation>,
1378    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1379        self.execute_operations(vec![operation.into()], vec![])
1380            .await
1381    }
1382
    /// Executes a new block.
    ///
    /// This must be preceded by a call to `prepare_chain()`.
    ///
    /// The proposal mutex is acquired at the start and held for the whole call. A pending
    /// proposal left over from an earlier call is processed first; if that commits a block,
    /// it is classified against `operations` and returned without staging a new block.
    /// Otherwise a block with `operations` (preceded by any epoch changes, incoming message
    /// bundles and stream updates) is staged and processed. It is re-staged whenever the
    /// pending proposal is cleared without our block being committed.
    ///
    /// # Errors
    ///
    /// Returns a `ChainError::EmptyBlock` (wrapped in `LocalNodeError::WorkerError`) if there
    /// are no transactions to propose. Errors from staging or processing the proposal are
    /// propagated.
    #[instrument(level = "trace", skip(operations, blobs))]
    async fn execute_block(
        &self,
        operations: Vec<Operation>,
        blobs: Vec<Blob>,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
        #[cfg(with_metrics)]
        let _latency = super::metrics::EXECUTE_BLOCK_LATENCY.measure_latency();

        let mutex = self.proposal_mutex();
        let lock_start = linera_base::time::Instant::now();
        // `proposal_guard` is never dropped explicitly, so the lock is held until we return.
        let mut proposal_guard = mutex.lock_owned().await;
        tracing::debug!(
            chain_id = %self.chain_id,
            lock_wait_ms = lock_start.elapsed().as_millis(),
            "acquired proposal_mutex in execute_block"
        );
        // TODO(#5092): We shouldn't need to call this explicitly.
        // Process any leftover pending proposal from a previous interrupted call.
        // Even if there is no pending proposal, this still calls
        // `request_leader_timeout_if_needed` which ensures the local chain state
        // is synchronized with the current consensus round.
        match self
            .process_pending_block_without_prepare(&mut proposal_guard)
            .await?
        {
            // A leftover proposal got committed: it may or may not be our request.
            ClientOutcome::Committed(Some(certificate)) => {
                return Ok(self.classify_committed(certificate, &operations));
            }
            ClientOutcome::WaitForTimeout(timeout) => {
                return Ok(ClientOutcome::WaitForTimeout(timeout))
            }
            ClientOutcome::Conflict(certificate) => {
                return Ok(ClientOutcome::Conflict(certificate))
            }
            // Nothing was committed; go on to stage our own block.
            ClientOutcome::Committed(None) => {}
        }

        loop {
            // Collect pending messages and epoch changes after acquiring the lock to avoid
            // race conditions where messages valid for one block height are proposed at a
            // different height. This is recomputed on every retry because the set of
            // receivable messages and epoch changes can differ at a new height.
            let transactions = self
                .prepend_epochs_messages_and_events(operations.clone())
                .await?;

            if transactions.is_empty() {
                return Err(Error::LocalNodeError(LocalNodeError::WorkerError(
                    WorkerError::ChainError(Box::new(ChainError::EmptyBlock)),
                )));
            }

            self.new_pending_block(transactions, blobs.clone(), &mut proposal_guard)
                .await?;

            match self
                .process_pending_block_without_prepare(&mut proposal_guard)
                .await?
            {
                ClientOutcome::Committed(Some(certificate)) => {
                    return Ok(self.classify_committed(certificate, &operations));
                }
                // `process_pending_block_without_prepare` cleared the pending proposal
                // without committing ours, so our operations were not applied. This happens
                // in two ways:
                //   * the chain advanced past the height we staged at while we were
                //     proposing (e.g. a notification or background sync committed another
                //     block at our height) — the common #5664 case; or
                //   * the staged proposal's authenticated signer's key is no longer held
                //     (the preferred owner changed since we staged), so the stale proposal
                //     was discarded.
                // Either way, re-stage and retry: `new_pending_block` recomputes the block
                // at the current height and signs as the current identity, so the first
                // case advances to the new height (genuine external progress) and the
                // second adopts a signer we hold, so the retry converges. See #5664.
                ClientOutcome::Committed(None) => {
                    tracing::debug!(
                        chain_id = %self.chain_id,
                        "pending proposal cleared without committing ours; re-staging and retrying"
                    );
                    continue;
                }
                ClientOutcome::WaitForTimeout(timeout) => {
                    return Ok(ClientOutcome::WaitForTimeout(timeout));
                }
                ClientOutcome::Conflict(certificate) => {
                    return Ok(ClientOutcome::Conflict(certificate));
                }
            }
        }
    }
1478
1479    /// Returns `Committed` if the committed block reflects our request — same
1480    /// authenticated owner, and our `operations`. All other transactions must be ones that
1481    /// are added automatically, to process messages, streams or new epochs.
1482    fn classify_committed(
1483        &self,
1484        certificate: ConfirmedBlockCertificate,
1485        operations: &[Operation],
1486    ) -> ClientOutcome<ConfirmedBlockCertificate> {
1487        let block = certificate.block();
1488        if self.preferred_owner.is_none()
1489            || block.header.authenticated_owner != self.preferred_owner
1490        {
1491            return ClientOutcome::Conflict(Box::new(certificate));
1492        }
1493        let mut operations_iter = operations.iter().peekable();
1494        for tx in &block.body.transactions {
1495            let is_expected = match tx {
1496                Transaction::ReceiveMessages(_) => true,
1497                Transaction::ExecuteOperation(op) if Some(&op) == operations_iter.peek() => {
1498                    operations_iter.next();
1499                    true
1500                }
1501                Transaction::ExecuteOperation(Operation::System(op)) => matches!(
1502                    **op,
1503                    SystemOperation::ProcessNewEpoch(_) | SystemOperation::UpdateStream { .. }
1504                ),
1505                Transaction::ExecuteOperation(Operation::User { .. }) => false,
1506            };
1507            if !is_expected {
1508                return ClientOutcome::Conflict(Box::new(certificate));
1509            }
1510        }
1511        if operations_iter.next().is_some() {
1512            ClientOutcome::Conflict(Box::new(certificate))
1513        } else {
1514            ClientOutcome::Committed(certificate)
1515        }
1516    }
1517
1518    /// Creates a vector of transactions which, in addition to the provided operations,
1519    /// also contains epoch changes, receiving message bundles and event stream updates
1520    /// (if there are any to be processed).
1521    /// This should be called when executing a block, in order to make sure that any pending
1522    /// messages or events are included in it.
1523    #[instrument(level = "trace", skip(operations))]
1524    async fn prepend_epochs_messages_and_events(
1525        &self,
1526        operations: Vec<Operation>,
1527    ) -> Result<Vec<Transaction>, Error> {
1528        let incoming_bundles = self.pending_message_bundles().await?;
1529        let stream_updates = self.collect_stream_updates().await?;
1530        Ok(self
1531            .collect_epoch_changes()
1532            .await?
1533            .into_iter()
1534            .map(Transaction::ExecuteOperation)
1535            .chain(
1536                incoming_bundles
1537                    .into_iter()
1538                    .map(Transaction::ReceiveMessages),
1539            )
1540            .chain(
1541                stream_updates
1542                    .into_iter()
1543                    .map(Transaction::ExecuteOperation),
1544            )
1545            .chain(operations.into_iter().map(Transaction::ExecuteOperation))
1546            .collect::<Vec<_>>())
1547    }
1548
1549    /// Creates a new pending block and stores it in `proposal_guard`.
1550    ///
1551    /// The caller must hold the proposal mutex. The pending proposal is written directly
1552    /// into the guard so that it is always synchronized with the mutex.
1553    #[instrument(level = "trace", skip(transactions, blobs, proposal_guard))]
1554    async fn new_pending_block(
1555        &self,
1556        transactions: Vec<Transaction>,
1557        blobs: Vec<Blob>,
1558        proposal_guard: &mut Option<PendingProposal>,
1559    ) -> Result<Block, Error> {
1560        let identity = self.identity().await?;
1561
1562        ensure!(
1563            proposal_guard.is_none(),
1564            Error::BlockProposalError(
1565                "Client state already has a pending block; \
1566                use the `linera retry-pending-block` command to commit that first"
1567            )
1568        );
1569        let info = self.chain_info().await?;
1570        let timestamp = self.next_timestamp(&transactions, info.timestamp);
1571        let proposed_block = ProposedBlock {
1572            epoch: info.epoch,
1573            chain_id: self.chain_id,
1574            transactions,
1575            previous_block_hash: info.block_hash,
1576            height: info.next_block_height,
1577            authenticated_owner: Some(identity),
1578            timestamp,
1579        };
1580
1581        let round = self.round_for_oracle(&info, &identity).await?;
1582        // Make sure every incoming message succeeds and otherwise remove them.
1583        // Also, compute the final certified hash while we're at it.
1584        let (block, _, never_reject_origins) = self
1585            .client
1586            .stage_block_execution(
1587                proposed_block,
1588                round,
1589                blobs.clone(),
1590                self.options.bundle_execution_policy(),
1591            )
1592            .await?;
1593        // Record origins whose bundles were discarded due to the never-reject policy so
1594        // that `process_inbox` stops retrying them until the client is restarted.
1595        if !never_reject_origins.is_empty() {
1596            let skipped = self.skipped_origins.pin();
1597            for origin in never_reject_origins {
1598                skipped.insert(origin);
1599            }
1600        }
1601        let (proposed_block, auto_retry_outcome) = block.clone().into_proposal();
1602        *proposal_guard = Some(PendingProposal {
1603            block: proposed_block,
1604            blobs,
1605            auto_retry_outcome: Some(auto_retry_outcome),
1606            round: None,
1607        });
1608        Ok(block)
1609    }
1610
1611    /// Returns a suitable timestamp for the next block.
1612    ///
1613    /// This will usually be the current time according to the local clock, but may be slightly
1614    /// ahead to make sure it's not earlier than the incoming messages or the previous block.
1615    #[instrument(level = "trace", skip(transactions))]
1616    fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
1617        let local_time = self.storage_client().clock().current_time();
1618        transactions
1619            .iter()
1620            .filter_map(Transaction::incoming_bundle)
1621            .map(|msg| msg.bundle.timestamp)
1622            .max()
1623            .map_or(local_time, |timestamp| timestamp.max(local_time))
1624            .max(block_time)
1625    }
1626
1627    /// Queries an application.
1628    #[instrument(level = "trace", skip(query))]
1629    pub async fn query_application(
1630        &self,
1631        query: Query,
1632        block_hash: Option<CryptoHash>,
1633    ) -> Result<(QueryOutcome, BlockHeight), Error> {
1634        let mut downloaded_blobs = HashSet::<BlobId>::new();
1635        let mut events = super::EventSetDownloader::new(&self.client);
1636        loop {
1637            let result = self
1638                .client
1639                .local_node
1640                .query_application(self.chain_id, query.clone(), block_hash)
1641                .await;
1642            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1643                let new_blobs = super::filter_new(blob_ids, &downloaded_blobs);
1644                if !new_blobs.is_empty() {
1645                    let validators = self.client.validator_nodes().await?;
1646                    self.client
1647                        .update_local_node_with_blobs_from(new_blobs.clone(), &validators)
1648                        .await?;
1649                    downloaded_blobs.extend(new_blobs);
1650                    continue;
1651                }
1652            }
1653            if let Err(LocalNodeError::EventsNotFound(event_ids)) = &result {
1654                if events.download_new(event_ids).await? {
1655                    continue;
1656                }
1657            }
1658            return Ok(result?);
1659        }
1660    }
1661
1662    /// Queries a system application.
1663    #[cfg(with_testing)]
1664    #[instrument(level = "trace", skip(query))]
1665    pub async fn query_system_application(
1666        &self,
1667        query: linera_execution::SystemQuery,
1668    ) -> Result<QueryOutcome<linera_execution::SystemResponse>, Error> {
1669        let (
1670            QueryOutcome {
1671                response,
1672                operations,
1673            },
1674            _,
1675        ) = self.query_application(Query::System(query), None).await?;
1676        match response {
1677            linera_execution::QueryResponse::System(response) => Ok(QueryOutcome {
1678                response,
1679                operations,
1680            }),
1681            _ => Err(Error::InternalError("Unexpected response for system query")),
1682        }
1683    }
1684
1685    /// Queries a user application.
1686    #[instrument(level = "trace", skip(application_id, query))]
1687    #[cfg(with_testing)]
1688    pub async fn query_user_application<A: Abi>(
1689        &self,
1690        application_id: ApplicationId<A>,
1691        query: &A::Query,
1692    ) -> Result<QueryOutcome<A::QueryResponse>, Error> {
1693        let query = Query::user(application_id, query)?;
1694        let (
1695            QueryOutcome {
1696                response,
1697                operations,
1698            },
1699            _,
1700        ) = self.query_application(query, None).await?;
1701        match response {
1702            linera_execution::QueryResponse::User(response_bytes) => {
1703                let response = serde_json::from_slice(&response_bytes)?;
1704                Ok(QueryOutcome {
1705                    response,
1706                    operations,
1707                })
1708            }
1709            _ => Err(Error::InternalError("Unexpected response for user query")),
1710        }
1711    }
1712
1713    /// Obtains the local balance of the chain account after staging the execution of
1714    /// incoming messages in a new block.
1715    ///
1716    /// Does not attempt to synchronize with validators. The result will reflect up to
1717    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
1718    /// block.
1719    #[instrument(level = "trace")]
1720    pub async fn query_balance(&self) -> Result<Amount, Error> {
1721        let (balance, _) = self.query_balances_with_owner(AccountOwner::CHAIN).await?;
1722        Ok(balance)
1723    }
1724
1725    /// Obtains the local balance of an account after staging the execution of incoming messages in
1726    /// a new block.
1727    ///
1728    /// Does not attempt to synchronize with validators. The result will reflect up to
1729    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
1730    /// block.
1731    #[instrument(level = "trace", skip(owner))]
1732    pub async fn query_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1733        if owner.is_chain() {
1734            self.query_balance().await
1735        } else {
1736            Ok(self
1737                .query_balances_with_owner(owner)
1738                .await?
1739                .1
1740                .unwrap_or(Amount::ZERO))
1741        }
1742    }
1743
1744    /// Obtains the local balance of an account and optionally another user after staging the
1745    /// execution of incoming messages in a new block.
1746    ///
1747    /// Does not attempt to synchronize with validators. The result will reflect up to
1748    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
1749    /// block.
1750    #[instrument(level = "trace", skip(owner))]
1751    pub(crate) async fn query_balances_with_owner(
1752        &self,
1753        owner: AccountOwner,
1754    ) -> Result<(Amount, Option<Amount>), Error> {
1755        let incoming_bundles = self.pending_message_bundles().await?;
1756        // Since we disallow empty blocks, and there is no incoming messages,
1757        // that could change it, we query for the balance immediately.
1758        if incoming_bundles.is_empty() {
1759            let chain_balance = self.local_balance().await?;
1760            let owner_balance = self.local_owner_balance(owner).await?;
1761            return Ok((chain_balance, Some(owner_balance)));
1762        }
1763        let info = self.chain_info().await?;
1764        let transactions = incoming_bundles
1765            .into_iter()
1766            .map(Transaction::ReceiveMessages)
1767            .collect::<Vec<_>>();
1768        let timestamp = self.next_timestamp(&transactions, info.timestamp);
1769        let block = ProposedBlock {
1770            epoch: info.epoch,
1771            chain_id: self.chain_id,
1772            transactions,
1773            previous_block_hash: info.block_hash,
1774            height: info.next_block_height,
1775            authenticated_owner: if owner == AccountOwner::CHAIN {
1776                None
1777            } else {
1778                Some(owner)
1779            },
1780            timestamp,
1781        };
1782        match self
1783            .client
1784            .stage_block_execution(
1785                block,
1786                None,
1787                Vec::new(),
1788                self.options.bundle_execution_policy(),
1789            )
1790            .await
1791        {
1792            Ok((_, response, _)) => Ok((
1793                response.info.chain_balance,
1794                response.info.requested_owner_balance,
1795            )),
1796            Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
1797                error,
1798            )))) if matches!(
1799                &*error,
1800                ChainError::ExecutionError(
1801                    execution_error,
1802                    ChainExecutionContext::Block
1803                ) if matches!(
1804                    **execution_error,
1805                    ExecutionError::FeesExceedFunding { .. }
1806                )
1807            ) =>
1808            {
1809                // We can't even pay for the execution of one empty block. Let's return zero.
1810                Ok((Amount::ZERO, Some(Amount::ZERO)))
1811            }
1812            Err(error) => Err(error),
1813        }
1814    }
1815
1816    /// Reads the local balance of the chain account.
1817    ///
1818    /// Does not process the inbox or attempt to synchronize with validators.
1819    #[instrument(level = "trace")]
1820    pub async fn local_balance(&self) -> Result<Amount, Error> {
1821        let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
1822        Ok(balance)
1823    }
1824
1825    /// Reads the local balance of a user account.
1826    ///
1827    /// Does not process the inbox or attempt to synchronize with validators.
1828    #[instrument(level = "trace", skip(owner))]
1829    pub async fn local_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1830        if owner.is_chain() {
1831            self.local_balance().await
1832        } else {
1833            Ok(self
1834                .local_balances_with_owner(owner)
1835                .await?
1836                .1
1837                .unwrap_or(Amount::ZERO))
1838        }
1839    }
1840
1841    /// Reads the local balance of the chain account and optionally another user.
1842    ///
1843    /// Does not process the inbox or attempt to synchronize with validators.
1844    #[instrument(level = "trace", skip(owner))]
1845    pub(crate) async fn local_balances_with_owner(
1846        &self,
1847        owner: AccountOwner,
1848    ) -> Result<(Amount, Option<Amount>), Error> {
1849        ensure!(
1850            self.chain_info().await?.next_block_height >= self.initial_next_block_height,
1851            Error::WalletSynchronizationError
1852        );
1853        let mut query = ChainInfoQuery::new(self.chain_id);
1854        query.request_owner_balance = owner;
1855        let response = self
1856            .client
1857            .local_node
1858            .handle_chain_info_query(query)
1859            .await?;
1860        Ok((
1861            response.info.chain_balance,
1862            response.info.requested_owner_balance,
1863        ))
1864    }
1865
1866    /// Sends tokens to a chain.
1867    #[instrument(level = "trace")]
1868    pub async fn transfer_to_account(
1869        &self,
1870        from: AccountOwner,
1871        amount: Amount,
1872        account: Account,
1873    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1874        self.transfer(from, amount, account).await
1875    }
1876
1877    /// Burns tokens (transfer to a special address).
1878    #[cfg(with_testing)]
1879    #[instrument(level = "trace")]
1880    pub async fn burn(
1881        &self,
1882        owner: AccountOwner,
1883        amount: Amount,
1884    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1885        let recipient = Account::burn_address(self.chain_id);
1886        self.transfer(owner, amount, recipient).await
1887    }
1888
1889    #[instrument(level = "trace")]
1890    pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, Error> {
1891        let validators = self.client.validator_nodes().await?;
1892        self.client
1893            .fetch_chain_info(self.chain_id, &validators)
1894            .await
1895    }
1896
1897    /// Attempts to synchronize chains that have sent us messages and populate our local
1898    /// inbox.
1899    ///
1900    /// To create a block that actually executes the messages in the inbox,
1901    /// `process_inbox` must be called separately.
1902    ///
1903    /// If the chain is in follow-only mode, this only downloads blocks for this chain without
1904    /// fetching manager values or sender/publisher chains.
1905    /// Synchronizes the chain state from validators, optionally stopping at a given
1906    /// block height or block timestamp.
1907    ///
1908    /// - If `next_height` is `Some`, downloads blocks up to (but not including) that height.
1909    /// - If `until_block_time` is `Some`, downloads blocks up to (but not including) the
1910    ///   first block with timestamp >= the given value.
1911    #[instrument(level = "trace")]
1912    pub async fn synchronize_up_to(
1913        &self,
1914        next_height: Option<BlockHeight>,
1915        until_block_time: Option<Timestamp>,
1916    ) -> Result<Box<ChainInfo>, Error> {
1917        let (_, committee) = self.client.admin_committee().await?;
1918        let validators = self.client.make_nodes(&committee)?;
1919        Box::pin(self.client.fetch_chain_info(self.chain_id, &validators)).await?;
1920        communicate_with_quorum(
1921            &validators,
1922            &committee,
1923            |_: &()| (),
1924            |remote_node| async move {
1925                self.client
1926                    .download_certificates_from(
1927                        &remote_node,
1928                        self.chain_id,
1929                        next_height.unwrap_or(BlockHeight::MAX),
1930                        until_block_time,
1931                    )
1932                    .await?;
1933                Ok(())
1934            },
1935            self.client.options.quorum_grace_period,
1936        )
1937        .await?;
1938        self.client
1939            .local_node
1940            .chain_info(self.chain_id)
1941            .await
1942            .map_err(Into::into)
1943    }
1944
1945    pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, Error> {
1946        if self.is_follow_only() {
1947            return self.client.synchronize_chain_state(self.chain_id).await;
1948        }
1949        let info = self.prepare_chain().await?;
1950        self.synchronize_publisher_chains().await?;
1951        self.find_received_certificates().await?;
1952        Ok(info)
1953    }
1954
1955    /// Processes the last pending block.
1956    #[instrument(level = "trace")]
1957    pub async fn process_pending_block(
1958        &self,
1959    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1960        self.prepare_chain().await?;
1961        let mutex = self.proposal_mutex();
1962        let mut proposal_guard = mutex.lock_owned().await;
1963        self.process_pending_block_without_prepare(&mut proposal_guard)
1964            .await
1965    }
1966
    /// Processes the last pending block. Assumes that the local chain is up to date.
    ///
    /// The caller must hold the proposal mutex via `proposal_guard`. The pending proposal
    /// is read from and cleared through the guard, ensuring synchronization.
    ///
    /// On error, compares the chain manager's snapshot just before the failing network
    /// call against the current state. If a per-validator updater absorbed new info
    /// (e.g. a locking block we didn't have) while we were calling, retries with the
    /// refreshed state. The snapshot is updated after our own local proposal handling
    /// so that doesn't count as "new info from a validator".
    ///
    /// If the state did not change, falls back at most once per call to an explicit
    /// `synchronize_chain_state`. It retries only if that sync changed the snapshot;
    /// otherwise the original error is returned.
    ///
    /// Failures while reading a snapshot or during the fallback sync are not surfaced. The
    /// original error is returned instead, and a failed fallback sync is logged.
    ///
    /// Retrying terminates without an explicit bound: each retry corresponds to local
    /// consensus state that actually advanced (verified by the local node, which checks
    /// signatures and quorums), and that state is monotone.
    #[instrument(level = "debug", skip(self, proposal_guard), fields(chain_id = %self.chain_id))]
    async fn process_pending_block_without_prepare(
        &self,
        proposal_guard: &mut Option<PendingProposal>,
    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
        // The fallback sync below is done at most once, so a genuinely stuck proposal can't loop.
        let mut did_fallback_sync = false;
        loop {
            // `process_pending_block_inner` records the baseline snapshot itself, after
            // the initial timeout request, so that absorbing a timeout certificate isn't
            // counted as new info. It stays `None` only if the call fails before that
            // point, in which case there is no baseline to retry against.
            let mut snapshot = None;
            let err = match self
                .process_pending_block_inner(proposal_guard, &mut snapshot)
                .await
            {
                Ok(outcome) => return Ok(outcome),
                Err(err) => err,
            };
            // No baseline: nothing to compare against, so propagate the original error.
            let Some(snapshot) = snapshot else {
                return Err(err);
            };
            // If the current state can't be read, we can't tell whether it advanced.
            let Ok(current) = self.consensus_state_snapshot().await else {
                return Err(err);
            };
            if current == snapshot {
                // The lazy per-validator pull on rejection (`Updater::send_block_proposal`) can be
                // raced out: `communicate_with_quorum` breaks as soon as a quorum is impossible and
                // drops the still-in-flight `synchronize_chain_state_from` calls, so a locking block
                // held only by the slower-to-respond validators is never absorbed. Fall back once to
                // an explicit quorum sync — guaranteed to reach a lock-holder — and retry if it
                // absorbed anything; otherwise the rejection was genuine, so propagate it.
                //
                // NOTE(review): this branch runs for any error with an unchanged snapshot,
                // not only validator rejections; `err` is not inspected here.
                //
                // TODO(#6453): this fallback path has no deterministic regression test yet — forcing
                // the lazy pull to miss needs a clock-driven per-validator response delay in the test
                // harness (building on #6448); until then it is only covered indirectly by the (now
                // non-flaky) `test_lazy_pull_absorbs_locking_block_on_proposal_rejection`.
                if did_fallback_sync {
                    return Err(err);
                }
                did_fallback_sync = true;
                if let Err(error) = self.synchronize_chain_state(self.chain_id).await {
                    tracing::error!(%error, "fallback sync failed after rejected proposal");
                    return Err(err);
                }
                let Ok(after_sync) = self.consensus_state_snapshot().await else {
                    return Err(err);
                };
                // The sync brought in nothing new: the failure was genuine.
                if after_sync == snapshot {
                    return Err(err);
                }
                tracing::debug!(
                    chain_id = %self.chain_id,
                    ?snapshot,
                    ?after_sync,
                    %err,
                    "fallback sync absorbed new consensus state after rejected proposal; retrying"
                );
                continue;
            }
            // The state advanced while we were proposing (e.g. an updater absorbed a
            // locking block), so loop and retry with the refreshed state.
            tracing::debug!(
                chain_id = %self.chain_id,
                ?snapshot,
                ?current,
                %err,
                "local consensus state advanced during process_pending_block; retrying"
            );
        }
    }
2051
2052    async fn process_pending_block_inner(
2053        &self,
2054        proposal_guard: &mut Option<PendingProposal>,
2055        snapshot: &mut Option<ConsensusStateSnapshot>,
2056    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
2057        let process_start = linera_base::time::Instant::now();
2058        tracing::debug!("process_pending_block_without_prepare started");
2059        let info = self.request_leader_timeout_if_needed().await?;
2060        // After the timeout request (which may itself absorb a timeout cert from
2061        // validators), bake the resulting state into the snapshot. The rest of this
2062        // iteration will propose in the round the timeout produced, so a state diff
2063        // from just that absorption isn't a retry signal.
2064        *snapshot = Some(self.consensus_state_snapshot().await?);
2065
2066        // Clear stale pending proposals whose height has already been committed.
2067        if let Some(pending) = &*proposal_guard {
2068            if pending.block.height < info.next_block_height {
2069                tracing::debug!(
2070                    "Clearing pending proposal: a block was committed at height {}",
2071                    pending.block.height
2072                );
2073                *proposal_guard = None;
2074            }
2075        }
2076
2077        // If there is a validated block in the current round, finalize it.
2078        if info.manager.has_locking_block_in_current_round()
2079            && !info.manager.current_round.is_fast()
2080        {
2081            return self.finalize_locking_block(info).await;
2082        }
2083        let identity = self.identity().await?;
2084
2085        let local_node = &self.client.local_node;
2086        // Otherwise we have to re-propose the highest validated block, if there is one.
2087        let (block, blobs, owner) = if let Some(locking) = &info.manager.requested_locking {
2088            let (block, blobs) = match &**locking {
2089                LockingBlock::Regular(certificate) => {
2090                    let blob_ids = certificate.block().required_blob_ids();
2091                    let blobs = local_node
2092                        .get_locking_blobs(&blob_ids, self.chain_id)
2093                        .await?
2094                        .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
2095                    debug!("Retrying locking block from round {}", certificate.round);
2096                    (certificate.block().clone(), blobs)
2097                }
2098                LockingBlock::Fast(proposal) => {
2099                    let proposed_block = proposal.content.block.clone();
2100                    let blob_ids = proposed_block.published_blob_ids();
2101                    let blobs = local_node
2102                        .get_locking_blobs(&blob_ids, self.chain_id)
2103                        .await?
2104                        .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
2105                    let (block, _, _) = self
2106                        .client
2107                        .stage_block_execution(
2108                            proposed_block,
2109                            None,
2110                            blobs.clone(),
2111                            BundleExecutionPolicy::committed(),
2112                        )
2113                        .await?;
2114                    debug!("Retrying locking block from fast round.");
2115                    (block, blobs)
2116                }
2117            };
2118            (block, blobs, identity)
2119        } else if let Some(pending) = proposal_guard.as_ref() {
2120            // Otherwise we are free to propose our own pending block. Sign it as the owner
2121            // that staged it: the block's operations are authenticated by that owner, and
2122            // validators reject the proposal with `WorkerError::InvalidSigner` if we re-sign
2123            // it as someone else (which would happen if `preferred_owner` changed since the
2124            // block was staged). The signer is global to the client, so we can sign as the
2125            // original author as long as we still hold their key.
2126            let owner = match pending.block.authenticated_owner {
2127                Some(staged_owner) if staged_owner != identity => {
2128                    if !self.has_key_for(&staged_owner).await? {
2129                        // If a fast-round proposal was already submitted, we can't safely
2130                        // drop it and propose a conflicting fast block: fast rounds skip
2131                        // the validation step and rely on the super owner not forking
2132                        // itself, so a second proposal could split votes between f+1 and
2133                        // 2f and wedge the round until it times out. Surface an error so
2134                        // the caller can recover the key or wait for the timeout.
2135                        if pending.round.is_some_and(|round| round.is_fast()) {
2136                            return Err(Error::BlockProposalError(
2137                                "pending fast block was signed by an owner whose key is no \
2138                                 longer available; recover the key or wait for the round to \
2139                                 time out before retrying",
2140                            ));
2141                        }
2142                        warn!(
2143                            ?staged_owner, %identity,
2144                            "Discarding pending block: no signer key for its authenticated owner",
2145                        );
2146                        *proposal_guard = None;
2147                        return Ok(ClientOutcome::Committed(None));
2148                    }
2149                    staged_owner
2150                }
2151                _ => identity,
2152            };
2153            let proposed_block = pending.block.clone();
2154            let blobs = pending.blobs.clone();
2155            let staging_outcome = pending.auto_retry_outcome.as_ref();
2156            let round = self.round_for_oracle(&info, &owner).await?;
2157            let (block, _, _) = self
2158                .client
2159                .stage_block_execution(
2160                    proposed_block,
2161                    round,
2162                    blobs.clone(),
2163                    BundleExecutionPolicy::committed(),
2164                )
2165                .await?;
2166            // Sanity check: the committed execution should produce the same outcome
2167            // as the initial AutoRetry execution. A mismatch indicates a divergence
2168            // between the two execution paths.
2169            if let Some(staging_outcome) = staging_outcome {
2170                ensure!(
2171                    block.outcome_matches(staging_outcome),
2172                    Error::ExecutionOutcomeMismatch
2173                );
2174            }
2175            debug!("Proposing the local pending block.");
2176            (block, blobs, owner)
2177        } else {
2178            return Ok(ClientOutcome::Committed(None)); // Nothing to do.
2179        };
2180
2181        let has_oracle_responses = block.has_oracle_responses();
2182        let (proposed_block, outcome) = block.into_proposal();
2183        let round = match self
2184            .round_for_new_proposal(&info, &owner, has_oracle_responses)
2185            .await?
2186        {
2187            Either::Left(round) => round,
2188            Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
2189        };
2190        debug!("Proposing block for round {}", round);
2191        if let Some(pending) = proposal_guard.as_mut() {
2192            pending.round.get_or_insert(round);
2193        }
2194
2195        let already_handled_locally = info
2196            .manager
2197            .already_handled_proposal(round, &proposed_block);
2198        // Create the final block proposal.
2199        let proposal = if let Some(locking) = info.manager.requested_locking {
2200            Box::new(match *locking {
2201                LockingBlock::Regular(cert) => {
2202                    BlockProposal::new_retry_regular(owner, round, cert, self.signer())
2203                        .await
2204                        .map_err(Error::signer_failure)?
2205                }
2206                LockingBlock::Fast(proposal) => {
2207                    BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
2208                        .await
2209                        .map_err(Error::signer_failure)?
2210                }
2211            })
2212        } else {
2213            Box::new(
2214                BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
2215                    .await
2216                    .map_err(Error::signer_failure)?,
2217            )
2218        };
2219        if !already_handled_locally {
2220            // Check the final block proposal. This will be cheaper after #1401.
2221            if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
2222                match err {
2223                    LocalNodeError::BlobsNotFound(_) => {
2224                        local_node
2225                            .handle_pending_blobs(self.chain_id, blobs)
2226                            .await?;
2227                        local_node.handle_block_proposal(*proposal.clone()).await?;
2228                    }
2229                    err => return Err(err.into()),
2230                }
2231            }
2232        }
2233        // The local handle of our own proposal can set a Fast lock, advance
2234        // `proposed`/`signed_proposal`, and thereby bump `current_round`. None of that
2235        // is "new info from a validator", so fold it into the snapshot before the
2236        // network calls below.
2237        *snapshot = Some(self.consensus_state_snapshot().await?);
2238        let committee = self.local_committee().await?;
2239        let block = Block::new(proposed_block, outcome);
2240        // Send the query to validators.
2241        let submit_block_proposal_start = linera_base::time::Instant::now();
2242        let certificate = if round.is_fast() {
2243            let hashed_value = ConfirmedBlock::new(block);
2244            self.client
2245                .submit_block_proposal(committee.clone(), proposal, hashed_value)
2246                .await?
2247        } else {
2248            let hashed_value = ValidatedBlock::new(block);
2249            let certificate = self
2250                .client
2251                .submit_block_proposal(committee.clone(), proposal, hashed_value.clone())
2252                .await?;
2253            self.client.finalize_block(&committee, certificate).await?
2254        };
2255        self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
2256        tracing::debug!(
2257            total_process_ms = process_start.elapsed().as_millis(),
2258            "process_pending_block_without_prepare completing"
2259        );
2260        debug!(round = %certificate.round, "Sending confirmed block to validators");
2261        let certificate = self.client.storage_client().cache_certificate(certificate);
2262        self.update_validators(Some(&committee), Some(certificate.clone()))
2263            .await?;
2264        // Clear the pending proposal now that the block has been committed.
2265        *proposal_guard = None;
2266        Ok(ClientOutcome::Committed(Some(CacheArc::unwrap_or_clone(
2267            certificate,
2268        ))))
2269    }
2270
2271    #[expect(
2272        clippy::cast_possible_truncation,
2273        reason = "elapsed millis fits in u64 for any realistic measurement window"
2274    )]
2275    fn send_timing(&self, start: Instant, timing_type: TimingType) {
2276        let Some(sender) = &self.timing_sender else {
2277            return;
2278        };
2279        if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
2280            tracing::warn!(%err, "Failed to send timing info");
2281        }
2282    }
2283
2284    /// Requests a leader timeout certificate if the current round has timed out. Returns the
2285    /// chain info for the (possibly new) current round.
2286    async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, Error> {
2287        let mut info = self.chain_info_with_manager_values().await?;
2288        // If the current round has timed out, we request a timeout certificate and retry in
2289        // the next round.
2290        if let Some(round_timeout) = info.manager.round_timeout {
2291            if round_timeout <= self.storage_client().clock().current_time() {
2292                if let Err(e) = self.request_leader_timeout().await {
2293                    debug!("Failed to obtain a timeout certificate: {}", e);
2294                } else {
2295                    info = self.chain_info_with_manager_values().await?;
2296                }
2297            }
2298        }
2299        Ok(info)
2300    }
2301
2302    /// Finalizes the locking block.
2303    ///
2304    /// Panics if there is no locking block; fails if the locking block is not in the current round.
2305    async fn finalize_locking_block(
2306        &self,
2307        info: Box<ChainInfo>,
2308    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
2309        let locking = info
2310            .manager
2311            .requested_locking
2312            .expect("Should have a locking block");
2313        let LockingBlock::Regular(certificate) = *locking else {
2314            panic!("Should have a locking validated block");
2315        };
2316        debug!(
2317            round = %certificate.round,
2318            "Finalizing locking block"
2319        );
2320        let committee = self.local_committee().await?;
2321        let certificate = self
2322            .client
2323            .finalize_block(&committee, certificate.clone())
2324            .await?;
2325        let certificate = self.client.storage_client().cache_certificate(certificate);
2326        self.update_validators(Some(&committee), Some(certificate.clone()))
2327            .await?;
2328        Ok(ClientOutcome::Committed(Some(CacheArc::unwrap_or_clone(
2329            certificate,
2330        ))))
2331    }
2332
2333    /// Returns the number for the round number oracle to use when staging a block proposal.
2334    async fn round_for_oracle(
2335        &self,
2336        info: &ChainInfo,
2337        identity: &AccountOwner,
2338    ) -> Result<Option<u32>, Error> {
2339        // Pretend we do use oracles: If we don't, the round number is never read anyway.
2340        match self.round_for_new_proposal(info, identity, true).await {
2341            // If it is a multi-leader round, use its number for the oracle.
2342            Ok(Either::Left(round)) => Ok(round.multi_leader()),
2343            // If there is no suitable round with oracles, use None: If it works without oracles,
2344            // the block won't read the value. If it returns a timeout, it will be a single-leader
2345            // round, in which the oracle returns None.
2346            Err(Error::BlockProposalError(_)) | Ok(Either::Right(_)) => Ok(None),
2347            Err(err) => Err(err),
2348        }
2349    }
2350
    /// Returns a round in which `identity` can propose a new block, if possible.
    ///
    /// `has_oracle_responses` must be `true` if the block to be proposed uses oracles:
    /// such blocks are never proposed in the fast round.
    ///
    /// Returns:
    /// * `Either::Left(round)` if `identity` may propose in `round` right away;
    /// * `Either::Right(timeout)` if the caller should wait until `timeout` and retry,
    ///   either for a round timeout or because of the multi-leader jitter delay;
    /// * `Error::BlockProposalError` if there is a conflicting proposal or `identity` is
    ///   not a leader, and `info` has no round timeout to wait for.
    async fn round_for_new_proposal(
        &self,
        info: &ChainInfo,
        identity: &AccountOwner,
        has_oracle_responses: bool,
    ) -> Result<Either<Round, RoundTimeout>, Error> {
        let manager = &info.manager;
        let seed = manager.seed;
        // If there is a conflicting proposal in the current round, we can only propose if the
        // next round can be started without a timeout, i.e. if we are in a multi-leader round.
        // Similarly, we cannot propose a block that uses oracles in the fast round, and also
        // skip the fast round if fast blocks are not allowed.
        let skip_fast = manager.current_round.is_fast()
            && (has_oracle_responses || !self.options.allow_fast_blocks);
        // A signed or proposed proposal already known for the current round is a conflict.
        let conflict = manager
            .requested_signed_proposal
            .as_ref()
            .into_iter()
            .chain(&manager.requested_proposed)
            .any(|proposal| proposal.content.round == manager.current_round)
            || skip_fast;
        let round = if !conflict {
            manager.current_round
        } else if let Some(round) = manager
            .ownership
            .next_round(manager.current_round)
            // Only move straight on to the next round from a multi-leader or fast round.
            .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
        {
            round
        } else if let Some(timeout) = info.round_timeout() {
            // Otherwise, tell the caller to wait for the known round timeout.
            return Ok(Either::Right(timeout));
        } else {
            return Err(Error::BlockProposalError(
                "Conflicting proposal in the current round",
            ));
        };
        // Voting weights of the local committee's validators, as `(owner, votes)` pairs,
        // used to decide whether `identity` should propose in `round`.
        let current_committee = self
            .local_committee()
            .await?
            .validators
            .values()
            .map(|v| (AccountOwner::from(v.account_public_key), v.votes))
            .collect();
        if manager.should_propose(identity, round, seed, &current_committee) {
            // Even a permitted proposer may be asked to wait, to spread out concurrent
            // proposals in a multi-leader round.
            if let Some(wait_until) = self.multi_leader_jitter_target(info, identity, round) {
                return Ok(Either::Right(RoundTimeout {
                    timestamp: wait_until,
                    current_round: round,
                    next_block_height: info.next_block_height,
                }));
            }
            return Ok(Either::Left(round));
        }
        if let Some(timeout) = info.round_timeout() {
            return Ok(Either::Right(timeout));
        }
        Err(Error::BlockProposalError(
            "Not a leader in the current round",
        ))
    }
2412
2413    /// Returns the timestamp at which `owner` should propose in `round`, to spread out
2414    /// concurrent proposals from honest clients in a multi-leader round. Returns `None` if
2415    /// the owner should propose immediately (either because the round is not a multi-leader
2416    /// round, the owner is the preferred proposer, or the jitter target is already in the past).
2417    ///
2418    /// The delay is deterministic per `(owner, round)` and is anchored at the round's start
2419    /// time when known, so that retrying after an interrupting notification does not extend
2420    /// the wait further.
2421    fn multi_leader_jitter_target(
2422        &self,
2423        info: &ChainInfo,
2424        owner: &AccountOwner,
2425        round: Round,
2426    ) -> Option<Timestamp> {
2427        if !self.options.multi_leader_jitter {
2428            return None;
2429        }
2430        let ownership = &info.manager.ownership;
2431        let delay = ownership.multi_leader_proposal_delay(owner, round)?;
2432        if delay == TimeDelta::ZERO {
2433            return None;
2434        }
2435        let now = self.storage_client().clock().current_time();
2436        let round_start = if round == info.manager.current_round {
2437            match (info.manager.round_timeout, ownership.round_timeout(round)) {
2438                (Some(end), Some(duration)) => end.saturating_sub(duration),
2439                _ => now,
2440            }
2441        } else {
2442            now
2443        };
2444        let propose_at = round_start.saturating_add(delay);
2445        (propose_at > now).then_some(propose_at)
2446    }
2447
2448    /// Discards the pending block proposal, if any, so that a fresh block can be
2449    /// proposed instead of retrying the queued one.
2450    ///
2451    /// Note that this does not update the copy persisted in the wallet (callers that
2452    /// persist pending proposals should refresh it afterwards). Also, this should never
2453    /// be used on a proposal already submitted in the fast round.
2454    #[instrument(level = "trace")]
2455    pub async fn clear_pending_proposal(&self) {
2456        *self.proposal_mutex().lock().await = None;
2457    }
2458
2459    /// Rotates the key of the chain.
2460    ///
2461    /// Replaces current owners of the chain with the new key pair.
2462    #[cfg(with_testing)]
2463    #[instrument(level = "trace")]
2464    pub async fn rotate_key_pair(
2465        &self,
2466        public_key: linera_base::crypto::AccountPublicKey,
2467    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2468        self.transfer_ownership(public_key.into()).await
2469    }
2470
2471    /// Transfers ownership of the chain to a single super owner.
2472    #[instrument(level = "trace")]
2473    pub async fn transfer_ownership(
2474        &self,
2475        new_owner: AccountOwner,
2476    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2477        self.execute_operation(SystemOperation::ChangeOwnership {
2478            super_owners: vec![new_owner],
2479            owners: Vec::new(),
2480            first_leader: None,
2481            multi_leader_rounds: 5,
2482            open_multi_leader_rounds: false,
2483            timeout_config: TimeoutConfig::default(),
2484        })
2485        .await
2486    }
2487
2488    /// Adds another owner to the chain, and turns existing super owners into regular owners.
2489    #[instrument(level = "trace")]
2490    pub async fn share_ownership(
2491        &self,
2492        new_owner: AccountOwner,
2493        new_weight: u64,
2494    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2495        let ownership = self.prepare_chain().await?.manager.ownership;
2496        ensure!(
2497            ownership.is_active(),
2498            ChainError::InactiveChain(self.chain_id)
2499        );
2500        let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
2501        owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
2502        owners.push((new_owner, new_weight));
2503        let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
2504            super_owners: Vec::new(),
2505            owners,
2506            first_leader: ownership.first_leader,
2507            multi_leader_rounds: ownership.multi_leader_rounds,
2508            open_multi_leader_rounds: ownership.open_multi_leader_rounds,
2509            timeout_config: ownership.timeout_config,
2510        })];
2511        self.execute_block(operations, vec![]).await
2512    }
2513
2514    /// Returns the current ownership settings on this chain.
2515    #[instrument(level = "trace")]
2516    pub async fn query_chain_ownership(&self) -> Result<ChainOwnership, Error> {
2517        Ok(self
2518            .client
2519            .local_node
2520            .chain_state_view(self.chain_id)
2521            .await?
2522            .execution_state
2523            .system
2524            .ownership
2525            .get()
2526            .await?
2527            .clone())
2528    }
2529
2530    /// Changes the ownership of this chain. Fails if it would remove existing owners, unless
2531    /// `remove_owners` is `true`.
2532    #[instrument(level = "trace")]
2533    pub async fn change_ownership(
2534        &self,
2535        ownership: ChainOwnership,
2536    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2537        self.execute_operation(SystemOperation::ChangeOwnership {
2538            super_owners: ownership.super_owners.into_iter().collect(),
2539            owners: ownership.owners.into_iter().collect(),
2540            first_leader: ownership.first_leader,
2541            multi_leader_rounds: ownership.multi_leader_rounds,
2542            open_multi_leader_rounds: ownership.open_multi_leader_rounds,
2543            timeout_config: ownership.timeout_config.clone(),
2544        })
2545        .await
2546    }
2547
2548    /// Returns the current application permissions on this chain.
2549    #[instrument(level = "trace")]
2550    pub async fn query_application_permissions(&self) -> Result<ApplicationPermissions, Error> {
2551        Ok(self
2552            .client
2553            .local_node
2554            .chain_state_view(self.chain_id)
2555            .await?
2556            .execution_state
2557            .system
2558            .application_permissions
2559            .get()
2560            .await?
2561            .clone())
2562    }
2563
2564    /// Changes the application permissions configuration on this chain.
2565    #[instrument(level = "trace", skip(application_permissions))]
2566    pub async fn change_application_permissions(
2567        &self,
2568        application_permissions: ApplicationPermissions,
2569    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2570        self.execute_operation(SystemOperation::ChangeApplicationPermissions(
2571            application_permissions,
2572        ))
2573        .await
2574    }
2575
2576    /// Opens a new chain with a derived UID.
2577    #[instrument(level = "trace", skip(self))]
2578    pub async fn open_chain(
2579        &self,
2580        ownership: ChainOwnership,
2581        application_permissions: ApplicationPermissions,
2582        balance: Amount,
2583    ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, Error> {
2584        // Check if we have a key for any owner before consuming ownership.
2585        let mut has_key = false;
2586        for owner in ownership.all_owners() {
2587            if self.has_key_for(owner).await? {
2588                has_key = true;
2589                break;
2590            }
2591        }
2592        let config = OpenChainConfig {
2593            ownership,
2594            balance,
2595            application_permissions,
2596        };
2597        let operation = Operation::system(SystemOperation::OpenChain(config));
2598        let certificate = match self.execute_block(vec![operation], vec![]).await? {
2599            ClientOutcome::Committed(certificate) => certificate,
2600            ClientOutcome::Conflict(certificate) => {
2601                return Ok(ClientOutcome::Conflict(certificate));
2602            }
2603            ClientOutcome::WaitForTimeout(timeout) => {
2604                return Ok(ClientOutcome::WaitForTimeout(timeout));
2605            }
2606        };
2607        // The only operation, i.e. the last transaction, created the new chain.
2608        let chain_blob = certificate
2609            .block()
2610            .body
2611            .blobs
2612            .last()
2613            .and_then(|blobs| blobs.last())
2614            .ok_or_else(|| Error::InternalError("Failed to create a new chain"))?;
2615        let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
2616        // If we have a key for any owner, add it to the list of tracked chains.
2617        if has_key {
2618            self.client
2619                .extend_chain_mode(description.id(), ListeningMode::FullChain);
2620            self.client
2621                .retry_pending_cross_chain_requests(self.chain_id)
2622                .await?;
2623        }
2624        Ok(ClientOutcome::Committed((description, certificate)))
2625    }
2626
2627    /// Publishes a checkpoint of the chain's execution state. The resulting block
2628    /// contains a single `SystemOperation::Checkpoint`; future nodes can bootstrap
2629    /// from the published blob instead of replaying the chain's history.
2630    #[instrument(level = "trace")]
2631    pub async fn checkpoint(&self) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2632        self.execute_operation(SystemOperation::Checkpoint).await
2633    }
2634
2635    /// Closes the chain (and loses everything in it!!).
2636    /// Returns `None` if the chain was already closed.
2637    #[instrument(level = "trace")]
2638    pub async fn close_chain(
2639        &self,
2640    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
2641        match self.execute_operation(SystemOperation::CloseChain).await {
2642            Ok(outcome) => Ok(outcome.map(Some)),
2643            Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
2644                chain_error,
2645            )))) if matches!(*chain_error, ChainError::ClosedChain) => {
2646                Ok(ClientOutcome::Committed(None)) // Chain is already closed.
2647            }
2648            Err(error) => Err(error),
2649        }
2650    }
2651
2652    /// Publishes some module, optionally along with a JSON-encoded `Formats`
2653    /// description that becomes a third blob alongside contract and service.
2654    #[cfg(not(target_arch = "wasm32"))]
2655    #[instrument(level = "trace", skip(contract, service, formats))]
2656    pub async fn publish_module(
2657        &self,
2658        contract: Bytecode,
2659        service: Bytecode,
2660        vm_runtime: VmRuntime,
2661        formats: Option<Vec<u8>>,
2662    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2663        let (blobs, module_id) =
2664            super::create_bytecode_blobs(contract, service, vm_runtime, formats).await;
2665        self.publish_module_blobs(blobs, module_id).await
2666    }
2667
2668    /// Publishes some module.
2669    #[cfg(not(target_arch = "wasm32"))]
2670    #[instrument(level = "trace", skip(blobs, module_id))]
2671    pub async fn publish_module_blobs(
2672        &self,
2673        blobs: Vec<Blob>,
2674        module_id: ModuleId,
2675    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2676        self.execute_operations(
2677            vec![Operation::system(SystemOperation::PublishModule {
2678                module_id,
2679            })],
2680            blobs,
2681        )
2682        .await?
2683        .try_map(|certificate| Ok((module_id, certificate)))
2684    }
2685
2686    /// Publishes some data blobs.
2687    #[instrument(level = "trace", skip(bytes))]
2688    pub async fn publish_data_blobs(
2689        &self,
2690        bytes: Vec<Vec<u8>>,
2691    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2692        let blobs = bytes.into_iter().map(Blob::new_data);
2693        let publish_blob_operations = blobs
2694            .clone()
2695            .map(|blob| {
2696                Operation::system(SystemOperation::PublishDataBlob {
2697                    blob_hash: blob.id().hash,
2698                })
2699            })
2700            .collect();
2701        self.execute_operations(publish_blob_operations, blobs.collect())
2702            .await
2703    }
2704
2705    /// Publishes some data blob.
2706    #[instrument(level = "trace", skip(bytes))]
2707    pub async fn publish_data_blob(
2708        &self,
2709        bytes: Vec<u8>,
2710    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2711        self.publish_data_blobs(vec![bytes]).await
2712    }
2713
2714    /// Creates an application by instantiating some bytecode.
2715    #[instrument(
2716        level = "trace",
2717        skip(self, parameters, instantiation_argument, required_application_ids)
2718    )]
2719    pub async fn create_application<
2720        A: Abi,
2721        Parameters: Serialize,
2722        InstantiationArgument: Serialize,
2723    >(
2724        &self,
2725        module_id: ModuleId<A, Parameters, InstantiationArgument>,
2726        parameters: &Parameters,
2727        instantiation_argument: &InstantiationArgument,
2728        required_application_ids: Vec<ApplicationId>,
2729    ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, Error> {
2730        let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
2731        let parameters = serde_json::to_vec(parameters)?;
2732        Ok(self
2733            .create_application_untyped(
2734                module_id.forget_abi(),
2735                parameters,
2736                instantiation_argument,
2737                required_application_ids,
2738            )
2739            .await?
2740            .map(|(app_id, cert)| (app_id.with_abi(), cert)))
2741    }
2742
2743    /// Creates an application by instantiating some bytecode.
2744    #[instrument(
2745        level = "trace",
2746        skip(
2747            self,
2748            module_id,
2749            parameters,
2750            instantiation_argument,
2751            required_application_ids
2752        )
2753    )]
2754    pub async fn create_application_untyped(
2755        &self,
2756        module_id: ModuleId,
2757        parameters: Vec<u8>,
2758        instantiation_argument: Vec<u8>,
2759        required_application_ids: Vec<ApplicationId>,
2760    ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, Error> {
2761        self.execute_operation(SystemOperation::CreateApplication {
2762            module_id,
2763            parameters,
2764            instantiation_argument,
2765            required_application_ids,
2766        })
2767        .await?
2768        .try_map(|certificate| {
2769            // The first message of the only operation created the application.
2770            let mut creation = certificate
2771                .block()
2772                .created_blob_ids()
2773                .into_iter()
2774                .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
2775                .collect::<Vec<_>>();
2776            if creation.len() > 1 {
2777                return Err(Error::InternalError(
2778                    "Unexpected number of application descriptions published",
2779                ));
2780            }
2781            let blob_id = creation.pop().ok_or(Error::InternalError(
2782                "ApplicationDescription blob not found.",
2783            ))?;
2784            let id = ApplicationId::new(blob_id.hash);
2785            Ok((id, certificate))
2786        })
2787    }
2788
2789    /// Creates a new committee and starts using it (admin chains only).
2790    #[instrument(level = "trace", skip(committee))]
2791    pub async fn stage_new_committee(
2792        &self,
2793        committee: Committee,
2794    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2795        let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
2796        let blob_hash = blob.id().hash;
2797        match self
2798            .execute_operations(
2799                vec![Operation::system(SystemOperation::Admin(
2800                    AdminOperation::PublishCommitteeBlob { blob_hash },
2801                ))],
2802                vec![blob],
2803            )
2804            .await?
2805        {
2806            ClientOutcome::Committed(_) => {}
2807            outcome @ ClientOutcome::WaitForTimeout(_) | outcome @ ClientOutcome::Conflict(_) => {
2808                return Ok(outcome)
2809            }
2810        }
2811        let epoch = self.chain_info().await?.epoch.try_add_one()?;
2812        self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
2813            epoch,
2814            blob_hash,
2815        }))
2816        .await
2817    }
2818
2819    /// Synchronizes the chain with the validators and creates blocks without any operations to
2820    /// process all incoming messages. This may require several blocks.
2821    ///
2822    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
2823    /// is returned, too.
2824    #[instrument(level = "trace")]
2825    pub async fn process_inbox(
2826        &self,
2827    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2828        self.prepare_chain().await?;
2829        self.process_inbox_without_prepare().await
2830    }
2831
2832    /// Creates blocks without any operations to process all incoming messages. This may require
2833    /// several blocks.
2834    ///
2835    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
2836    /// is returned, too.
2837    #[instrument(level = "trace")]
2838    pub async fn process_inbox_without_prepare(
2839        &self,
2840    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2841        #[cfg(with_metrics)]
2842        let _latency = super::metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
2843
2844        let mut certificates = Vec::new();
2845        loop {
2846            // We provide no operations - this means that the only operations executed
2847            // will be epoch changes, receiving messages and processing event stream
2848            // updates, if any are pending.
2849            match self.execute_block(vec![], vec![]).await {
2850                Ok(ClientOutcome::Committed(certificate)) => certificates.push(certificate),
2851                Ok(ClientOutcome::Conflict(certificate)) => certificates.push(*certificate),
2852                Ok(ClientOutcome::WaitForTimeout(timeout)) => {
2853                    return Ok((certificates, Some(timeout)));
2854                }
2855                // Nothing in the inbox and no stream updates to be processed.
2856                Err(Error::LocalNodeError(LocalNodeError::WorkerError(
2857                    WorkerError::ChainError(chain_error),
2858                ))) if matches!(*chain_error, ChainError::EmptyBlock) => {
2859                    return Ok((certificates, None));
2860                }
2861                Err(error) => return Err(error),
2862            };
2863        }
2864    }
2865
2866    /// Returns operations to process all pending new epochs, in order.
2867    async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, Error> {
2868        let mut next_epoch = self.chain_info().await?.epoch.try_add_one()?;
2869        let mut epoch_change_ops = Vec::new();
2870        while self
2871            .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
2872            .await?
2873        {
2874            epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
2875                next_epoch,
2876            )));
2877            next_epoch.try_add_assign_one()?;
2878        }
2879        Ok(epoch_change_ops)
2880    }
2881
2882    /// Returns whether the system event on the admin chain with the given stream name and key
2883    /// exists in storage.
2884    async fn has_admin_event(&self, stream_name: &[u8], index: u32) -> Result<bool, Error> {
2885        let event_id = EventId {
2886            chain_id: self.client.admin_chain_id,
2887            stream_id: StreamId::system(stream_name),
2888            index,
2889        };
2890        Ok(self
2891            .client
2892            .storage_client()
2893            .read_event(event_id)
2894            .await?
2895            .is_some())
2896    }
2897
2898    /// Returns the indices and events from the storage
2899    pub async fn events_from_index(
2900        &self,
2901        stream_id: StreamId,
2902        start_index: u32,
2903    ) -> Result<Vec<IndexAndEvent>, Error> {
2904        Ok(self
2905            .client
2906            .storage_client()
2907            .read_events_from_index(&self.chain_id, &stream_id, start_index)
2908            .await?)
2909    }
2910
2911    /// Deprecates all configurations of voting rights up to the given one (admin chains only).
2912    /// Emits a `RemoveCommittee` event for every still-active epoch up to and including
2913    /// `revoked_epoch`.
2914    #[instrument(level = "trace")]
2915    pub async fn revoke_epochs(
2916        &self,
2917        revoked_epoch: Epoch,
2918    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2919        self.prepare_chain().await?;
2920        let current_epoch = self.chain_info().await?.epoch;
2921        ensure!(
2922            revoked_epoch < current_epoch,
2923            Error::CannotRevokeCurrentEpoch(current_epoch)
2924        );
2925        let mut operations = Vec::new();
2926        for epoch_index in 0..=revoked_epoch.0 {
2927            let epoch = Epoch(epoch_index);
2928            if self
2929                .has_admin_event(REMOVED_EPOCH_STREAM_NAME, epoch.0)
2930                .await?
2931            {
2932                continue;
2933            }
2934            operations.push(Operation::system(SystemOperation::Admin(
2935                AdminOperation::RemoveCommittee { epoch },
2936            )));
2937        }
2938        ensure!(!operations.is_empty(), Error::EpochAlreadyRevoked);
2939        self.execute_operations(operations, vec![]).await
2940    }
2941
2942    /// Sends money to a chain.
2943    /// Do not check balance. (This may block the client)
2944    /// Do not confirm the transaction.
2945    #[cfg(with_testing)]
2946    #[instrument(level = "trace")]
2947    pub async fn transfer_to_account_unsafe_unconfirmed(
2948        &self,
2949        owner: AccountOwner,
2950        amount: Amount,
2951        recipient: Account,
2952    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2953        self.execute_operation(SystemOperation::Transfer {
2954            owner,
2955            recipient,
2956            amount,
2957        })
2958        .await
2959    }
2960
2961    #[instrument(level = "trace", skip(hash))]
2962    pub async fn read_confirmed_block(
2963        &self,
2964        hash: CryptoHash,
2965    ) -> Result<Arc<ConfirmedBlock>, Error> {
2966        self.client
2967            .storage_client()
2968            .read_confirmed_block(hash)
2969            .await?
2970            .ok_or(Error::MissingConfirmedBlock(hash))
2971            .map(|b| b.into_std())
2972    }
2973
2974    #[instrument(level = "trace", skip(hash))]
2975    pub async fn read_certificate(
2976        &self,
2977        hash: CryptoHash,
2978    ) -> Result<CacheArc<ConfirmedBlockCertificate>, Error> {
2979        self.client
2980            .storage_client()
2981            .read_certificate(hash)
2982            .await?
2983            .ok_or(Error::ReadCertificatesError(vec![hash]))
2984    }
2985
2986    /// Handles any cross-chain requests for any pending outgoing messages.
2987    #[instrument(level = "trace")]
2988    pub async fn retry_pending_outgoing_messages(&self) -> Result<(), Error> {
2989        self.client
2990            .retry_pending_cross_chain_requests(self.chain_id)
2991            .await?;
2992        Ok(())
2993    }
2994
2995    #[instrument(level = "trace", skip(local_node))]
2996    async fn maybe_local_chain_info(
2997        &self,
2998        chain_id: ChainId,
2999        local_node: &LocalNodeClient<Env::Storage>,
3000    ) -> Result<Option<Box<ChainInfo>>, Error> {
3001        match local_node.chain_info(chain_id).await {
3002            Ok(info) => Ok(Some(info)),
3003            Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
3004            Err(err) => Err(err.into()),
3005        }
3006    }
3007
3008    #[instrument(level = "trace", skip(chain_id, local_node))]
3009    async fn local_next_block_height(
3010        &self,
3011        chain_id: ChainId,
3012        local_node: &LocalNodeClient<Env::Storage>,
3013    ) -> Result<BlockHeight, Error> {
3014        Ok(self
3015            .maybe_local_chain_info(chain_id, local_node)
3016            .await?
3017            .map_or(BlockHeight::ZERO, |info| info.next_block_height))
3018    }
3019
3020    /// Returns the next height we expect to receive from the given sender chain, according to the
3021    /// local inbox.
3022    #[instrument(level = "trace")]
3023    async fn local_next_height_to_receive(&self, origin: ChainId) -> Result<BlockHeight, Error> {
3024        Ok(self
3025            .client
3026            .local_node
3027            .get_inbox_next_height(self.chain_id, origin)
3028            .await?)
3029    }
3030
3031    #[instrument(level = "trace", skip(remote_node, local_node, notification))]
3032    async fn process_notification(
3033        &self,
3034        remote_node: RemoteNode<Env::ValidatorNode>,
3035        local_node: LocalNodeClient<Env::Storage>,
3036        notification: Notification,
3037    ) -> Result<(), Error> {
3038        let listening_mode = self.client.chain_mode(notification.chain_id);
3039        let relevant = listening_mode
3040            .as_ref()
3041            .is_some_and(|mode| mode.is_relevant(&notification.reason));
3042        if !relevant {
3043            tracing::trace!(
3044                chain_id = %notification.chain_id,
3045                reason = ?notification.reason,
3046                ?listening_mode,
3047                "Ignoring notification due to listening mode"
3048            );
3049            return Ok(());
3050        }
3051        match notification.reason {
3052            Reason::NewIncomingBundle { origin, height } => {
3053                if self.options.message_policy.ignores_origin(&origin) {
3054                    trace!(
3055                        chain_id = %self.chain_id,
3056                        %origin,
3057                        %height,
3058                        "Skipping NewIncomingBundle notification: origin filtered by message_policy"
3059                    );
3060                    return Ok(());
3061                }
3062                if self.local_next_height_to_receive(origin).await? > height {
3063                    debug!(
3064                        chain_id = %self.chain_id,
3065                        "Accepting redundant notification for new message"
3066                    );
3067                    return Ok(());
3068                }
3069                self.client
3070                    .download_sender_block_with_sending_ancestors(
3071                        self.chain_id,
3072                        origin,
3073                        height,
3074                        &remote_node,
3075                    )
3076                    .await?;
3077                if self.local_next_height_to_receive(origin).await? <= height {
3078                    info!(
3079                        chain_id = %self.chain_id,
3080                        "NewIncomingBundle: Fail to synchronize new message after notification"
3081                    );
3082                }
3083            }
3084            Reason::NewBlock { height, .. } => {
3085                let chain_id = notification.chain_id;
3086                let local_height = self.local_next_block_height(chain_id, &local_node).await?;
3087                if local_height > height {
3088                    debug!(
3089                        chain_id = %self.chain_id,
3090                        "Accepting redundant notification for new block"
3091                    );
3092                    return Ok(());
3093                }
3094                // The below will only happen in modes other than EventsOnly, as the
3095                // NewBlock notification is only relevant to other modes.
3096                self.client
3097                    .synchronize_chain_state_from(&remote_node, chain_id)
3098                    .await?;
3099                if self.local_next_block_height(chain_id, &local_node).await? <= height {
3100                    error!("NewBlock: Fail to synchronize new block after notification");
3101                }
3102                trace!(
3103                    chain_id = %self.chain_id,
3104                    %height,
3105                    "NewBlock: processed notification",
3106                );
3107            }
3108            Reason::NewEvents {
3109                height, block_hash, ..
3110            } => {
3111                let chain_id = notification.chain_id;
3112                let local_height = self.local_next_block_height(chain_id, &local_node).await?;
3113                if local_height > height {
3114                    debug!(
3115                        chain_id = %self.chain_id,
3116                        "Accepting redundant notification for new events"
3117                    );
3118                    return Ok(());
3119                }
3120                trace!(
3121                    chain_id = %self.chain_id,
3122                    %height,
3123                    "NewEvents: processing notification"
3124                );
3125                // Use the subscribed streams from EventsOnly mode to figure out which
3126                // streams we are interested in.
3127                let relevant_streams = match self.listening_mode() {
3128                    Some(ListeningMode::EventsOnly(subscribed)) => subscribed,
3129                    // other cases should be unreachable, as the NewEvents notification is
3130                    // only relevant in the EventsOnly mode
3131                    _ => unreachable!(),
3132                };
3133                self.client
3134                    .download_event_bearing_blocks(
3135                        self.chain_id,
3136                        BTreeSet::from([(height, block_hash)]),
3137                        local_height,
3138                        &relevant_streams,
3139                        &remote_node,
3140                    )
3141                    .await?;
3142            }
3143            Reason::NewRound { height, round } => {
3144                let chain_id = notification.chain_id;
3145                if let Some(info) = self.maybe_local_chain_info(chain_id, &local_node).await? {
3146                    if (info.next_block_height, info.manager.current_round) >= (height, round) {
3147                        debug!(
3148                            chain_id = %self.chain_id,
3149                            "Accepting redundant notification for new round"
3150                        );
3151                        return Ok(());
3152                    }
3153                }
3154                self.client
3155                    .synchronize_chain_state_from(&remote_node, chain_id)
3156                    .await?;
3157                let Some(info) = self.maybe_local_chain_info(chain_id, &local_node).await? else {
3158                    error!(
3159                        chain_id = %self.chain_id,
3160                        "NewRound: Fail to read local chain info for {chain_id}"
3161                    );
3162                    return Ok(());
3163                };
3164                if (info.next_block_height, info.manager.current_round) < (height, round) {
3165                    info!(
3166                        chain_id = %self.chain_id,
3167                        "NewRound: Fail to synchronize new block after notification"
3168                    );
3169                }
3170            }
3171            Reason::BlockExecuted { .. } => {
3172                // No action needed.
3173            }
3174        }
3175        Ok(())
3176    }
3177
3178    /// Returns whether this chain is tracked by the client, i.e. we are updating its inbox.
3179    pub fn is_tracked(&self) -> bool {
3180        self.client.is_tracked(self.chain_id)
3181    }
3182
3183    /// Returns the listening mode for this chain, if it is tracked.
3184    pub fn listening_mode(&self) -> Option<ListeningMode> {
3185        self.client.chain_mode(self.chain_id)
3186    }
3187
    /// Sets up listening to notifications about the current chain from all validators, so
    /// that the local state can be synchronized accordingly.
    ///
    /// Nothing is spawned here. The returned future does all the work and must be polled
    /// (e.g. spawned) by the caller. The returned tuple contains:
    /// - The future. It maintains one notification subscription per validator and
    ///   processes the notifications they deliver. It completes once the internal
    ///   notification stream ends, after aborting every validator subscription and
    ///   waiting for their processing tasks to finish.
    /// - An `AbortOnDrop` wrapping the abort handle of that internal notification stream.
    ///   Presumably it aborts the stream, and thereby ends the future, when dropped;
    ///   confirm against `AbortOnDrop`.
    /// - A separate `NotificationStream` subscription for the caller.
    ///
    /// An error is returned only if subscribing to local notifications fails. A failure
    /// to set up the validator streams is logged, not returned.
    ///
    /// The listening mode must be set in `Client::chain_modes` before calling this method.
    #[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
    pub async fn listen(
        &self,
    ) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), Error> {
        // Brings `.fuse()` into scope for the `update_notification_streams` future below.
        use future::FutureExt as _;

        /// Awaits `future` while also polling `background_work`, so that the background
        /// work keeps making progress in the meantime. Items yielded by `background_work`
        /// are discarded; only the output of `future` is returned.
        async fn await_while_polling<F: FusedFuture>(
            future: F,
            background_work: impl FusedStream<Item = ()>,
        ) -> F::Output {
            tokio::pin!(future);
            tokio::pin!(background_work);
            loop {
                futures::select! {
                    _ = background_work.next() => (),
                    result = future => return result,
                }
            }
        }

        // Abort handles of the per-validator subscriptions, keyed by validator.
        let mut senders = HashMap::new();
        let mut circuit_breakers: HashMap<ValidatorPublicKey, CircuitBreakerState> = HashMap::new();
        // Two independent subscriptions: one is handed back to the caller, the other
        // drives the internal loop below and can be aborted through the returned handle.
        let notifications = self.subscribe()?;
        let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);

        // Beware: if this future ceases to make progress, notification processing will
        // deadlock, because of the issue described in
        // https://github.com/linera-io/linera-protocol/pull/1173.

        // TODO(#2013): replace this lock with an asynchronous communication channel
        // NOTE(review): no lock is visible in this function; this TODO may be stale — confirm.

        // Futures that process notifications from validators; polled in the background
        // while the loop below waits for the next local notification.
        let mut process_notifications = FuturesUnordered::new();

        match self
            .update_notification_streams(&mut senders, &mut circuit_breakers)
            .await
        {
            Ok(handler) => process_notifications.push(handler),
            Err(error) => error!("Failed to update committee: {error}"),
        };

        let this = self.clone();
        let update_streams = async move {
            let mut abortable_notifications = abortable_notifications.fuse();

            while let Some(notification) =
                await_while_polling(abortable_notifications.next(), &mut process_notifications)
                    .await
            {
                // On every new block, reconcile the validator subscriptions again (the
                // set of validators may have changed, and ended streams can be retried),
                // still polling the existing processing futures meanwhile.
                if let Reason::NewBlock { .. } = notification.reason {
                    match Box::pin(await_while_polling(
                        this.update_notification_streams(&mut senders, &mut circuit_breakers)
                            .fuse(),
                        &mut process_notifications,
                    ))
                    .await
                    {
                        Ok(handler) => process_notifications.push(handler),
                        Err(error) => error!("Failed to update committee: {error}"),
                    }
                }
            }

            // The internal notification stream ended (e.g. it was aborted): stop every
            // validator subscription, then wait for all processing futures to finish.
            for abort in senders.into_values() {
                abort.abort();
            }

            let () = process_notifications.collect().await;
        }
        .in_current_span();

        Ok((update_streams, AbortOnDrop(abort), notifications))
    }
3265
    /// Reconciles this chain's per-validator notification subscriptions with the current
    /// set of validators, and returns a future that drives the newly started ones.
    ///
    /// The validators come from the chain's local committee when its listening mode
    /// synchronizes chain state, and from the admin committee otherwise.
    ///
    /// `senders` maps each validator to the abort handle of its subscription. A handle is
    /// aborted when the validator's notification stream ends, or here when the validator
    /// is no longer part of the committee. Before stale handles are dropped, their state
    /// updates the per-validator circuit breaker in `circuit_breakers`:
    /// - An ended stream for a validator not yet in the circuit breaker puts it there with
    ///   the initial probe interval.
    /// - An ended stream for a validator already in the circuit breaker (a failed probe)
    ///   doubles its probe interval, capped at the configured maximum.
    /// - A live stream for a validator in the circuit breaker counts as a successful
    ///   probe and removes it from the circuit breaker.
    ///
    /// A new subscription is started for every validator without a live sender. Validators
    /// in the circuit breaker whose next probe time has not been reached are skipped.
    ///
    /// Returns an error if the committee cannot be read or the validator nodes cannot be
    /// created.
    #[instrument(level = "trace", skip(senders, circuit_breakers))]
    async fn update_notification_streams(
        &self,
        senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
        circuit_breakers: &mut HashMap<ValidatorPublicKey, CircuitBreakerState>,
    ) -> Result<impl Future<Output = ()>, Error> {
        let initial_probe_interval = self
            .options
            .notification_circuit_breaker_initial_probe_interval;
        let max_probe_interval = self.options.notification_circuit_breaker_max_probe_interval;
        let (nodes, local_node) = {
            // For EventsOnly chains we may not have the chain's own committee locally,
            // and attempting to fetch it would trigger a full sync. Use the admin
            // committee instead — we only need it to know which validators to connect to.
            let committee = if self
                .listening_mode()
                .is_some_and(|m| m.should_sync_chain_state())
            {
                self.local_committee().await?
            } else {
                self.client.admin_committee().await?.1
            };
            let nodes = self
                .client
                .validator_node_provider()
                .make_nodes(&committee)?
                .collect::<HashMap<_, _>>();
            (nodes, self.client.local_node.clone())
        };
        // Detect circuit breaker state transitions before cleaning up senders.
        for (validator, abort) in senders.iter() {
            if abort.is_aborted() && nodes.contains_key(validator) {
                if let Some(state) = circuit_breakers.get_mut(validator) {
                    // Was probing -> probe failed -> escalate interval.
                    state.probe_interval = (state.probe_interval * 2).min(max_probe_interval);
                    state.next_probe_at = Instant::now() + state.probe_interval;
                    warn!(
                        %validator,
                        chain_id = %self.chain_id,
                        next_probe_in = ?state.probe_interval,
                        "Validator still unhealthy after probe; increasing probe interval"
                    );
                } else {
                    // First failure -> enter circuit breaker.
                    circuit_breakers.insert(
                        *validator,
                        CircuitBreakerState {
                            next_probe_at: Instant::now() + initial_probe_interval,
                            probe_interval: initial_probe_interval,
                        },
                    );
                    error!(
                        %validator,
                        chain_id = %self.chain_id,
                        next_probe_in = ?initial_probe_interval,
                        "Validator notification stream ended; entering circuit breaker"
                    );
                }
            } else if !abort.is_aborted() && circuit_breakers.contains_key(validator) {
                // Stream alive while in circuit breaker -> probe succeeded -> recovered.
                // NOTE(review): a probe whose `subscribe` call is still in flight is not
                // aborted yet either, so it is also counted as recovered here — confirm
                // this is intended.
                info!(
                    %validator,
                    chain_id = %self.chain_id,
                    "Validator recovered from circuit breaker"
                );
                circuit_breakers.remove(validator);
            }
        }

        // Stop subscriptions to validators that left the committee, and forget every
        // ended subscription so that it can be recreated below.
        senders.retain(|validator, abort| {
            if !nodes.contains_key(validator) {
                abort.abort();
            }
            !abort.is_aborted()
        });
        circuit_breakers.retain(|validator, _| nodes.contains_key(validator));

        let validator_tasks = FuturesUnordered::new();
        for (public_key, node) in nodes {
            // Validators with a live subscription need nothing new.
            let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
                continue;
            };

            // Circuit breaker: skip if not time to probe yet.
            if let Some(state) = circuit_breakers.get(&public_key) {
                if Instant::now() < state.next_probe_at {
                    continue;
                }
                debug!(
                    validator = %public_key,
                    chain_id = %self.chain_id,
                    "Probing unhealthy validator"
                );
            }

            let address = node.address();
            let this = self.clone();
            let listening_mode_for_sync = self.listening_mode();
            // A stream that first subscribes to the validator and catches up with it, then
            // yields its notifications. If subscribing or the full chain-state sync fails,
            // the error is logged as "could not connect" and the stream is empty.
            let stream = stream::once({
                let node = node.clone();
                async move {
                    let stream = node.subscribe(vec![this.chain_id]).await?;
                    // Only now the notification stream is established. We may have missed
                    // notifications since the last time we synchronized.
                    let remote_node = RemoteNode { public_key, node };
                    if listening_mode_for_sync
                        .as_ref()
                        .is_some_and(|mode| mode.should_sync_chain_state())
                    {
                        this.client
                            .synchronize_chain_state_from(&remote_node, this.chain_id)
                            .await?;
                    } else {
                        // For EventsOnly chains, do a lightweight initial sync:
                        // query the validator for the latest event-bearing blocks
                        // for our subscribed streams, then download them sparsely.
                        if let Some(ListeningMode::EventsOnly(subscribed)) =
                            listening_mode_for_sync.as_ref()
                        {
                            // Best effort: a failed sparse sync is only logged.
                            if let Err(error) = this
                                .client
                                .sync_events_from_node(this.chain_id, subscribed, &remote_node)
                                .await
                            {
                                debug!(
                                    chain_id = %this.chain_id,
                                    %error,
                                    "Failed initial sparse sync for EventsOnly chain"
                                );
                            }
                        }
                    }
                    Ok::<_, Error>(stream)
                }
            })
            .filter_map(move |result| {
                let address = address.clone();
                async move {
                    if let Err(error) = &result {
                        info!(?error, address, "could not connect to validator");
                    } else {
                        debug!(address, "connected to validator");
                    }
                    result.ok()
                }
            })
            .flatten();
            let (stream, abort) = stream::abortable(stream);
            let mut stream = Box::pin(stream);
            // The task aborts its own handle when the stream ends, so that the next call
            // of this method sees the validator as failed.
            let abort_on_exit = abort.clone();
            let this = self.clone();
            let local_node = local_node.clone();
            let remote_node = RemoteNode { public_key, node };
            validator_tasks.push(async move {
                while let Some(notification) = stream.next().await {
                    // Errors while processing a single notification are logged; the
                    // subscription stays open.
                    if let Err(error) = this
                        .process_notification(
                            remote_node.clone(),
                            local_node.clone(),
                            notification.clone(),
                        )
                        .await
                    {
                        tracing::info!(
                            chain_id = %this.chain_id,
                            address = remote_node.address(),
                            ?notification,
                            %error,
                            "failed to process notification",
                        );
                    }
                }
                warn!(
                    chain_id = %this.chain_id,
                    address = remote_node.address(),
                    "Validator notification stream ended"
                );
                abort_on_exit.abort();
            });
            entry.insert(abort);
        }
        // Drives all newly started validator tasks concurrently until every one of them
        // has finished.
        Ok(validator_tasks.collect())
    }
3449
3450    /// Attempts to update a validator with the local information.
3451    #[instrument(level = "trace", skip(remote_node))]
3452    pub async fn sync_validator(&self, remote_node: Env::ValidatorNode) -> Result<(), Error> {
3453        let validator_next_block_height = match remote_node
3454            .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
3455            .await
3456        {
3457            Ok(info) => info.info.next_block_height,
3458            // The validator doesn't have this chain's description blob yet.
3459            Err(NodeError::BlobsNotFound(_)) => BlockHeight::ZERO,
3460            Err(err) => return Err(err.into()),
3461        };
3462        let local_next_block_height = self.chain_info().await?.next_block_height;
3463
3464        if validator_next_block_height >= local_next_block_height {
3465            debug!("Validator is up-to-date with local state");
3466            return Ok(());
3467        }
3468
3469        let heights = (validator_next_block_height.0..local_next_block_height.0)
3470            .map(BlockHeight)
3471            .collect::<Vec<_>>();
3472
3473        let certificates = self
3474            .client
3475            .storage_client()
3476            .read_certificates_by_heights(self.chain_id, &heights)
3477            .await?
3478            .into_iter()
3479            .flatten();
3480
3481        for certificate in certificates {
3482            let missing_blob_ids = match remote_node
3483                .handle_confirmed_certificate(
3484                    certificate.clone(),
3485                    CrossChainMessageDelivery::NonBlocking,
3486                )
3487                .await
3488            {
3489                Ok(_) => continue,
3490                Err(NodeError::BlobsNotFound(missing_blob_ids)) => missing_blob_ids,
3491                Err(err) => return Err(err.into()),
3492            };
3493            // The validator is missing blobs the certificate depends on
3494            // (including possibly the chain description). Upload and retry.
3495            let missing_blobs = self
3496                .client
3497                .storage_client()
3498                .read_blobs(&missing_blob_ids)
3499                .await?
3500                .into_iter()
3501                .flatten()
3502                .map(|b| b.into_std())
3503                .collect();
3504            remote_node.upload_blobs(missing_blobs).await?;
3505            remote_node
3506                .handle_confirmed_certificate(certificate, CrossChainMessageDelivery::NonBlocking)
3507                .await?;
3508        }
3509
3510        Ok(())
3511    }
3512}
3513
#[cfg(with_testing)]
impl<Env: Environment> ChainClient<Env> {
    /// Test helper: processes `notification` as if it had been received from the validator
    /// described by `validator` (its public key and network address).
    ///
    /// # Panics
    ///
    /// Panics if no node can be created for `validator`, or if processing the notification
    /// fails.
    pub async fn process_notification_from(
        &self,
        notification: Notification,
        validator: (ValidatorPublicKey, &str),
    ) {
        let (public_key, node) = self
            .client
            .validator_node_provider()
            .make_nodes_from_list(vec![validator])
            .unwrap()
            .next()
            .unwrap();
        let local_node = self.client.local_node.clone();
        let remote_node = RemoteNode { public_key, node };
        self.process_notification(remote_node, local_node, notification)
            .await
            .unwrap();
    }
}