linera_core/client/chain_client/mod.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

mod state;
use std::{
    collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
    convert::Infallible,
    iter,
    sync::Arc,
};

use custom_debug_derive::Debug;
use futures::{
    future::{self, Either, FusedFuture, Future},
    stream::{self, AbortHandle, FusedStream, FuturesUnordered, StreamExt, TryStreamExt},
};
#[cfg(with_metrics)]
use linera_base::prometheus_util::MeasureLatency as _;
use linera_base::{
    abi::Abi,
    crypto::{signer, CryptoHash, Signer, ValidatorPublicKey},
    data_types::{
        Amount, ApplicationPermissions, ArithmeticError, Blob, BlobContent, BlockHeight,
        ChainDescription, Epoch, MessagePolicy, Round, Timestamp,
    },
    ensure,
    identifiers::{
        Account, AccountOwner, ApplicationId, BlobId, BlobType, ChainId, EventId, IndexAndEvent,
        ModuleId, StreamId,
    },
    ownership::{ChainOwnership, TimeoutConfig},
    time::{Duration, Instant},
};
#[cfg(not(target_arch = "wasm32"))]
use linera_base::{data_types::Bytecode, vm::VmRuntime};
use linera_chain::{
    data_types::{
        BlockProposal, BundleExecutionPolicy, BundleFailurePolicy, ChainAndHeight, IncomingBundle,
        ProposedBlock, Transaction,
    },
    manager::LockingBlock,
    types::{
        Block, ConfirmedBlock, ConfirmedBlockCertificate, Timeout, TimeoutCertificate,
        ValidatedBlock,
    },
    ChainError, ChainExecutionContext,
};
use linera_execution::{
    committee::Committee,
    system::{
        AdminOperation, OpenChainConfig, SystemOperation, EPOCH_STREAM_NAME,
        REMOVED_EPOCH_STREAM_NAME,
    },
    ExecutionError, Operation, Query, QueryOutcome,
};
use linera_storage::{Clock as _, Storage as _};
use linera_views::ViewError;
use serde::Serialize;
pub use state::State;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, info, instrument, trace, warn, Instrument as _};

use super::{
    received_log::ReceivedLogs, validator_trackers::ValidatorTrackers, AbortOnDrop, Client,
    ListeningMode, PendingProposal, TimingType,
};
use crate::{
    data_types::{ChainInfo, ChainInfoQuery, ClientOutcome, RoundTimeout},
    environment::Environment,
    local_node::{LocalNodeClient, LocalNodeError},
    node::{
        CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode,
        ValidatorNodeProvider as _,
    },
    remote_node::RemoteNode,
    updater::{communicate_with_quorum, CommunicateAction, CommunicationError},
    worker::{Notification, Reason, WorkerError},
};

#[derive(Debug, Clone)]
pub struct Options {
    /// Maximum number of pending message bundles processed at a time in a block.
    pub max_pending_message_bundles: usize,
    /// Maximum number of message bundles to discard from a block proposal due to block limit
    /// errors before discarding all remaining bundles.
    ///
    /// Discarded bundles can be retried in the next block.
    pub max_block_limit_errors: u32,
    /// Time budget for staging message bundles. When set, limits bundle execution by
    /// wall-clock time, in addition to the count limit from `max_pending_message_bundles`.
    pub staging_bundles_time_budget: Option<Duration>,
    /// The policy for automatically handling incoming messages.
    pub message_policy: MessagePolicy,
    /// Chain IDs whose incoming bundles should be processed first when proposing a block.
    pub priority_bundle_origins: HashSet<ChainId>,
    /// Whether to block on cross-chain message delivery.
    pub cross_chain_message_delivery: CrossChainMessageDelivery,
    /// An additional delay, after reaching a quorum, to wait for additional validator
    /// signatures, expressed as a fraction of the time taken to reach the quorum.
    pub quorum_grace_period: f64,
    /// The delay when downloading a blob, after which we try a second validator.
    pub blob_download_timeout: Duration,
    /// The delay when downloading a batch of certificates, after which we try a second validator.
    pub certificate_batch_download_timeout: Duration,
    /// Maximum number of certificates that we download at a time from one validator when
    /// synchronizing one of our chains.
    pub certificate_download_batch_size: u64,
    /// Maximum number of certificates read from local storage and uploaded to a validator
    /// at a time when synchronizing a chain.
    pub certificate_upload_batch_size: u64,
    /// Maximum number of sender certificates we try to download and receive in one go
    /// when syncing sender chains.
    pub sender_certificate_download_batch_size: usize,
    /// Maximum number of tasks that can be joined concurrently using `buffer_unordered`.
    pub max_joined_tasks: usize,
    /// Whether to allow creating blocks in the fast round. Fast blocks have lower latency but
    /// must be used carefully so that there are never any conflicting fast block proposals.
    pub allow_fast_blocks: bool,
    /// Initial probe interval for the notification circuit breaker. When a validator's
    /// notification stream exhausts retries, the circuit breaker waits this long before
    /// probing again. Doubles on each failed probe.
    pub notification_circuit_breaker_initial_probe_interval: Duration,
    /// Maximum probe interval for the notification circuit breaker. The probe interval
    /// doubles on each failure but is capped at this value.
    pub notification_circuit_breaker_max_probe_interval: Duration,
    /// Maximum number of event stream IDs to include in a single `PreviousEventBlocks`
    /// request. Larger sets are split into multiple requests.
    pub max_event_stream_queries: usize,
}
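
// A minimal sketch of customizing the client options (hypothetical caller code;
// `Options::test_default()` is only available with the `with_testing` feature, so a
// production caller would start from its own fully specified `Options` value):
//
// let mut options = Options::test_default();
// options.max_pending_message_bundles = 50;
// options.staging_bundles_time_budget = Some(Duration::from_millis(200));
// options.cross_chain_message_delivery = CrossChainMessageDelivery::Blocking;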

struct CircuitBreakerState {
    next_probe_at: Instant,
    probe_interval: Duration,
}

#[cfg(with_testing)]
impl Options {
    pub fn test_default() -> Self {
        use super::{
            DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE, DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE,
            DEFAULT_MAX_EVENT_STREAM_QUERIES, DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
        };
        use crate::DEFAULT_QUORUM_GRACE_PERIOD;

        Options {
            max_pending_message_bundles: 10,
            max_block_limit_errors: 3,
            staging_bundles_time_budget: None,
            message_policy: MessagePolicy::default(),
            priority_bundle_origins: HashSet::new(),
            cross_chain_message_delivery: CrossChainMessageDelivery::NonBlocking,
            quorum_grace_period: DEFAULT_QUORUM_GRACE_PERIOD,
            blob_download_timeout: Duration::from_secs(1),
            certificate_batch_download_timeout: Duration::from_secs(1),
            certificate_download_batch_size: DEFAULT_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            certificate_upload_batch_size: DEFAULT_CERTIFICATE_UPLOAD_BATCH_SIZE,
            sender_certificate_download_batch_size: DEFAULT_SENDER_CERTIFICATE_DOWNLOAD_BATCH_SIZE,
            max_joined_tasks: 100,
            allow_fast_blocks: false,
            notification_circuit_breaker_initial_probe_interval: Duration::from_secs(300),
            notification_circuit_breaker_max_probe_interval: Duration::from_secs(3600),
            max_event_stream_queries: DEFAULT_MAX_EVENT_STREAM_QUERIES,
        }
    }
}

impl Options {
    /// Builds the [`BundleExecutionPolicy`] based on the client options.
    pub fn bundle_execution_policy(&self) -> BundleExecutionPolicy {
        BundleExecutionPolicy {
            on_failure: BundleFailurePolicy::AutoRetry {
                max_failures: self.max_block_limit_errors,
                never_reject_application_ids: Arc::new(
                    self.message_policy.never_reject_application_ids.clone(),
                ),
            },
            time_budget: self.staging_bundles_time_budget,
        }
    }
}
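
// For example (hypothetical caller code), the policy used while staging a block can be
// derived directly from a client's options:
//
// let policy = client_options.bundle_execution_policy();
//
// The `Arc` around `never_reject_application_ids` lets retried executions share the
// set without cloning it for every bundle.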

/// Client to operate a chain by interacting with validators and the given local storage
/// implementation.
/// * The chain being operated is called the "local chain" or just the "chain".
/// * As a rule, operations are considered successful (and communication may stop) as soon
///   as they succeed in gathering a quorum of responses.
#[derive(Debug)]
pub struct ChainClient<Env: Environment> {
    /// The Linera [`Client`] that manages operations for this chain client.
    #[debug(skip)]
    pub(crate) client: Arc<Client<Env>>,
    /// The off-chain chain ID.
    chain_id: ChainId,
    /// The client options.
    #[debug(skip)]
    options: Options,
    /// The preferred owner of the chain used to sign proposals.
    /// `None` if we cannot propose on this chain.
    preferred_owner: Option<AccountOwner>,
    /// The next block height as read from the wallet.
    initial_next_block_height: BlockHeight,
    /// The last block hash as read from the wallet.
    initial_block_hash: Option<CryptoHash>,
    /// Optional timing sender for benchmarking.
    timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
    /// Sender chain IDs whose bundles were discarded due to the never-reject policy.
    /// These origins are excluded from `process_inbox` until the client is restarted.
    skipped_origins: Arc<papaya::HashSet<ChainId>>,
}

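// Note: `Clone` is implemented by hand instead of derived, since `#[derive(Clone)]`
// would add an unnecessary `Env: Clone` bound on the type parameter; cloning only
// copies `Arc` handles and plain field values.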
impl<Env: Environment> Clone for ChainClient<Env> {
    fn clone(&self) -> Self {
        Self {
            client: self.client.clone(),
            chain_id: self.chain_id,
            options: self.options.clone(),
            preferred_owner: self.preferred_owner,
            initial_next_block_height: self.initial_next_block_height,
            initial_block_hash: self.initial_block_hash,
            timing_sender: self.timing_sender.clone(),
            skipped_origins: self.skipped_origins.clone(),
        }
    }
}

/// Error type for [`ChainClient`].
#[derive(Debug, Error)]
pub enum Error {
    #[error("Local node operation failed: {0}")]
    LocalNodeError(#[from] LocalNodeError),

    #[error("Remote node operation failed: {0}")]
    RemoteNodeError(#[from] NodeError),

    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    #[error("Missing certificates: {0:?}")]
    ReadCertificatesError(Vec<CryptoHash>),

    #[error("Missing confirmed block: {0:?}")]
    MissingConfirmedBlock(CryptoHash),

    #[error("JSON (de)serialization error: {0}")]
    JsonError(#[from] serde_json::Error),

    #[error("Chain operation failed: {0}")]
    ChainError(#[from] ChainError),

    #[error(transparent)]
    CommunicationError(#[from] CommunicationError<NodeError>),

    #[error("Internal error within chain client: {0}")]
    InternalError(&'static str),

    #[error(
        "Cannot accept a certificate from an unknown committee in the future. \
         Please synchronize the local view of the admin chain"
    )]
    CommitteeSynchronizationError,

    #[error(
        "The local node is behind the trusted state in the wallet and needs to be \
         synchronized with validators"
    )]
    WalletSynchronizationError,

    #[error("The state of the client is incompatible with the proposed block: {0}")]
    BlockProposalError(&'static str),

    #[error(
        "Cannot accept a certificate from a committee that was retired. \
         Try a newer certificate from the same origin"
    )]
    CommitteeDeprecationError,

    #[error("Protocol error within chain client: {0}")]
    ProtocolError(&'static str),

    #[error("Signer doesn't have a key to sign for chain {0}")]
    CannotFindKeyForChain(ChainId),

    #[error("Client is not configured to propose on chain {0}")]
    NoAccountKeyConfigured(ChainId),

    #[error("The chain client isn't an owner of chain {0}")]
    NotAnOwner(ChainId),

    #[error(transparent)]
    ViewError(#[from] ViewError),

    #[error(
        "Failed to download certificates and update local node to the next height \
         {target_next_block_height} of chain {chain_id}"
    )]
    CannotDownloadCertificates {
        chain_id: ChainId,
        target_next_block_height: BlockHeight,
    },

    #[error(transparent)]
    BcsError(#[from] bcs::Error),

    #[error(
        "Unexpected quorum: validators voted for block hash {hash} in {round}, \
         expected block hash {expected_hash} in {expected_round}"
    )]
    UnexpectedQuorum {
        hash: CryptoHash,
        round: Round,
        expected_hash: CryptoHash,
        expected_round: Round,
    },

    #[error("Signer error: {0:?}")]
    Signer(#[source] Box<dyn signer::Error>),

    #[error("Cannot revoke the current epoch {0}")]
    CannotRevokeCurrentEpoch(Epoch),

    #[error("Epoch is already revoked")]
    EpochAlreadyRevoked,

    #[error("Failed to download missing sender blocks from chain {chain_id} at height {height}")]
    CannotDownloadMissingSenderBlock {
        chain_id: ChainId,
        height: BlockHeight,
    },

    #[error(
        "A different block was already committed at this height. \
         The committed certificate hash is {0}"
    )]
    Conflict(CryptoHash),

    #[error(
        "Execution outcome mismatch: AutoRetry and committed execution produced \
         different outcomes for the same block"
    )]
    ExecutionOutcomeMismatch,
}

impl From<Infallible> for Error {
    fn from(infallible: Infallible) -> Self {
        match infallible {}
    }
}

impl Error {
    pub fn signer_failure(err: impl signer::Error + 'static) -> Self {
        Self::Signer(Box::new(err))
    }
}

impl<Env: Environment> ChainClient<Env> {
    pub fn new(
        client: Arc<Client<Env>>,
        chain_id: ChainId,
        options: Options,
        initial_block_hash: Option<CryptoHash>,
        initial_next_block_height: BlockHeight,
        preferred_owner: Option<AccountOwner>,
        timing_sender: Option<mpsc::UnboundedSender<(u64, TimingType)>>,
    ) -> Self {
        ChainClient {
            client,
            chain_id,
            options,
            preferred_owner,
            initial_block_hash,
            initial_next_block_height,
            timing_sender,
            skipped_origins: Arc::new(papaya::HashSet::new()),
        }
    }

    /// Returns whether this chain is in follow-only mode.
    pub fn is_follow_only(&self) -> bool {
        self.client.is_chain_follow_only(self.chain_id)
    }

    /// Returns the proposal mutex for this chain.
    ///
    /// The mutex serializes block proposals and holds the pending proposal (if any).
    #[instrument(level = "trace", skip(self))]
    fn proposal_mutex(&self) -> Arc<tokio::sync::Mutex<Option<PendingProposal>>> {
        self.client
            .chains
            .pin()
            .get(&self.chain_id)
            .expect("Chain client constructed for invalid chain")
            .proposal_mutex()
    }

    /// Returns the pending proposal, if any.
    #[instrument(level = "trace", skip(self))]
    pub async fn pending_proposal(&self) -> Option<PendingProposal> {
        self.proposal_mutex().lock().await.clone()
    }

    /// Gets a reference to the client's signer instance.
    #[instrument(level = "trace", skip(self))]
    pub fn signer(&self) -> &impl Signer {
        self.client.signer()
    }

    /// Returns whether the signer has a key for the given owner.
    pub async fn has_key_for(&self, owner: &AccountOwner) -> Result<bool, Error> {
        self.client.has_key_for(owner).await
    }

    /// Gets a mutable reference to the per-`ChainClient` options.
    #[instrument(level = "trace", skip(self))]
    pub fn options_mut(&mut self) -> &mut Options {
        &mut self.options
    }

    /// Gets a reference to the per-`ChainClient` options.
    #[instrument(level = "trace", skip(self))]
    pub fn options(&self) -> &Options {
        &self.options
    }

    /// Gets the ID of the associated chain.
    #[instrument(level = "trace", skip(self))]
    pub fn chain_id(&self) -> ChainId {
        self.chain_id
    }

    /// Gets a clone of the timing sender for benchmarking.
    pub fn timing_sender(&self) -> Option<mpsc::UnboundedSender<(u64, TimingType)>> {
        self.timing_sender.clone()
    }

    /// Gets the ID of the admin chain.
    #[instrument(level = "trace", skip(self))]
    pub fn admin_chain_id(&self) -> ChainId {
        self.client.admin_chain_id
    }

    /// Gets the currently preferred owner for signing the blocks.
    #[instrument(level = "trace", skip(self))]
    pub fn preferred_owner(&self) -> Option<AccountOwner> {
        self.preferred_owner
    }

    /// Sets the new preferred owner for signing blocks.
    #[instrument(level = "trace", skip(self))]
    pub fn set_preferred_owner(&mut self, preferred_owner: AccountOwner) {
        self.preferred_owner = Some(preferred_owner);
    }

    /// Obtains a `ChainStateView` for this client's chain.
    #[instrument(level = "trace")]
    pub async fn chain_state_view(
        &self,
    ) -> Result<crate::worker::ChainStateViewReadGuard<Env::Storage>, LocalNodeError> {
        self.client.local_node.chain_state_view(self.chain_id).await
    }

    /// Returns the publisher chains that this chain subscribes to, along with the
    /// subscribed stream IDs on each.
    #[instrument(level = "trace", skip(self))]
    pub async fn event_stream_publishers(
        &self,
    ) -> Result<BTreeMap<ChainId, BTreeSet<StreamId>>, LocalNodeError> {
        let subscriptions = self
            .client
            .local_node
            .get_event_subscriptions(self.chain_id)
            .await?;
        let mut publishers = subscriptions.into_iter().fold(
            BTreeMap::<ChainId, BTreeSet<StreamId>>::new(),
            |mut map, ((chain_id, stream_id), _)| {
                map.entry(chain_id).or_default().insert(stream_id);
                map
            },
        );
        if self.chain_id != self.client.admin_chain_id {
            publishers.insert(
                self.client.admin_chain_id,
                vec![
                    StreamId::system(EPOCH_STREAM_NAME),
                    StreamId::system(REMOVED_EPOCH_STREAM_NAME),
                ]
                .into_iter()
                .collect(),
            );
        }
        Ok(publishers)
    }
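
    // A minimal usage sketch (hypothetical caller code, assuming a constructed
    // `chain_client`): list each publisher chain and its subscribed streams.
    //
    // for (publisher, stream_ids) in chain_client.event_stream_publishers().await? {
    //     println!("{publisher}: {} subscribed stream(s)", stream_ids.len());
    // }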

    /// Subscribes to notifications from this client's chain.
    #[instrument(level = "trace")]
    pub fn subscribe(&self) -> Result<NotificationStream, LocalNodeError> {
        self.subscribe_to(self.chain_id)
    }

    /// Subscribes to notifications from the specified chain.
    #[instrument(level = "trace")]
    pub fn subscribe_to(&self, chain_id: ChainId) -> Result<NotificationStream, LocalNodeError> {
        Ok(Box::pin(UnboundedReceiverStream::new(
            self.client.notifier.subscribe(vec![chain_id]),
        )))
    }
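
    // A minimal usage sketch (hypothetical caller code, assuming a Tokio runtime and a
    // constructed `chain_client`): consume notifications as they arrive.
    //
    // use futures::StreamExt as _;
    //
    // let mut notifications = chain_client.subscribe()?;
    // while let Some(notification) = notifications.next().await {
    //     tracing::info!(reason = ?notification.reason, "received chain notification");
    // }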

    /// Returns the storage client used by this client's local node.
    #[instrument(level = "trace")]
    pub fn storage_client(&self) -> &Env::Storage {
        self.client.storage_client()
    }

    /// Obtains the basic `ChainInfo` data for the local chain.
    #[instrument(level = "trace")]
    pub async fn chain_info(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
        let query = ChainInfoQuery::new(self.chain_id);
        let response = self
            .client
            .local_node
            .handle_chain_info_query(query)
            .await?;
        Ok(response.info)
    }

    /// Obtains the basic `ChainInfo` data for the local chain, with chain manager values.
    #[instrument(level = "trace")]
    pub async fn chain_info_with_manager_values(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
        let query = ChainInfoQuery::new(self.chain_id)
            .with_manager_values()
            .with_committees();
        let response = self
            .client
            .local_node
            .handle_chain_info_query(query)
            .await?;
        Ok(response.info)
    }

    /// Returns the chain's description. Fetches it from the validators if necessary.
    pub async fn get_chain_description(&self) -> Result<ChainDescription, Error> {
        self.client.get_chain_description(self.chain_id).await
    }

    /// Obtains up to `self.options.max_pending_message_bundles` pending message bundles for the
    /// local chain.
    #[instrument(level = "trace")]
    async fn pending_message_bundles(&self) -> Result<Vec<IncomingBundle>, Error> {
        if self.options.message_policy.is_ignore() {
            // Ignore all messages.
            return Ok(Vec::new());
        }

        let query = ChainInfoQuery::new(self.chain_id).with_pending_message_bundles();
        let info = self
            .client
            .local_node
            .handle_chain_info_query(query)
            .await?
            .info;
        if self.preferred_owner.is_some_and(|owner| {
            info.manager
                .ownership
                .is_super_owner_no_regular_owners(&owner)
        }) {
            // There are only super owners; they are expected to sync manually.
            ensure!(
                info.next_block_height >= self.initial_next_block_height,
                Error::WalletSynchronizationError
            );
        }

        let skipped = self.skipped_origins.pin();
        let mut bundles = info
            .requested_pending_message_bundles
            .into_iter()
            .filter_map(|bundle| bundle.apply_policy(&self.options.message_policy))
            .filter(|bundle| !skipped.contains(&bundle.origin))
            .collect::<Vec<_>>();
        let priority_origins = &self.options.priority_bundle_origins;
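        // Order bundles so that those from priority origins come first; within each
        // group, older bundles (by sender timestamp) come first.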
        bundles.sort_by(|a, b| {
            let a_priority = priority_origins.contains(&a.origin);
            let b_priority = priority_origins.contains(&b.origin);
            b_priority
                .cmp(&a_priority)
                .then(a.bundle.timestamp.cmp(&b.bundle.timestamp))
        });
        bundles.truncate(self.options.max_pending_message_bundles);
        Ok(bundles)
    }

    /// Returns an `UpdateStreams` operation that updates this client's chain about new events
    /// in any of the streams its applications are subscribing to. Returns `None` if there are no
    /// new events.
    #[instrument(level = "trace")]
    async fn collect_stream_updates(&self) -> Result<Option<Operation>, Error> {
        // Load all our subscriptions.
        let subscription_map = self
            .client
            .local_node
            .get_event_subscriptions(self.chain_id)
            .await?;
        // Collect the indices of all new events.
        let futures = subscription_map
            .into_iter()
            .filter(|((chain_id, _), _)| {
                self.options
                    .message_policy
                    .restrict_chain_ids_to
                    .as_ref()
                    .is_none_or(|chain_set| chain_set.contains(chain_id))
            })
            .filter(|((_, stream_id), _)| {
                self.options
                    .message_policy
                    .process_events_from_application_ids
                    .as_ref()
                    .is_none_or(|app_set| app_set.contains(&stream_id.application_id))
            })
            .map(|((chain_id, stream_id), subscriptions)| {
                let client = self.client.clone();
                async move {
                    let next_expected_index = client
                        .local_node
                        .get_next_expected_event(chain_id, stream_id.clone())
                        .await?;
                    if let Some(next_index) = next_expected_index
                        .filter(|next_index| *next_index > subscriptions.next_index)
                    {
                        Ok(Some((chain_id, stream_id, next_index)))
                    } else {
                        Ok::<_, Error>(None)
                    }
                }
            });
        let updates = futures::stream::iter(futures)
            .buffer_unordered(self.options.max_joined_tasks)
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();
        if updates.is_empty() {
            return Ok(None);
        }
        Ok(Some(SystemOperation::UpdateStreams(updates).into()))
    }

    #[instrument(level = "trace")]
    async fn chain_info_with_committees(&self) -> Result<Box<ChainInfo>, LocalNodeError> {
        self.client.chain_info_with_committees(self.chain_id).await
    }

    /// Obtains the current epoch of the local chain as well as its set of trusted committees.
    #[instrument(level = "trace")]
    async fn epoch_and_committees(
        &self,
    ) -> Result<(Epoch, BTreeMap<Epoch, CryptoHash>), LocalNodeError> {
        let info = self.chain_info_with_committees().await?;
        let epoch = info.epoch;
        let committees = info
            .requested_committees
            .ok_or(LocalNodeError::InvalidChainInfoResponse)?;
        Ok((epoch, committees))
    }

    /// Obtains the committee for the current epoch of the local chain.
    #[instrument(level = "trace")]
    pub async fn local_committee(&self) -> Result<Arc<Committee>, Error> {
        let info = match self.chain_info_with_committees().await {
            Ok(info) => info,
            Err(LocalNodeError::BlobsNotFound(_)) => {
                self.synchronize_chain_state(self.chain_id).await?;
                self.chain_info_with_committees().await?
            }
            Err(LocalNodeError::EventsNotFound(event_ids))
                if event_ids
                    .iter()
                    .all(|event_id| event_id.stream_id == StreamId::system(EPOCH_STREAM_NAME)) =>
            {
                // `initialize_and_save_if_needed` couldn't start the chain because
                // the admin chain's epoch events aren't synced yet.
                self.synchronize_chain_state(self.client.admin_chain_id)
                    .await?;
                self.chain_info_with_committees().await?
            }
            Err(err) => return Err(err.into()),
        };
        let hash = info
            .requested_committees
            .as_ref()
            .ok_or(LocalNodeError::InvalidChainInfoResponse)?
            .get(&info.epoch)
            .copied()
            .ok_or(LocalNodeError::InactiveChain(self.chain_id))?;
        Ok(self
            .storage_client()
            .get_or_load_committee_by_hash(hash)
            .await
            .map_err(LocalNodeError::from)?)
    }

    /// Obtains the committee for the latest epoch on the admin chain.
    #[instrument(level = "trace")]
    pub async fn admin_committee(&self) -> Result<(Epoch, Arc<Committee>), LocalNodeError> {
        self.client.admin_committee().await
    }

    /// Obtains the identity of the current owner of the chain.
    ///
    /// Returns an error if we don't have the private key for the identity.
    #[instrument(level = "trace")]
    pub async fn identity(&self) -> Result<AccountOwner, Error> {
        let Some(preferred_owner) = self.preferred_owner else {
            return Err(Error::NoAccountKeyConfigured(self.chain_id));
        };
        let manager = self.chain_info().await?.manager;
        ensure!(
            manager.ownership.is_active(),
            LocalNodeError::InactiveChain(self.chain_id)
        );
        let fallback_owners = if manager.ownership.has_fallback() {
            self.local_committee()
                .await?
                .account_keys_and_weights()
                .map(|(key, _)| AccountOwner::from(key))
                .collect()
        } else {
            BTreeSet::new()
        };

        let is_owner = manager
            .ownership
            .can_propose_in_multi_leader_round(&preferred_owner)
            || fallback_owners.contains(&preferred_owner);

        if !is_owner {
            warn!(
                chain_id = %self.chain_id,
                ownership = ?manager.ownership,
                ?fallback_owners,
                ?preferred_owner,
                "The preferred owner is not configured as an owner of this chain",
            );
            return Err(Error::NotAnOwner(self.chain_id));
        }

        let has_signer = self.has_key_for(&preferred_owner).await?;

        if !has_signer {
            warn!(%self.chain_id, ?preferred_owner,
                "The preferred owner is an owner of this chain, but the Signer instance \
                 doesn't contain its key",
            );
            return Err(Error::CannotFindKeyForChain(self.chain_id));
        }

        Ok(preferred_owner)
    }

    /// Prepares the chain for a new owner by fetching the chain description and validating access.
    ///
    /// This is useful when assigning a chain to a client that may not have the owner key,
    /// e.g. when a faucet creates a chain with `open_multi_leader_rounds`.
    ///
    /// Returns the chain info if the owner can propose on this chain (either because they are
    /// an owner, or because `open_multi_leader_rounds` is enabled).
    #[instrument(level = "trace")]
    pub async fn prepare_for_owner(&self, owner: AccountOwner) -> Result<Box<ChainInfo>, Error> {
        ensure!(
            self.has_key_for(&owner).await?,
            Error::CannotFindKeyForChain(self.chain_id)
        );
        // Ensure we have the chain description blob.
        self.client
            .get_chain_description_blob(self.chain_id)
            .await?;

        // Get chain info.
        let info = self.chain_info().await?;

        // Validate that the owner can propose on this chain.
        ensure!(
            info.manager
                .ownership
                .can_propose_in_multi_leader_round(&owner),
            Error::NotAnOwner(self.chain_id)
        );

        Ok(info)
    }
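
    // A minimal usage sketch (hypothetical caller code; assumes a mutable
    // `chain_client` for a chain just created by a faucet with
    // `open_multi_leader_rounds` enabled):
    //
    // let _info = chain_client.prepare_for_owner(owner).await?;
    // chain_client.set_preferred_owner(owner);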

    /// Prepares the chain for the next operation, i.e. makes sure we have synchronized it up to
    /// its current height.
    #[instrument(level = "trace")]
    pub async fn prepare_chain(&self) -> Result<Box<ChainInfo>, Error> {
        #[cfg(with_metrics)]
        let _latency = super::metrics::PREPARE_CHAIN_LATENCY.measure_latency();

        let mut info = self.synchronize_to_known_height().await?;

        if self.preferred_owner.is_none_or(|owner| {
            !info
                .manager
                .ownership
                .is_super_owner_no_regular_owners(&owner)
        }) {
            // If we are not a super owner, or there are regular owners, we could be missing
            // recent certificates created by other clients. Further synchronize blocks from
            // the network. This is best-effort and depends on network conditions.
            info = self.client.synchronize_chain_state(self.chain_id).await?;
        }

        if info.epoch > self.client.admin_committees().await?.0 {
            self.client
                .synchronize_chain_state(self.client.admin_chain_id)
                .await?;
        }

        Ok(info)
    }

    // Verifies that our local storage contains enough history compared to the
    // known block height. Otherwise, downloads the missing history from the
    // network.
    // The known height only differs if the wallet is ahead of storage.
    async fn synchronize_to_known_height(&self) -> Result<Box<ChainInfo>, Error> {
        let info = self
            .client
            .download_certificates(self.chain_id, self.initial_next_block_height)
            .await?;
        if info.next_block_height == self.initial_next_block_height {
            // Check that our local node has the expected block hash.
            ensure!(
                self.initial_block_hash == info.block_hash,
                Error::InternalError("Invalid chain of blocks in local node")
            );
        }
        Ok(info)
    }

    /// Attempts to update all validators about the local chain.
    #[instrument(level = "trace", skip(old_committee, latest_certificate))]
    pub async fn update_validators(
        &self,
        old_committee: Option<&Committee>,
        latest_certificate: Option<Arc<ConfirmedBlockCertificate>>,
    ) -> Result<(), Error> {
        let update_validators_start = linera_base::time::Instant::now();
        // Communicate the new certificate now.
        if let Some(old_committee) = old_committee {
            self.communicate_chain_updates(old_committee, latest_certificate.clone())
                .await?
        };
        if let Ok(new_committee) = self.local_committee().await {
            if old_committee.is_none_or(|old| *new_committee != *old) {
                // If the configuration just changed, communicate to the new committee as well.
                // (This is actually more important than updating the previous committee.)
                self.communicate_chain_updates(&new_committee, latest_certificate)
                    .await?;
            }
        }
        self.send_timing(update_validators_start, TimingType::UpdateValidators);
        Ok(())
    }

    /// Broadcasts certified blocks to validators.
    #[instrument(level = "trace", skip(committee))]
    pub async fn communicate_chain_updates(
        &self,
        committee: &Committee,
        latest_certificate: Option<Arc<ConfirmedBlockCertificate>>,
    ) -> Result<(), Error> {
        let delivery = self.options.cross_chain_message_delivery;
        let height = self.chain_info().await?.next_block_height;
        self.client
            .communicate_chain_updates(
                committee,
                self.chain_id,
                height,
                delivery,
                latest_certificate,
            )
            .await
    }

    /// Synchronizes all chains that any application on this chain subscribes to.
    /// We always consider the admin chain a relevant publishing chain, for new epochs.
    /// For event publisher chains, only event-bearing blocks are downloaded (partial sync).
    async fn synchronize_publisher_chains(&self) -> Result<(), Error> {
        let subscriptions = self
            .client
            .local_node
            .get_event_subscriptions(self.chain_id)
            .await?;
        // Group subscribed streams by publisher chain.
        let mut streams_by_chain = BTreeMap::<ChainId, BTreeSet<StreamId>>::new();
        for ((chain_id, stream_id), _) in &subscriptions {
            if *chain_id != self.chain_id {
                streams_by_chain
                    .entry(*chain_id)
                    .or_default()
                    .insert(stream_id.clone());
            }
        }
        // Always fully sync the admin chain for epoch changes.
        let admin_chain_id = self.client.admin_chain_id;
        if admin_chain_id != self.chain_id {
            self.client.synchronize_chain_state(admin_chain_id).await?;
        }
        // For event publisher chains, do a partial sync using previous_event_blocks.
        let (_, committee) = self.admin_committee().await?;
        let nodes = self.client.make_nodes(&committee)?;
        let tasks = streams_by_chain
            .into_iter()
            .filter(|(chain_id, _)| *chain_id != admin_chain_id)
            .map(|(chain_id, stream_ids)| {
                self.sync_publisher_chain_events(chain_id, stream_ids, &nodes, &committee)
            })
            .collect::<Vec<_>>();
        stream::iter(tasks)
            .buffer_unordered(self.options.max_joined_tasks)
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()?;
        Ok(())
    }

    /// Downloads only event-bearing blocks for the given publisher chain and streams.
    ///
    /// Uses `communicate_with_quorum` so that each validator is queried for
    /// `previous_event_blocks` and the claimed blocks are downloaded from that same
    /// validator. Waiting for a quorum guarantees that any confirmed event is discovered,
    /// since at least one honest validator in the quorum must have it.
    async fn sync_publisher_chain_events(
        &self,
        publisher_chain_id: ChainId,
        stream_ids: BTreeSet<StreamId>,
        nodes: &[RemoteNode<Env::ValidatorNode>],
        committee: &Committee,
    ) -> Result<(), Error> {
        let stream_ids_ref = &stream_ids;
        communicate_with_quorum(
            nodes,
            committee,
            |_: &()| (),
            |remote_node| async move {
                self.client
                    .sync_events_from_node(publisher_chain_id, stream_ids_ref, &remote_node)
                    .await
            },
            self.options.quorum_grace_period,
        )
        .await?;
        Ok(())
    }

    /// Attempts to download new received certificates.
    ///
    /// This is a best effort: it will only find certificates that have been confirmed
    /// amongst sufficiently many validators of the current committee of the target
    /// chain.
    ///
    /// However, this should be the case whenever a sender's chain is still in use and
    /// is regularly upgraded to new committees.
    #[instrument(level = "debug", skip(self), fields(chain_id = %self.chain_id))]
    pub async fn find_received_certificates(&self) -> Result<(), Error> {
        debug!("starting find_received_certificates");
        #[cfg(with_metrics)]
        let _latency = super::metrics::FIND_RECEIVED_CERTIFICATES_LATENCY.measure_latency();
        // Use network information from the local chain.
        let chain_id = self.chain_id;
        let (_, committee) = self.admin_committee().await?;
        let nodes = self.client.make_nodes(&committee)?;

        let trackers = self
            .client
            .local_node
            .get_received_certificate_trackers(chain_id)
            .await?;

        trace!("find_received_certificates: read trackers");

        let received_log_batches = Arc::new(std::sync::Mutex::new(Vec::new()));
        // Proceed to downloading received logs.
        let result = communicate_with_quorum(
            &nodes,
            &committee,
            |_| (),
            |remote_node| {
                let client = &self.client;
                let tracker = trackers.get(&remote_node.public_key).copied().unwrap_or(0);
                let received_log_batches = Arc::clone(&received_log_batches);
                Box::pin(async move {
                    let batch = client
                        .get_received_log_from_validator(chain_id, &remote_node, tracker)
                        .await?;
                    let mut batches = received_log_batches.lock().unwrap();
                    batches.push((remote_node.public_key, batch));
                    Ok(())
                })
            },
            self.options.quorum_grace_period,
        )
        .await;

        if let Err(error) = result {
            error!(
                %error,
                "Failed to synchronize received_logs from at least a quorum of validators",
            );
        }

        let received_logs: Vec<_> = {
            let mut received_log_batches = received_log_batches.lock().unwrap();
            std::mem::take(received_log_batches.as_mut())
        };

        debug!(
            received_logs_len = %received_logs.len(),
            received_logs_total = %received_logs.iter().map(|x| x.1.len()).sum::<usize>(),
            "collected received logs"
        );

        let (received_logs, mut validator_trackers) = (
            ReceivedLogs::from_received_result(received_logs.clone()),
            ValidatorTrackers::new(received_logs, &trackers),
        );

        debug!(
            num_chains = %received_logs.num_chains(),
            num_certs = %received_logs.num_certs(),
            "find_received_certificates: total number of chains and certificates to sync",
        );
        let max_blocks_per_chain =
            self.options.sender_certificate_download_batch_size / self.options.max_joined_tasks * 2;
        for received_log in received_logs.into_batches(
            self.options.sender_certificate_download_batch_size,
            max_blocks_per_chain,
        ) {
            validator_trackers = self
                .receive_sender_certificates(received_log, validator_trackers, &nodes)
                .await?;

            self.update_received_certificate_trackers(&validator_trackers)
                .await;
        }

        info!("find_received_certificates finished");

        Ok(())
    }

    async fn update_received_certificate_trackers(&self, trackers: &ValidatorTrackers) {
        let updated_trackers = trackers.to_map();
        trace!(?updated_trackers, "updated tracker values");

        // Update the trackers.
        if let Err(error) = self
            .client
            .local_node
            .update_received_certificate_trackers(self.chain_id, updated_trackers)
            .await
        {
            error!(
                chain_id = %self.chain_id,
                %error,
                "Failed to update the certificate trackers",
            );
        }
    }

    /// Downloads and processes (or preprocesses) the certificates we are still missing for
    /// blocks that send messages to this chain.
    async fn receive_sender_certificates(
        &self,
        mut received_logs: ReceivedLogs,
        mut validator_trackers: ValidatorTrackers,
        nodes: &[RemoteNode<Env::ValidatorNode>],
    ) -> Result<ValidatorTrackers, Error> {
        debug!(
            num_chains = %received_logs.num_chains(),
            num_certs = %received_logs.num_certs(),
            "receive_sender_certificates: number of chains and certificates to sync",
        );

        // Obtain the next block height we need in the local node, for each chain.
        let local_next_heights = self
            .client
            .local_node
            .next_outbox_heights(received_logs.chains(), self.chain_id)
            .await?;

        validator_trackers.filter_out_already_known(&mut received_logs, &local_next_heights);

        debug!(
            remaining_total_certificates = %received_logs.num_certs(),
            "receive_sender_certificates: computed remote_heights"
        );

        let mut other_sender_chains = Vec::new();
        let (sender, mut receiver) = mpsc::unbounded_channel::<ChainAndHeight>();

        let cert_futures = received_logs.heights_per_chain().into_iter().filter_map({
            let received_logs = &received_logs;
            let other_sender_chains = &mut other_sender_chains;

            move |(sender_chain_id, remote_heights)| {
                if remote_heights.is_empty() {
                    // Our highest, locally executed block is higher than any block height
                    // from the current batch. Skip this batch, but remember to wait for
                    // the messages to be delivered to the inboxes.
                    other_sender_chains.push(sender_chain_id);
                    return None;
                };
                let remote_heights = remote_heights.into_iter().collect::<Vec<_>>();
                let sender = sender.clone();
                let client = self.client.clone();
                let nodes = nodes.to_vec();
                Some(async move {
                    client
                        .download_and_process_sender_chain(
                            sender_chain_id,
                            &nodes,
                            received_logs,
                            remote_heights,
                            sender,
                        )
                        .await
                })
            }
        });

        future::join(
            stream::iter(cert_futures)
                .buffer_unordered(self.options.max_joined_tasks)
                .collect::<()>(),
            async {
                while let Some(chain_and_height) = receiver.recv().await {
                    validator_trackers.downloaded_cert(chain_and_height);
                }
            },
        )
        .await;

        debug!(
            num_other_chains = %other_sender_chains.len(),
            "receive_sender_certificates: processing certificates finished"
        );

        // Certificates for these chains were skipped above because they were already
        // processed locally. If they were processed in a concurrent task, it is not
        // guaranteed that their cross-chain messages were already handled.
        self.retry_pending_cross_chain_requests_from_sender_chains(nodes, other_sender_chains)
            .await;

        debug!("receive_sender_certificates: finished processing other_sender_chains");

        Ok(validator_trackers)
    }

    /// Retries cross-chain requests for chains that may have been processed on another
    /// task without their messages being correctly handled. Fetches missing blobs from
    /// the given nodes if necessary.
    async fn retry_pending_cross_chain_requests_from_sender_chains(
        &self,
        nodes: &[RemoteNode<Env::ValidatorNode>],
        other_sender_chains: Vec<ChainId>,
    ) {
        let stream = other_sender_chains
            .into_iter()
            .map(|chain_id| async move {
                if let Err(error) = match self
                    .client
                    .retry_pending_cross_chain_requests(chain_id)
                    .await
                {
                    Ok(()) => Ok(()),
                    Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                        if let Err(error) = self
                            .client
                            .update_local_node_with_blobs_from(blob_ids.clone(), nodes)
                            .await
                        {
                            error!(
                                ?blob_ids,
                                %error,
                                "Error while attempting to download blobs while retrying \
                                outgoing messages"
                            );
                        }
                        self.client
                            .retry_pending_cross_chain_requests(chain_id)
                            .await
                    }
                    err => err,
                } {
                    error!(
                        %chain_id,
                        %error,
                        "Failed to retry outgoing messages from chain"
                    );
                }
            })
            .collect::<FuturesUnordered<_>>();
        stream.for_each(future::ready).await;
    }

    /// Sends money.
    #[instrument(level = "trace")]
    pub async fn transfer(
        &self,
        owner: AccountOwner,
        amount: Amount,
        recipient: Account,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
        // TODO(#467): check the balance of `owner` before signing any block proposal.
        self.execute_operation(SystemOperation::Transfer {
            owner,
            recipient,
            amount,
        })
        .await
    }
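
    // A minimal usage sketch (hypothetical caller code; `owner` and `recipient` are
    // assumed to be in scope):
    //
    // match chain_client.transfer(owner, Amount::from_tokens(5), recipient).await? {
    //     ClientOutcome::Committed(certificate) => {
    //         tracing::info!(hash = %certificate.hash(), "transfer committed");
    //     }
    //     _ => tracing::warn!("transfer did not commit directly"),
    // }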

    /// Verifies that a data blob is readable from storage.
    // TODO(#2490): Consider removing or renaming this.
    #[instrument(level = "trace")]
    pub async fn read_data_blob(
        &self,
        hash: CryptoHash,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
        let blob_id = BlobId {
            hash,
            blob_type: BlobType::Data,
        };
        self.execute_operation(SystemOperation::VerifyBlob { blob_id })
            .await
    }

    /// Claims money in a remote chain.
    #[instrument(level = "trace")]
    pub async fn claim(
        &self,
        owner: AccountOwner,
        target_id: ChainId,
        recipient: Account,
        amount: Amount,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
        self.execute_operation(SystemOperation::Claim {
            owner,
            target_id,
            recipient,
            amount,
        })
        .await
    }

    /// Requests a leader timeout vote from all validators. If a quorum signs it, creates a
    /// certificate and sends it to all validators, to make them enter the next round.
    #[instrument(level = "trace")]
    pub async fn request_leader_timeout(&self) -> Result<TimeoutCertificate, Error> {
        let chain_id = self.chain_id;
        let info = self.chain_info().await?;
        let committee = self.local_committee().await?;
        let height = info.next_block_height;
        let round = info.manager.current_round;
        let action = CommunicateAction::RequestTimeout {
            height,
            round,
            chain_id,
        };
        let value = Timeout::new(chain_id, height, info.epoch);
        let certificate = Box::new(
            self.client
                .communicate_chain_action(&committee, action, value)
                .await?,
        );
        self.client.handle_certificate(*certificate.clone()).await?;
        // The block height didn't increase, but this will communicate the timeout as well.
        self.client
            .communicate_chain_updates(
                &committee,
                chain_id,
                height,
                CrossChainMessageDelivery::NonBlocking,
                None,
            )
            .await?;
        Ok(*certificate)
    }
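
    // A minimal usage sketch (hypothetical caller code): if a proposal returned
    // `ClientOutcome::WaitForTimeout(timeout)`, wait until the round can time out
    // (see `timeout.timestamp`) and then ask validators to certify the timeout so
    // the chain moves on to the next round:
    //
    // let _certificate = chain_client.request_leader_timeout().await?;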

    /// Downloads and processes any certificates we are missing for the given chain.
    #[instrument(level = "trace", skip_all)]
    pub async fn synchronize_chain_state(
        &self,
        chain_id: ChainId,
    ) -> Result<Box<ChainInfo>, Error> {
        self.client.synchronize_chain_state(chain_id).await
    }

    /// Downloads and processes any certificates we are missing for this chain, from the given
    /// committee.
    #[instrument(level = "trace", skip_all)]
    pub async fn synchronize_chain_state_from_committee(
        &self,
        committee: Arc<Committee>,
    ) -> Result<Box<ChainInfo>, Error> {
        self.client
            .synchronize_chain_from_committee(self.chain_id, committee)
            .await
    }

    /// Executes a list of operations.
    #[instrument(level = "trace", skip(operations, blobs))]
    pub async fn execute_operations(
        &self,
        operations: Vec<Operation>,
        blobs: Vec<Blob>,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
        let timing_start = linera_base::time::Instant::now();

        let result = loop {
            let execute_block_start = linera_base::time::Instant::now();
            // TODO(#2066): Remove boxing once the call-stack is shallower
            match Box::pin(self.execute_block(operations.clone(), blobs.clone())).await {
                Ok(ClientOutcome::Committed(certificate)) => {
                    self.send_timing(execute_block_start, TimingType::ExecuteBlock);
                    break Ok(ClientOutcome::Committed(certificate));
                }
                Ok(ClientOutcome::WaitForTimeout(timeout)) => {
                    break Ok(ClientOutcome::WaitForTimeout(timeout));
                }
                Ok(ClientOutcome::Conflict(certificate)) => {
                    info!(
                        height = %certificate.block().header.height,
                        "Another block was committed."
                    );
                    break Ok(ClientOutcome::Conflict(certificate));
                }
                Err(Error::CommunicationError(CommunicationError::Trusted(
                    NodeError::UnexpectedBlockHeight {
                        expected_block_height,
                        found_block_height,
                    },
                ))) if expected_block_height > found_block_height => {
                    info!(
                        chain_id = %self.chain_id,
                        "Local state is outdated; synchronizing chain"
                    );
                    self.synchronize_chain_state(self.chain_id).await?;
                }
                Err(err) => return Err(err),
            };
        };

        self.send_timing(timing_start, TimingType::ExecuteOperations);

        result
    }
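
    // A minimal sketch of handling each possible outcome (hypothetical caller code):
    //
    // match chain_client.execute_operations(operations, vec![]).await? {
    //     ClientOutcome::Committed(_certificate) => { /* our block was committed */ }
    //     ClientOutcome::Conflict(_certificate) => { /* another block won this height */ }
    //     ClientOutcome::WaitForTimeout(_timeout) => { /* retry once the round times out */ }
    // }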

    /// Executes an operation.
    pub async fn execute_operation(
        &self,
        operation: impl Into<Operation>,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
        self.execute_operations(vec![operation.into()], vec![])
            .await
    }

    /// Executes a new block.
    ///
    /// This must be preceded by a call to `prepare_chain()`.
    #[instrument(level = "trace", skip(operations, blobs))]
    async fn execute_block(
        &self,
        operations: Vec<Operation>,
        blobs: Vec<Blob>,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
        #[cfg(with_metrics)]
        let _latency = super::metrics::EXECUTE_BLOCK_LATENCY.measure_latency();

        let mutex = self.proposal_mutex();
        let lock_start = linera_base::time::Instant::now();
        let mut proposal_guard = mutex.lock_owned().await;
        debug!(
            chain_id = %self.chain_id,
            lock_wait_ms = lock_start.elapsed().as_millis(),
            "acquired proposal_mutex in execute_block"
        );
        // TODO(#5092): We shouldn't need to call this explicitly.
        // Process any leftover pending proposal from a previous interrupted call.
        // Even if there is no pending proposal, this still calls
        // `request_leader_timeout_if_needed` which ensures the local chain state
        // is synchronized with the current consensus round.
        match self
            .process_pending_block_without_prepare(&mut proposal_guard)
            .await?
        {
            ClientOutcome::Committed(Some(certificate)) => {
                return Ok(ClientOutcome::Conflict(Box::new(certificate)))
            }
            ClientOutcome::WaitForTimeout(timeout) => {
                return Ok(ClientOutcome::WaitForTimeout(timeout))
            }
            ClientOutcome::Conflict(certificate) => {
                return Ok(ClientOutcome::Conflict(certificate))
            }
            ClientOutcome::Committed(None) => {}
        }

        // Collect pending messages and epoch changes after acquiring the lock to avoid
        // race conditions where messages valid for one block height are proposed at a
        // different height.
        let transactions = self.prepend_epochs_messages_and_events(operations).await?;

        if transactions.is_empty() {
            return Err(Error::LocalNodeError(LocalNodeError::WorkerError(
                WorkerError::ChainError(Box::new(ChainError::EmptyBlock)),
            )));
        }

        let block = self
            .new_pending_block(transactions, blobs, &mut proposal_guard)
            .await?;

        match self
            .process_pending_block_without_prepare(&mut proposal_guard)
            .await?
        {
            ClientOutcome::Committed(Some(certificate)) if certificate.block() == &block => {
                Ok(ClientOutcome::Committed(certificate))
            }
            ClientOutcome::Committed(Some(certificate)) => {
                Ok(ClientOutcome::Conflict(Box::new(certificate)))
            }
            // Unreachable: We just set the pending proposal in the guard.
            ClientOutcome::Committed(None) => {
                Err(Error::BlockProposalError("Unexpected block proposal error"))
            }
            ClientOutcome::WaitForTimeout(timeout) => Ok(ClientOutcome::WaitForTimeout(timeout)),
            ClientOutcome::Conflict(certificate) => Ok(ClientOutcome::Conflict(certificate)),
        }
    }
1445
1446    /// Creates a vector of transactions that contains, in addition to the provided
1447    /// operations, epoch changes, incoming message bundles to receive, and event stream
1448    /// updates (if there are any to be processed).
1449    /// This should be called when executing a block, to make sure that any pending
1450    /// messages or events are included in it.
1451    #[instrument(level = "trace", skip(operations))]
1452    async fn prepend_epochs_messages_and_events(
1453        &self,
1454        operations: Vec<Operation>,
1455    ) -> Result<Vec<Transaction>, Error> {
1456        let incoming_bundles = self.pending_message_bundles().await?;
1457        let stream_updates = self.collect_stream_updates().await?;
1458        Ok(self
1459            .collect_epoch_changes()
1460            .await?
1461            .into_iter()
1462            .map(Transaction::ExecuteOperation)
1463            .chain(
1464                incoming_bundles
1465                    .into_iter()
1466                    .map(Transaction::ReceiveMessages),
1467            )
1468            .chain(
1469                stream_updates
1470                    .into_iter()
1471                    .map(Transaction::ExecuteOperation),
1472            )
1473            .chain(operations.into_iter().map(Transaction::ExecuteOperation))
1474            .collect::<Vec<_>>())
1475    }
1476
1477    /// Creates a new pending block and stores it in `proposal_guard`.
1478    ///
1479    /// The caller must hold the proposal mutex. The pending proposal is written directly
1480    /// into the guard so that it is always synchronized with the mutex.
1481    #[instrument(level = "trace", skip(transactions, blobs, proposal_guard))]
1482    async fn new_pending_block(
1483        &self,
1484        transactions: Vec<Transaction>,
1485        blobs: Vec<Blob>,
1486        proposal_guard: &mut Option<PendingProposal>,
1487    ) -> Result<Block, Error> {
1488        let identity = self.identity().await?;
1489
1490        ensure!(
1491            proposal_guard.is_none(),
1492            Error::BlockProposalError(
1493                "Client state already has a pending block; \
1494                use the `linera retry-pending-block` command to commit that first"
1495            )
1496        );
1497        let info = self.chain_info_with_committees().await?;
1498        let timestamp = self.next_timestamp(&transactions, info.timestamp);
1499        let proposed_block = ProposedBlock {
1500            epoch: info.epoch,
1501            chain_id: self.chain_id,
1502            transactions,
1503            previous_block_hash: info.block_hash,
1504            height: info.next_block_height,
1505            authenticated_owner: Some(identity),
1506            timestamp,
1507        };
1508
1509        let round = self.round_for_oracle(&info, &identity).await?;
1510        // Make sure every incoming message succeeds, and remove those that don't.
1511        // Also, compute the final certified hash while we're at it.
1512        let (block, _, never_reject_origins) = self
1513            .client
1514            .stage_block_execution(
1515                proposed_block,
1516                round,
1517                blobs.clone(),
1518                self.options.bundle_execution_policy(),
1519            )
1520            .await?;
1521        // Record origins whose bundles were discarded due to the never-reject policy so
1522        // that `process_inbox` stops retrying them until the client is restarted.
1523        if !never_reject_origins.is_empty() {
1524            let skipped = self.skipped_origins.pin();
1525            for origin in never_reject_origins {
1526                skipped.insert(origin);
1527            }
1528        }
1529        let (proposed_block, auto_retry_outcome) = block.clone().into_proposal();
1530        *proposal_guard = Some(PendingProposal {
1531            block: proposed_block,
1532            blobs,
1533            auto_retry_outcome: Some(auto_retry_outcome),
1534            round: None,
1535        });
1536        Ok(block)
1537    }
1538
1539    /// Returns a suitable timestamp for the next block.
1540    ///
1541    /// This will usually be the current time according to the local clock, but may be slightly
1542    /// ahead to make sure it's not earlier than the incoming messages or the previous block.
1543    #[instrument(level = "trace", skip(transactions))]
1544    fn next_timestamp(&self, transactions: &[Transaction], block_time: Timestamp) -> Timestamp {
1545        let local_time = self.storage_client().clock().current_time();
1546        transactions
1547            .iter()
1548            .filter_map(Transaction::incoming_bundle)
1549            .map(|msg| msg.bundle.timestamp)
1550            .max()
1551            .map_or(local_time, |timestamp| timestamp.max(local_time))
1552            .max(block_time)
1553    }
1554
1555    /// Queries an application.
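    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled); `application_id` and `request` are assumed to be
    /// an existing application ID and a serializable query for its ABI. Passing `None`
    /// as the block hash queries the latest state:
    ///
    /// ```ignore
    /// let query = Query::user(application_id, &request)?;
    /// let (outcome, height) = chain_client.query_application(query, None).await?;
    /// // `outcome.response` holds the raw response; `height` is the height it was taken at.
    /// ```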
1556    #[instrument(level = "trace", skip(query))]
1557    pub async fn query_application(
1558        &self,
1559        query: Query,
1560        block_hash: Option<CryptoHash>,
1561    ) -> Result<(QueryOutcome, BlockHeight), Error> {
1562        loop {
1563            let result = self
1564                .client
1565                .local_node
1566                .query_application(self.chain_id, query.clone(), block_hash)
1567                .await;
1568            if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result {
1569                let validators = self.client.validator_nodes().await?;
1570                self.client
1571                    .update_local_node_with_blobs_from(blob_ids.clone(), &validators)
1572                    .await?;
1573                continue; // We found the missing blobs: retry.
1574            }
1575            return Ok(result?);
1576        }
1577    }
1578
1579    /// Queries a system application.
1580    #[cfg(with_testing)]
1581    #[instrument(level = "trace", skip(query))]
1582    pub async fn query_system_application(
1583        &self,
1584        query: linera_execution::SystemQuery,
1585    ) -> Result<QueryOutcome<linera_execution::SystemResponse>, Error> {
1586        let (
1587            QueryOutcome {
1588                response,
1589                operations,
1590            },
1591            _,
1592        ) = self.query_application(Query::System(query), None).await?;
1593        match response {
1594            linera_execution::QueryResponse::System(response) => Ok(QueryOutcome {
1595                response,
1596                operations,
1597            }),
1598            _ => Err(Error::InternalError("Unexpected response for system query")),
1599        }
1600    }
1601
1602    /// Queries a user application.
1603    #[instrument(level = "trace", skip(application_id, query))]
1604    #[cfg(with_testing)]
1605    pub async fn query_user_application<A: Abi>(
1606        &self,
1607        application_id: ApplicationId<A>,
1608        query: &A::Query,
1609    ) -> Result<QueryOutcome<A::QueryResponse>, Error> {
1610        let query = Query::user(application_id, query)?;
1611        let (
1612            QueryOutcome {
1613                response,
1614                operations,
1615            },
1616            _,
1617        ) = self.query_application(query, None).await?;
1618        match response {
1619            linera_execution::QueryResponse::User(response_bytes) => {
1620                let response = serde_json::from_slice(&response_bytes)?;
1621                Ok(QueryOutcome {
1622                    response,
1623                    operations,
1624                })
1625            }
1626            _ => Err(Error::InternalError("Unexpected response for user query")),
1627        }
1628    }
1629
1630    /// Obtains the local balance of the chain account after staging the execution of
1631    /// incoming messages in a new block.
1632    ///
1633    /// Does not attempt to synchronize with validators. The result will reflect up to
1634    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
1635    /// block.
1636    #[instrument(level = "trace")]
1637    pub async fn query_balance(&self) -> Result<Amount, Error> {
1638        let (balance, _) = self.query_balances_with_owner(AccountOwner::CHAIN).await?;
1639        Ok(balance)
1640    }
1641
1642    /// Obtains the local balance of an account after staging the execution of incoming messages in
1643    /// a new block.
1644    ///
1645    /// Does not attempt to synchronize with validators. The result will reflect up to
1646    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
1647    /// block.
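    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled); `owner` is assumed to be an `AccountOwner` with
    /// funds on this chain:
    ///
    /// ```ignore
    /// let chain_balance = chain_client.query_balance().await?;
    /// let owner_balance = chain_client.query_owner_balance(owner).await?;
    /// // Accounts without an entry simply report `Amount::ZERO`.
    /// ```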
1648    #[instrument(level = "trace", skip(owner))]
1649    pub async fn query_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1650        if owner.is_chain() {
1651            self.query_balance().await
1652        } else {
1653            Ok(self
1654                .query_balances_with_owner(owner)
1655                .await?
1656                .1
1657                .unwrap_or(Amount::ZERO))
1658        }
1659    }
1660
1661    /// Obtains the local balance of an account and optionally another user after staging the
1662    /// execution of incoming messages in a new block.
1663    ///
1664    /// Does not attempt to synchronize with validators. The result will reflect up to
1665    /// `max_pending_message_bundles` incoming message bundles and the execution fees for a single
1666    /// block.
1667    #[instrument(level = "trace", skip(owner))]
1668    pub(crate) async fn query_balances_with_owner(
1669        &self,
1670        owner: AccountOwner,
1671    ) -> Result<(Amount, Option<Amount>), Error> {
1672        let incoming_bundles = self.pending_message_bundles().await?;
1673        // Since we disallow empty blocks, if there are no incoming messages there is
1674        // nothing that could change the balance, so we query for it immediately.
1675        if incoming_bundles.is_empty() {
1676            let chain_balance = self.local_balance().await?;
1677            let owner_balance = self.local_owner_balance(owner).await?;
1678            return Ok((chain_balance, Some(owner_balance)));
1679        }
1680        let info = self.chain_info().await?;
1681        let transactions = incoming_bundles
1682            .into_iter()
1683            .map(Transaction::ReceiveMessages)
1684            .collect::<Vec<_>>();
1685        let timestamp = self.next_timestamp(&transactions, info.timestamp);
1686        let block = ProposedBlock {
1687            epoch: info.epoch,
1688            chain_id: self.chain_id,
1689            transactions,
1690            previous_block_hash: info.block_hash,
1691            height: info.next_block_height,
1692            authenticated_owner: if owner == AccountOwner::CHAIN {
1693                None
1694            } else {
1695                Some(owner)
1696            },
1697            timestamp,
1698        };
1699        match self
1700            .client
1701            .stage_block_execution(
1702                block,
1703                None,
1704                Vec::new(),
1705                self.options.bundle_execution_policy(),
1706            )
1707            .await
1708        {
1709            Ok((_, response, _)) => Ok((
1710                response.info.chain_balance,
1711                response.info.requested_owner_balance,
1712            )),
1713            Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
1714                error,
1715            )))) if matches!(
1716                &*error,
1717                ChainError::ExecutionError(
1718                    execution_error,
1719                    ChainExecutionContext::Block
1720                ) if matches!(
1721                    **execution_error,
1722                    ExecutionError::FeesExceedFunding { .. }
1723                )
1724            ) =>
1725            {
1726                // We can't even pay for the execution of one empty block. Let's return zero.
1727                Ok((Amount::ZERO, Some(Amount::ZERO)))
1728            }
1729            Err(error) => Err(error),
1730        }
1731    }
1732
1733    /// Reads the local balance of the chain account.
1734    ///
1735    /// Does not process the inbox or attempt to synchronize with validators.
1736    #[instrument(level = "trace")]
1737    pub async fn local_balance(&self) -> Result<Amount, Error> {
1738        let (balance, _) = self.local_balances_with_owner(AccountOwner::CHAIN).await?;
1739        Ok(balance)
1740    }
1741
1742    /// Reads the local balance of a user account.
1743    ///
1744    /// Does not process the inbox or attempt to synchronize with validators.
1745    #[instrument(level = "trace", skip(owner))]
1746    pub async fn local_owner_balance(&self, owner: AccountOwner) -> Result<Amount, Error> {
1747        if owner.is_chain() {
1748            self.local_balance().await
1749        } else {
1750            Ok(self
1751                .local_balances_with_owner(owner)
1752                .await?
1753                .1
1754                .unwrap_or(Amount::ZERO))
1755        }
1756    }
1757
1758    /// Reads the local balance of the chain account and optionally another user.
1759    ///
1760    /// Does not process the inbox or attempt to synchronize with validators.
1761    #[instrument(level = "trace", skip(owner))]
1762    pub(crate) async fn local_balances_with_owner(
1763        &self,
1764        owner: AccountOwner,
1765    ) -> Result<(Amount, Option<Amount>), Error> {
1766        ensure!(
1767            self.chain_info().await?.next_block_height >= self.initial_next_block_height,
1768            Error::WalletSynchronizationError
1769        );
1770        let mut query = ChainInfoQuery::new(self.chain_id);
1771        query.request_owner_balance = owner;
1772        let response = self
1773            .client
1774            .local_node
1775            .handle_chain_info_query(query)
1776            .await?;
1777        Ok((
1778            response.info.chain_balance,
1779            response.info.requested_owner_balance,
1780        ))
1781    }
1782
1783    /// Sends tokens to a chain.
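    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled); `recipient` is assumed to be an existing
    /// `Account` on some chain:
    ///
    /// ```ignore
    /// let outcome = chain_client
    ///     .transfer_to_account(AccountOwner::CHAIN, Amount::from_tokens(1), recipient)
    ///     .await?;
    /// ```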
1784    #[instrument(level = "trace")]
1785    pub async fn transfer_to_account(
1786        &self,
1787        from: AccountOwner,
1788        amount: Amount,
1789        account: Account,
1790    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1791        self.transfer(from, amount, account).await
1792    }
1793
1794    /// Burns tokens (transfer to a special address).
1795    #[cfg(with_testing)]
1796    #[instrument(level = "trace")]
1797    pub async fn burn(
1798        &self,
1799        owner: AccountOwner,
1800        amount: Amount,
1801    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
1802        let recipient = Account::burn_address(self.chain_id);
1803        self.transfer(owner, amount, recipient).await
1804    }
1805
1806    #[instrument(level = "trace")]
1807    pub async fn fetch_chain_info(&self) -> Result<Box<ChainInfo>, Error> {
1808        let validators = self.client.validator_nodes().await?;
1809        self.client
1810            .fetch_chain_info(self.chain_id, &validators)
1811            .await
1812    }
1813
1822    /// Synchronizes the chain state from validators, optionally stopping at a given
1823    /// block height or block timestamp.
1824    ///
1825    /// - If `next_height` is `Some`, downloads blocks up to (but not including) that height.
1826    /// - If `until_block_time` is `Some`, downloads blocks up to (but not including) the
1827    ///   first block with timestamp >= the given value.
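    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled), downloading all blocks strictly below height 100
    /// without a timestamp bound:
    ///
    /// ```ignore
    /// let info = chain_client
    ///     .synchronize_up_to(Some(BlockHeight(100)), None)
    ///     .await?;
    /// // `info.next_block_height` reflects the locally available prefix of the chain.
    /// ```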
1828    #[instrument(level = "trace")]
1829    pub async fn synchronize_up_to(
1830        &self,
1831        next_height: Option<BlockHeight>,
1832        until_block_time: Option<Timestamp>,
1833    ) -> Result<Box<ChainInfo>, Error> {
1834        let (_, committee) = self.client.admin_committee().await?;
1835        let validators = self.client.make_nodes(&committee)?;
1836        Box::pin(self.client.fetch_chain_info(self.chain_id, &validators)).await?;
1837        communicate_with_quorum(
1838            &validators,
1839            &committee,
1840            |_: &()| (),
1841            |remote_node| async move {
1842                self.client
1843                    .download_certificates_from(
1844                        &remote_node,
1845                        self.chain_id,
1846                        next_height.unwrap_or(BlockHeight::MAX),
1847                        until_block_time,
1848                    )
1849                    .await?;
1850                Ok(())
1851            },
1852            self.client.options.quorum_grace_period,
1853        )
1854        .await?;
1855        self.client
1856            .local_node
1857            .chain_info(self.chain_id)
1858            .await
1859            .map_err(Into::into)
1860    }
1861
    /// Attempts to synchronize chains that have sent us messages and populate our local
    /// inbox.
    ///
    /// To create a block that actually executes the messages in the inbox,
    /// `process_inbox` must be called separately.
    ///
    /// If the chain is in follow-only mode, this only downloads blocks for this chain without
    /// fetching manager values or sender/publisher chains.
1862    pub async fn synchronize_from_validators(&self) -> Result<Box<ChainInfo>, Error> {
1863        if self.is_follow_only() {
1864            return self.client.synchronize_chain_state(self.chain_id).await;
1865        }
1866        let info = self.prepare_chain().await?;
1867        self.synchronize_publisher_chains().await?;
1868        self.find_received_certificates().await?;
1869        Ok(info)
1870    }
1871
1872    /// Processes the last pending block.
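    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled) of handling every possible outcome:
    ///
    /// ```ignore
    /// match chain_client.process_pending_block().await? {
    ///     ClientOutcome::Committed(Some(certificate)) => { /* the pending block is confirmed */ }
    ///     ClientOutcome::Committed(None) => { /* there was no pending block */ }
    ///     ClientOutcome::Conflict(_) => { /* another block was committed at that height */ }
    ///     ClientOutcome::WaitForTimeout(timeout) => { /* wait for the round to time out, then retry */ }
    /// }
    /// ```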
1873    #[instrument(level = "trace")]
1874    pub async fn process_pending_block(
1875        &self,
1876    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1877        self.prepare_chain().await?;
1878        let mutex = self.proposal_mutex();
1879        let mut proposal_guard = mutex.lock_owned().await;
1880        self.process_pending_block_without_prepare(&mut proposal_guard)
1881            .await
1882    }
1883
1884    /// Processes the last pending block. Assumes that the local chain is up to date.
1885    ///
1886    /// The caller must hold the proposal mutex via `proposal_guard`. The pending proposal
1887    /// is read from and cleared through the guard, ensuring synchronization.
1888    #[instrument(level = "debug", skip(self, proposal_guard), fields(chain_id = %self.chain_id))]
1889    async fn process_pending_block_without_prepare(
1890        &self,
1891        proposal_guard: &mut Option<PendingProposal>,
1892    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
1893        let process_start = linera_base::time::Instant::now();
1894        tracing::debug!("process_pending_block_without_prepare started");
1895        let info = self.request_leader_timeout_if_needed().await?;
1896
1897        // Clear stale pending proposals whose height has already been committed.
1898        if let Some(pending) = &*proposal_guard {
1899            if pending.block.height < info.next_block_height {
1900                tracing::debug!(
1901                    "Clearing pending proposal: a block was committed at height {}",
1902                    pending.block.height
1903                );
1904                *proposal_guard = None;
1905            }
1906        }
1907
1908        // If there is a validated block in the current round, finalize it.
1909        if info.manager.has_locking_block_in_current_round()
1910            && !info.manager.current_round.is_fast()
1911        {
1912            return self.finalize_locking_block(info).await;
1913        }
1914        let owner = self.identity().await?;
1915
1916        let local_node = &self.client.local_node;
1917        // Otherwise we have to re-propose the highest validated block, if there is one.
1918        let (block, blobs) = if let Some(locking) = &info.manager.requested_locking {
1919            match &**locking {
1920                LockingBlock::Regular(certificate) => {
1921                    let blob_ids = certificate.block().required_blob_ids();
1922                    let blobs = local_node
1923                        .get_locking_blobs(&blob_ids, self.chain_id)
1924                        .await?
1925                        .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
1926                    debug!("Retrying locking block from round {}", certificate.round);
1927                    (certificate.block().clone(), blobs)
1928                }
1929                LockingBlock::Fast(proposal) => {
1930                    let proposed_block = proposal.content.block.clone();
1931                    let blob_ids = proposed_block.published_blob_ids();
1932                    let blobs = local_node
1933                        .get_locking_blobs(&blob_ids, self.chain_id)
1934                        .await?
1935                        .ok_or_else(|| Error::InternalError("Missing local locking blobs"))?;
1936                    let (block, _, _) = self
1937                        .client
1938                        .stage_block_execution(
1939                            proposed_block,
1940                            None,
1941                            blobs.clone(),
1942                            BundleExecutionPolicy::committed(),
1943                        )
1944                        .await?;
1945                    debug!("Retrying locking block from fast round.");
1946                    (block, blobs)
1947                }
1948            }
1949        } else if let Some(pending) = proposal_guard.as_ref() {
1950            // Otherwise we are free to propose our own pending block.
1951            let proposed_block = pending.block.clone();
1952            let blobs = pending.blobs.clone();
1953            let staging_outcome = pending.auto_retry_outcome.as_ref();
1954            let round = self.round_for_oracle(&info, &owner).await?;
1955            let (block, _, _) = self
1956                .client
1957                .stage_block_execution(
1958                    proposed_block,
1959                    round,
1960                    blobs.clone(),
1961                    BundleExecutionPolicy::committed(),
1962                )
1963                .await?;
1964            // Sanity check: the committed execution should produce the same outcome
1965            // as the initial AutoRetry execution. A mismatch indicates a divergence
1966            // between the two execution paths.
1967            if let Some(staging_outcome) = staging_outcome {
1968                ensure!(
1969                    block.outcome_matches(staging_outcome),
1970                    Error::ExecutionOutcomeMismatch
1971                );
1972            }
1973            debug!("Proposing the local pending block.");
1974            (block, blobs)
1975        } else {
1976            return Ok(ClientOutcome::Committed(None)); // Nothing to do.
1977        };
1978
1979        let has_oracle_responses = block.has_oracle_responses();
1980        let (proposed_block, outcome) = block.into_proposal();
1981        let round = match self
1982            .round_for_new_proposal(&info, &owner, has_oracle_responses)
1983            .await?
1984        {
1985            Either::Left(round) => round,
1986            Either::Right(timeout) => return Ok(ClientOutcome::WaitForTimeout(timeout)),
1987        };
1988        debug!("Proposing block for round {}", round);
1989        if let Some(pending) = proposal_guard.as_mut() {
1990            pending.round.get_or_insert(round);
1991        }
1992
1993        let already_handled_locally = info
1994            .manager
1995            .already_handled_proposal(round, &proposed_block);
1996        // Create the final block proposal.
1997        let proposal = if let Some(locking) = info.manager.requested_locking {
1998            Box::new(match *locking {
1999                LockingBlock::Regular(cert) => {
2000                    BlockProposal::new_retry_regular(owner, round, cert, self.signer())
2001                        .await
2002                        .map_err(Error::signer_failure)?
2003                }
2004                LockingBlock::Fast(proposal) => {
2005                    BlockProposal::new_retry_fast(owner, round, proposal, self.signer())
2006                        .await
2007                        .map_err(Error::signer_failure)?
2008                }
2009            })
2010        } else {
2011            Box::new(
2012                BlockProposal::new_initial(owner, round, proposed_block.clone(), self.signer())
2013                    .await
2014                    .map_err(Error::signer_failure)?,
2015            )
2016        };
2017        if !already_handled_locally {
2018            // Check the final block proposal. This will be cheaper after #1401.
2019            if let Err(err) = local_node.handle_block_proposal(*proposal.clone()).await {
2020                match err {
2021                    LocalNodeError::BlobsNotFound(_) => {
2022                        local_node
2023                            .handle_pending_blobs(self.chain_id, blobs)
2024                            .await?;
2025                        local_node.handle_block_proposal(*proposal.clone()).await?;
2026                    }
2027                    err => return Err(err.into()),
2028                }
2029            }
2030        }
2031        let committee = self.local_committee().await?;
2032        let block = Block::new(proposed_block, outcome);
2033        // Send the query to validators.
2034        let submit_block_proposal_start = linera_base::time::Instant::now();
2035        let certificate = if round.is_fast() {
2036            let hashed_value = ConfirmedBlock::new(block);
2037            self.client
2038                .submit_block_proposal(committee.clone(), proposal, hashed_value)
2039                .await?
2040        } else {
2041            let hashed_value = ValidatedBlock::new(block);
2042            let certificate = self
2043                .client
2044                .submit_block_proposal(committee.clone(), proposal, hashed_value.clone())
2045                .await?;
2046            self.client.finalize_block(&committee, certificate).await?
2047        };
2048        self.send_timing(submit_block_proposal_start, TimingType::SubmitBlockProposal);
2049        tracing::debug!(
2050            total_process_ms = process_start.elapsed().as_millis(),
2051            "process_pending_block_without_prepare completing"
2052        );
2053        debug!(round = %certificate.round, "Sending confirmed block to validators");
2054        let certificate = self.client.storage_client().cache_certificate(certificate);
2055        self.update_validators(Some(&committee), Some(certificate.clone()))
2056            .await?;
2057        // Clear the pending proposal now that the block has been committed.
2058        *proposal_guard = None;
2059        Ok(ClientOutcome::Committed(Some(Arc::unwrap_or_clone(
2060            certificate,
2061        ))))
2062    }
2063
2064    fn send_timing(&self, start: Instant, timing_type: TimingType) {
2065        let Some(sender) = &self.timing_sender else {
2066            return;
2067        };
2068        if let Err(err) = sender.send((start.elapsed().as_millis() as u64, timing_type)) {
2069            tracing::warn!(%err, "Failed to send timing info");
2070        }
2071    }
2072
2073    /// Requests a leader timeout certificate if the current round has timed out. Returns the
2074    /// chain info for the (possibly new) current round.
2075    async fn request_leader_timeout_if_needed(&self) -> Result<Box<ChainInfo>, Error> {
2076        let mut info = self.chain_info_with_manager_values().await?;
2077        // If the current round has timed out, we request a timeout certificate and retry in
2078        // the next round.
2079        if let Some(round_timeout) = info.manager.round_timeout {
2080            if round_timeout <= self.storage_client().clock().current_time() {
2081                if let Err(e) = self.request_leader_timeout().await {
2082                    debug!("Failed to obtain a timeout certificate: {}", e);
2083                } else {
2084                    info = self.chain_info_with_manager_values().await?;
2085                }
2086            }
2087        }
2088        Ok(info)
2089    }
2090
2091    /// Finalizes the locking block.
2092    ///
2093    /// Panics if there is no locking block; fails if the locking block is not in the current round.
2094    async fn finalize_locking_block(
2095        &self,
2096        info: Box<ChainInfo>,
2097    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
2098        let locking = info
2099            .manager
2100            .requested_locking
2101            .expect("Should have a locking block");
2102        let LockingBlock::Regular(certificate) = *locking else {
2103            panic!("Should have a locking validated block");
2104        };
2105        debug!(
2106            round = %certificate.round,
2107            "Finalizing locking block"
2108        );
2109        let committee = self.local_committee().await?;
2110        let certificate = self
2111            .client
2112            .finalize_block(&committee, certificate.clone())
2113            .await?;
2114        let certificate = self.client.storage_client().cache_certificate(certificate);
2115        self.update_validators(Some(&committee), Some(certificate.clone()))
2116            .await?;
2117        Ok(ClientOutcome::Committed(Some(Arc::unwrap_or_clone(
2118            certificate,
2119        ))))
2120    }
2121
2122    /// Returns the round number for the round-number oracle to use when staging a block proposal.
2123    async fn round_for_oracle(
2124        &self,
2125        info: &ChainInfo,
2126        identity: &AccountOwner,
2127    ) -> Result<Option<u32>, Error> {
2128        // Pretend we do use oracles: If we don't, the round number is never read anyway.
2129        match self.round_for_new_proposal(info, identity, true).await {
2130            // If it is a multi-leader round, use its number for the oracle.
2131            Ok(Either::Left(round)) => Ok(round.multi_leader()),
2132            // If there is no suitable round with oracles, use None: If it works without oracles,
2133            // the block won't read the value. If it returns a timeout, it will be a single-leader
2134            // round, in which the oracle returns None.
2135            Err(Error::BlockProposalError(_)) | Ok(Either::Right(_)) => Ok(None),
2136            Err(err) => Err(err),
2137        }
2138    }
2139
2140    /// Returns a round in which we can propose a new block, or re-propose the given one, if possible.
2141    async fn round_for_new_proposal(
2142        &self,
2143        info: &ChainInfo,
2144        identity: &AccountOwner,
2145        has_oracle_responses: bool,
2146    ) -> Result<Either<Round, RoundTimeout>, Error> {
2147        let manager = &info.manager;
2148        let seed = manager.seed;
2149        // If there is a conflicting proposal in the current round, we can only propose if the
2150        // next round can be started without a timeout, i.e. if we are in a multi-leader round.
2151        // Similarly, we cannot propose a block that uses oracles in the fast round, and we
2152        // also skip the fast round if fast blocks are not allowed.
2153        let skip_fast = manager.current_round.is_fast()
2154            && (has_oracle_responses || !self.options.allow_fast_blocks);
2155        let conflict = manager
2156            .requested_signed_proposal
2157            .as_ref()
2158            .into_iter()
2159            .chain(&manager.requested_proposed)
2160            .any(|proposal| proposal.content.round == manager.current_round)
2161            || skip_fast;
2162        let round = if !conflict {
2163            manager.current_round
2164        } else if let Some(round) = manager
2165            .ownership
2166            .next_round(manager.current_round)
2167            .filter(|_| manager.current_round.is_multi_leader() || manager.current_round.is_fast())
2168        {
2169            round
2170        } else if let Some(timeout) = info.round_timeout() {
2171            return Ok(Either::Right(timeout));
2172        } else {
2173            return Err(Error::BlockProposalError(
2174                "Conflicting proposal in the current round",
2175            ));
2176        };
2177        let current_committee = self
2178            .local_committee()
2179            .await?
2180            .validators
2181            .values()
2182            .map(|v| (AccountOwner::from(v.account_public_key), v.votes))
2183            .collect();
2184        if manager.should_propose(identity, round, seed, &current_committee) {
2185            return Ok(Either::Left(round));
2186        }
2187        if let Some(timeout) = info.round_timeout() {
2188            return Ok(Either::Right(timeout));
2189        }
2190        Err(Error::BlockProposalError(
2191            "Not a leader in the current round",
2192        ))
2193    }
2194
2195    /// Clears any pending block proposal left over from a previously failed operation.
2196    #[cfg(with_testing)]
2197    #[instrument(level = "trace")]
2198    pub async fn clear_pending_proposal(&self) {
2199        *self.proposal_mutex().lock().await = None;
2200    }
2201
2202    /// Rotates the key of the chain.
2203    ///
2204    /// Replaces the current owners of the chain with the given new public key.
2205    #[cfg(with_testing)]
2206    #[instrument(level = "trace")]
2207    pub async fn rotate_key_pair(
2208        &self,
2209        public_key: linera_base::crypto::AccountPublicKey,
2210    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2211        self.transfer_ownership(public_key.into()).await
2212    }
2213
2214    /// Transfers ownership of the chain to a single super owner.
2215    #[instrument(level = "trace")]
2216    pub async fn transfer_ownership(
2217        &self,
2218        new_owner: AccountOwner,
2219    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2220        self.execute_operation(SystemOperation::ChangeOwnership {
2221            super_owners: vec![new_owner],
2222            owners: Vec::new(),
2223            first_leader: None,
2224            multi_leader_rounds: 2,
2225            open_multi_leader_rounds: false,
2226            timeout_config: TimeoutConfig::default(),
2227        })
2228        .await
2229    }
2230
2231    /// Adds another owner to the chain, and turns existing super owners into regular owners.
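    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled); `new_owner` is an assumed `AccountOwner`. Former
    /// super owners keep participating as regular owners with weight 100:
    ///
    /// ```ignore
    /// let outcome = chain_client.share_ownership(new_owner, /* new_weight */ 100).await?;
    /// ```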
2232    #[instrument(level = "trace")]
2233    pub async fn share_ownership(
2234        &self,
2235        new_owner: AccountOwner,
2236        new_weight: u64,
2237    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2238        let ownership = self.prepare_chain().await?.manager.ownership;
2239        ensure!(
2240            ownership.is_active(),
2241            ChainError::InactiveChain(self.chain_id)
2242        );
2243        let mut owners = ownership.owners.into_iter().collect::<Vec<_>>();
2244        owners.extend(ownership.super_owners.into_iter().zip(iter::repeat(100)));
2245        owners.push((new_owner, new_weight));
2246        let operations = vec![Operation::system(SystemOperation::ChangeOwnership {
2247            super_owners: Vec::new(),
2248            owners,
2249            first_leader: ownership.first_leader,
2250            multi_leader_rounds: ownership.multi_leader_rounds,
2251            open_multi_leader_rounds: ownership.open_multi_leader_rounds,
2252            timeout_config: ownership.timeout_config,
2253        })];
2254        self.execute_block(operations, vec![]).await
2255    }
2256
2257    /// Returns the current ownership settings on this chain.
2258    #[instrument(level = "trace")]
2259    pub async fn query_chain_ownership(&self) -> Result<ChainOwnership, Error> {
2260        Ok(self
2261            .client
2262            .local_node
2263            .chain_state_view(self.chain_id)
2264            .await?
2265            .execution_state
2266            .system
2267            .ownership
2268            .get()
2269            .await?
2270            .clone())
2271    }
2272
2273    /// Changes the ownership of this chain to the given configuration, replacing the
2274    /// current owners.
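    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled) that reads the current settings, tweaks one field
    /// and writes the result back:
    ///
    /// ```ignore
    /// let mut ownership = chain_client.query_chain_ownership().await?;
    /// ownership.multi_leader_rounds = 3;
    /// let outcome = chain_client.change_ownership(ownership).await?;
    /// ```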
2275    #[instrument(level = "trace")]
2276    pub async fn change_ownership(
2277        &self,
2278        ownership: ChainOwnership,
2279    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2280        self.execute_operation(SystemOperation::ChangeOwnership {
2281            super_owners: ownership.super_owners.into_iter().collect(),
2282            owners: ownership.owners.into_iter().collect(),
2283            first_leader: ownership.first_leader,
2284            multi_leader_rounds: ownership.multi_leader_rounds,
2285            open_multi_leader_rounds: ownership.open_multi_leader_rounds,
2286            timeout_config: ownership.timeout_config.clone(),
2287        })
2288        .await
2289    }
2290
2291    /// Returns the current application permissions on this chain.
2292    #[instrument(level = "trace")]
2293    pub async fn query_application_permissions(&self) -> Result<ApplicationPermissions, Error> {
2294        Ok(self
2295            .client
2296            .local_node
2297            .chain_state_view(self.chain_id)
2298            .await?
2299            .execution_state
2300            .system
2301            .application_permissions
2302            .get()
2303            .await?
2304            .clone())
2305    }
2306
2307    /// Changes the application permissions configuration on this chain.
2308    #[instrument(level = "trace", skip(application_permissions))]
2309    pub async fn change_application_permissions(
2310        &self,
2311        application_permissions: ApplicationPermissions,
2312    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2313        self.execute_operation(SystemOperation::ChangeApplicationPermissions(
2314            application_permissions,
2315        ))
2316        .await
2317    }
2318
2319    /// Opens a new chain with a derived UID.
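    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled); it assumes `owner` is an `AccountOwner` we hold a
    /// key for, and that single ownership (`ChainOwnership::single`) and default
    /// application permissions are suitable for the new chain:
    ///
    /// ```ignore
    /// if let ClientOutcome::Committed((description, _certificate)) = chain_client
    ///     .open_chain(
    ///         ChainOwnership::single(owner),
    ///         ApplicationPermissions::default(),
    ///         Amount::from_tokens(1),
    ///     )
    ///     .await?
    /// {
    ///     let new_chain_id = description.id();
    /// }
    /// ```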
2320    #[instrument(level = "trace", skip(self))]
2321    pub async fn open_chain(
2322        &self,
2323        ownership: ChainOwnership,
2324        application_permissions: ApplicationPermissions,
2325        balance: Amount,
2326    ) -> Result<ClientOutcome<(ChainDescription, ConfirmedBlockCertificate)>, Error> {
2327        // Check if we have a key for any owner before consuming ownership.
2328        let mut has_key = false;
2329        for owner in ownership.all_owners() {
2330            if self.has_key_for(owner).await? {
2331                has_key = true;
2332                break;
2333            }
2334        }
2335        let config = OpenChainConfig {
2336            ownership,
2337            balance,
2338            application_permissions,
2339        };
2340        let operation = Operation::system(SystemOperation::OpenChain(config));
2341        let certificate = match self.execute_block(vec![operation], vec![]).await? {
2342            ClientOutcome::Committed(certificate) => certificate,
2343            ClientOutcome::Conflict(certificate) => {
2344                return Ok(ClientOutcome::Conflict(certificate));
2345            }
2346            ClientOutcome::WaitForTimeout(timeout) => {
2347                return Ok(ClientOutcome::WaitForTimeout(timeout));
2348            }
2349        };
2350        // The only operation, i.e. the last transaction, created the new chain.
2351        let chain_blob = certificate
2352            .block()
2353            .body
2354            .blobs
2355            .last()
2356            .and_then(|blobs| blobs.last())
2357            .ok_or_else(|| Error::InternalError("Failed to create a new chain"))?;
2358        let description = bcs::from_bytes::<ChainDescription>(chain_blob.bytes())?;
2359        // If we have a key for any owner, add it to the list of tracked chains.
2360        if has_key {
2361            self.client
2362                .extend_chain_mode(description.id(), ListeningMode::FullChain);
2363            self.client
2364                .retry_pending_cross_chain_requests(self.chain_id)
2365                .await?;
2366        }
2367        Ok(ClientOutcome::Committed((description, certificate)))
2368    }
2369
2370    /// Closes the chain (and loses everything in it!!).
2371    /// Returns `None` if the chain was already closed.
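    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled):
    ///
    /// ```ignore
    /// match chain_client.close_chain().await? {
    ///     ClientOutcome::Committed(Some(_certificate)) => { /* closed by this call */ }
    ///     ClientOutcome::Committed(None) => { /* the chain was already closed */ }
    ///     _ => { /* conflict or timeout: retry */ }
    /// }
    /// ```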
2372    #[instrument(level = "trace")]
2373    pub async fn close_chain(
2374        &self,
2375    ) -> Result<ClientOutcome<Option<ConfirmedBlockCertificate>>, Error> {
2376        match self.execute_operation(SystemOperation::CloseChain).await {
2377            Ok(outcome) => Ok(outcome.map(Some)),
2378            Err(Error::LocalNodeError(LocalNodeError::WorkerError(WorkerError::ChainError(
2379                chain_error,
2380            )))) if matches!(*chain_error, ChainError::ClosedChain) => {
2381                Ok(ClientOutcome::Committed(None)) // Chain is already closed.
2382            }
2383            Err(error) => Err(error),
2384        }
2385    }
2386
2387    /// Publishes some module.
2388    #[cfg(not(target_arch = "wasm32"))]
2389    #[instrument(level = "trace", skip(contract, service))]
2390    pub async fn publish_module(
2391        &self,
2392        contract: Bytecode,
2393        service: Bytecode,
2394        vm_runtime: VmRuntime,
2395    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2396        let (blobs, module_id) = super::create_bytecode_blobs(contract, service, vm_runtime).await;
2397        self.publish_module_blobs(blobs, module_id).await
2398    }
2399
2400    /// Publishes a module, given its pre-created bytecode blobs and module ID.
2401    #[cfg(not(target_arch = "wasm32"))]
2402    #[instrument(level = "trace", skip(blobs, module_id))]
2403    pub async fn publish_module_blobs(
2404        &self,
2405        blobs: Vec<Blob>,
2406        module_id: ModuleId,
2407    ) -> Result<ClientOutcome<(ModuleId, ConfirmedBlockCertificate)>, Error> {
2408        self.execute_operations(
2409            vec![Operation::system(SystemOperation::PublishModule {
2410                module_id,
2411            })],
2412            blobs,
2413        )
2414        .await?
2415        .try_map(|certificate| Ok((module_id, certificate)))
2416    }
2417
2418    /// Publishes some data blobs.
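    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled), publishing two small data blobs in one block:
    ///
    /// ```ignore
    /// let outcome = chain_client
    ///     .publish_data_blobs(vec![b"hello".to_vec(), b"world".to_vec()])
    ///     .await?;
    /// ```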
2419    #[instrument(level = "trace", skip(bytes))]
2420    pub async fn publish_data_blobs(
2421        &self,
2422        bytes: Vec<Vec<u8>>,
2423    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2424        let blobs = bytes.into_iter().map(Blob::new_data);
2425        let publish_blob_operations = blobs
2426            .clone()
2427            .map(|blob| {
2428                Operation::system(SystemOperation::PublishDataBlob {
2429                    blob_hash: blob.id().hash,
2430                })
2431            })
2432            .collect();
2433        self.execute_operations(publish_blob_operations, blobs.collect())
2434            .await
2435    }
2436
2437    /// Publishes some data blob.
2438    #[instrument(level = "trace", skip(bytes))]
2439    pub async fn publish_data_blob(
2440        &self,
2441        bytes: Vec<u8>,
2442    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2443        self.publish_data_blobs(vec![bytes]).await
2444    }
2445
2446    /// Creates an application by instantiating some bytecode.
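    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled); `module_id`, `parameters` and
    /// `instantiation_argument` are assumed to come from a previously published module
    /// and its ABI:
    ///
    /// ```ignore
    /// if let ClientOutcome::Committed((application_id, _certificate)) = chain_client
    ///     .create_application(module_id, &parameters, &instantiation_argument, vec![])
    ///     .await?
    /// {
    ///     // `application_id` is typed with the module's ABI.
    /// }
    /// ```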
2447    #[instrument(
2448        level = "trace",
2449        skip(self, parameters, instantiation_argument, required_application_ids)
2450    )]
2451    pub async fn create_application<
2452        A: Abi,
2453        Parameters: Serialize,
2454        InstantiationArgument: Serialize,
2455    >(
2456        &self,
2457        module_id: ModuleId<A, Parameters, InstantiationArgument>,
2458        parameters: &Parameters,
2459        instantiation_argument: &InstantiationArgument,
2460        required_application_ids: Vec<ApplicationId>,
2461    ) -> Result<ClientOutcome<(ApplicationId<A>, ConfirmedBlockCertificate)>, Error> {
2462        let instantiation_argument = serde_json::to_vec(instantiation_argument)?;
2463        let parameters = serde_json::to_vec(parameters)?;
2464        Ok(self
2465            .create_application_untyped(
2466                module_id.forget_abi(),
2467                parameters,
2468                instantiation_argument,
2469                required_application_ids,
2470            )
2471            .await?
2472            .map(|(app_id, cert)| (app_id.with_abi(), cert)))
2473    }
2474
2475    /// Creates an application by instantiating some bytecode.
2476    #[instrument(
2477        level = "trace",
2478        skip(
2479            self,
2480            module_id,
2481            parameters,
2482            instantiation_argument,
2483            required_application_ids
2484        )
2485    )]
2486    pub async fn create_application_untyped(
2487        &self,
2488        module_id: ModuleId,
2489        parameters: Vec<u8>,
2490        instantiation_argument: Vec<u8>,
2491        required_application_ids: Vec<ApplicationId>,
2492    ) -> Result<ClientOutcome<(ApplicationId, ConfirmedBlockCertificate)>, Error> {
2493        self.execute_operation(SystemOperation::CreateApplication {
2494            module_id,
2495            parameters,
2496            instantiation_argument,
2497            required_application_ids,
2498        })
2499        .await?
2500        .try_map(|certificate| {
2501            // The first message of the only operation created the application.
2502            let mut creation: Vec<_> = certificate
2503                .block()
2504                .created_blob_ids()
2505                .into_iter()
2506                .filter(|blob_id| blob_id.blob_type == BlobType::ApplicationDescription)
2507                .collect();
2508            if creation.len() > 1 {
2509                return Err(Error::InternalError(
2510                    "Unexpected number of application descriptions published",
2511                ));
2512            }
2513            let blob_id = creation.pop().ok_or(Error::InternalError(
2514                "ApplicationDescription blob not found.",
2515            ))?;
2516            let id = ApplicationId::new(blob_id.hash);
2517            Ok((id, certificate))
2518        })
2519    }
2520
2521    /// Creates a new committee and starts using it (admin chains only).
2522    #[instrument(level = "trace", skip(committee))]
2523    pub async fn stage_new_committee(
2524        &self,
2525        committee: Committee,
2526    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2527        let blob = Blob::new(BlobContent::new_committee(bcs::to_bytes(&committee)?));
2528        let blob_hash = blob.id().hash;
2529        match self
2530            .execute_operations(
2531                vec![Operation::system(SystemOperation::Admin(
2532                    AdminOperation::PublishCommitteeBlob { blob_hash },
2533                ))],
2534                vec![blob],
2535            )
2536            .await?
2537        {
2538            ClientOutcome::Committed(_) => {}
2539            outcome @ ClientOutcome::WaitForTimeout(_) | outcome @ ClientOutcome::Conflict(_) => {
2540                return Ok(outcome)
2541            }
2542        }
2543        let epoch = self.chain_info().await?.epoch.try_add_one()?;
2544        self.execute_operation(SystemOperation::Admin(AdminOperation::CreateCommittee {
2545            epoch,
2546            blob_hash,
2547        }))
2548        .await
2549    }
2550
2551    /// Synchronizes the chain with the validators and creates blocks without any operations to
2552    /// process all incoming messages. This may require several blocks.
2553    ///
2554    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
2555    /// is returned, too.
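    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled) that drains the inbox:
    ///
    /// ```ignore
    /// let (certificates, maybe_timeout) = chain_client.process_inbox().await?;
    /// // One certificate per block that was needed to drain the inbox.
    /// if maybe_timeout.is_some() {
    ///     // Not everything could be processed yet; retry after the returned timeout.
    /// }
    /// ```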
2556    #[instrument(level = "trace")]
2557    pub async fn process_inbox(
2558        &self,
2559    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2560        self.prepare_chain().await?;
2561        self.process_inbox_without_prepare().await
2562    }
2563
2564    /// Creates blocks without any operations to process all incoming messages. This may require
2565    /// several blocks.
2566    ///
2567    /// If not all certificates could be processed due to a timeout, the timestamp for when to retry
2568    /// is returned, too.
2569    #[instrument(level = "trace")]
2570    pub async fn process_inbox_without_prepare(
2571        &self,
2572    ) -> Result<(Vec<ConfirmedBlockCertificate>, Option<RoundTimeout>), Error> {
2573        #[cfg(with_metrics)]
2574        let _latency = super::metrics::PROCESS_INBOX_WITHOUT_PREPARE_LATENCY.measure_latency();
2575
2576        let mut certificates = Vec::new();
2577        loop {
2578            // We provide no operations; this means that the only transactions executed
2579            // will be epoch changes, receiving messages and processing event stream
2580            // updates, if any are pending.
2581            match self.execute_block(vec![], vec![]).await {
2582                Ok(ClientOutcome::Committed(certificate)) => certificates.push(certificate),
2583                Ok(ClientOutcome::Conflict(certificate)) => certificates.push(*certificate),
2584                Ok(ClientOutcome::WaitForTimeout(timeout)) => {
2585                    return Ok((certificates, Some(timeout)));
2586                }
2587                // Nothing in the inbox and no stream updates to be processed.
2588                Err(Error::LocalNodeError(LocalNodeError::WorkerError(
2589                    WorkerError::ChainError(chain_error),
2590                ))) if matches!(*chain_error, ChainError::EmptyBlock) => {
2591                    return Ok((certificates, None));
2592                }
2593                Err(error) => return Err(error),
2594            };
2595        }
2596    }
2597
2598    /// Returns operations to process all pending epoch changes: first the new epochs, in order,
2599    /// then the removed epochs, in order.
2600    async fn collect_epoch_changes(&self) -> Result<Vec<Operation>, Error> {
2601        let (mut min_epoch, mut next_epoch) = {
2602            let (epoch, committees) = self.epoch_and_committees().await?;
2603            let min_epoch = *committees.keys().next().unwrap_or(&Epoch::ZERO);
2604            (min_epoch, epoch.try_add_one()?)
2605        };
2606        let mut epoch_change_ops = Vec::new();
2607        while self
2608            .has_admin_event(EPOCH_STREAM_NAME, next_epoch.0)
2609            .await?
2610        {
2611            epoch_change_ops.push(Operation::system(SystemOperation::ProcessNewEpoch(
2612                next_epoch,
2613            )));
2614            next_epoch.try_add_assign_one()?;
2615        }
2616        while self
2617            .has_admin_event(REMOVED_EPOCH_STREAM_NAME, min_epoch.0)
2618            .await?
2619        {
2620            epoch_change_ops.push(Operation::system(SystemOperation::ProcessRemovedEpoch(
2621                min_epoch,
2622            )));
2623            min_epoch.try_add_assign_one()?;
2624        }
2625        Ok(epoch_change_ops)
2626    }
2627
2628    /// Returns whether the system event on the admin chain with the given stream name and
2629    /// index exists in storage.
2630    async fn has_admin_event(&self, stream_name: &[u8], index: u32) -> Result<bool, Error> {
2631        let event_id = EventId {
2632            chain_id: self.client.admin_chain_id,
2633            stream_id: StreamId::system(stream_name),
2634            index,
2635        };
2636        Ok(self
2637            .client
2638            .storage_client()
2639            .read_event(event_id)
2640            .await?
2641            .is_some())
2642    }
2643
2644    /// Returns the indices and events from storage for the given stream, starting at `start_index`.
2645    pub async fn events_from_index(
2646        &self,
2647        stream_id: StreamId,
2648        start_index: u32,
2649    ) -> Result<Vec<IndexAndEvent>, Error> {
2650        Ok(self
2651            .client
2652            .storage_client()
2653            .read_events_from_index(&self.chain_id, &stream_id, start_index)
2654            .await?)
2655    }
2656
2657    /// Deprecates all the configurations of voting rights up to the given one (admin chains
2658    /// only). Currently, each individual chain is still entitled to wait before accepting
2659    /// this command. However, it is expected that deprecated validators stop functioning
2660    /// shortly after such a command is issued.
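    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled), run with a client for the admin chain, revoking
    /// every epoch up to and including epoch 2:
    ///
    /// ```ignore
    /// let outcome = admin_chain_client.revoke_epochs(Epoch(2)).await?;
    /// ```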
2661    #[instrument(level = "trace")]
2662    pub async fn revoke_epochs(
2663        &self,
2664        revoked_epoch: Epoch,
2665    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
2666        self.prepare_chain().await?;
2667        let (current_epoch, committees) = self.epoch_and_committees().await?;
2668        ensure!(
2669            revoked_epoch < current_epoch,
2670            Error::CannotRevokeCurrentEpoch(current_epoch)
2671        );
2672        ensure!(
2673            committees.contains_key(&revoked_epoch),
2674            Error::EpochAlreadyRevoked
2675        );
2676        let operations = committees
2677            .keys()
2678            .filter_map(|epoch| {
2679                if *epoch <= revoked_epoch {
2680                    Some(Operation::system(SystemOperation::Admin(
2681                        AdminOperation::RemoveCommittee { epoch: *epoch },
2682                    )))
2683                } else {
2684                    None
2685                }
2686            })
2687            .collect();
2688        self.execute_operations(operations, vec![]).await
2689    }
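
    // Sketch of an admin-chain caller revoking everything before the current epoch
    // (illustrative; assumes the current epoch is greater than `Epoch::ZERO`, and
    // uses the crate-internal `epoch_and_committees` purely for exposition):
    //
    //     let (current_epoch, _committees) = admin_chain_client.epoch_and_committees().await?;
    //     admin_chain_client.revoke_epochs(Epoch(current_epoch.0 - 1)).await?;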

    /// Sends money to an account.
    /// Does not check the balance. (This may block the client.)
    /// Does not confirm the transaction.
    #[cfg(with_testing)]
    #[instrument(level = "trace")]
    pub async fn transfer_to_account_unsafe_unconfirmed(
        &self,
        owner: AccountOwner,
        amount: Amount,
        recipient: Account,
    ) -> Result<ClientOutcome<ConfirmedBlockCertificate>, Error> {
        self.execute_operation(SystemOperation::Transfer {
            owner,
            recipient,
            amount,
        })
        .await
    }

    #[instrument(level = "trace", skip(hash))]
    pub async fn read_confirmed_block(
        &self,
        hash: CryptoHash,
    ) -> Result<Arc<ConfirmedBlock>, Error> {
        self.client
            .storage_client()
            .read_confirmed_block(hash)
            .await?
            .ok_or(Error::MissingConfirmedBlock(hash))
    }

    #[instrument(level = "trace", skip(hash))]
    pub async fn read_certificate(
        &self,
        hash: CryptoHash,
    ) -> Result<Arc<ConfirmedBlockCertificate>, Error> {
        self.client
            .storage_client()
            .read_certificate(hash)
            .await?
            .ok_or(Error::ReadCertificatesError(vec![hash]))
    }
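
    // Reading the chain's latest confirmed block (illustrative; assumes the
    // `block_hash` field of `ChainInfo`, which is `None` for a chain with no
    // blocks yet):
    //
    //     if let Some(hash) = chain_client.chain_info().await?.block_hash {
    //         let block = chain_client.read_confirmed_block(hash).await?;
    //     }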

    /// Handles cross-chain requests for any pending outgoing messages.
    #[instrument(level = "trace")]
    pub async fn retry_pending_outgoing_messages(&self) -> Result<(), Error> {
        self.client
            .retry_pending_cross_chain_requests(self.chain_id)
            .await?;
        Ok(())
    }

    #[instrument(level = "trace", skip(local_node))]
    async fn local_chain_info(
        &self,
        chain_id: ChainId,
        local_node: &LocalNodeClient<Env::Storage>,
    ) -> Result<Option<Box<ChainInfo>>, Error> {
        match local_node.chain_info(chain_id).await {
            Ok(info) => Ok(Some(info)),
            Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => Ok(None),
            Err(err) => Err(err.into()),
        }
    }

    #[instrument(level = "trace", skip(chain_id, local_node))]
    async fn local_next_block_height(
        &self,
        chain_id: ChainId,
        local_node: &LocalNodeClient<Env::Storage>,
    ) -> Result<BlockHeight, Error> {
        Ok(self
            .local_chain_info(chain_id, local_node)
            .await?
            .map_or(BlockHeight::ZERO, |info| info.next_block_height))
    }

    /// Returns the next height we expect to receive from the given sender chain, according to the
    /// local inbox.
    #[instrument(level = "trace")]
    async fn local_next_height_to_receive(&self, origin: ChainId) -> Result<BlockHeight, Error> {
        Ok(self
            .client
            .local_node
            .get_inbox_next_height(self.chain_id, origin)
            .await?)
    }

    #[instrument(level = "trace", skip(remote_node, local_node, notification))]
    async fn process_notification(
        &self,
        remote_node: RemoteNode<Env::ValidatorNode>,
        local_node: LocalNodeClient<Env::Storage>,
        notification: Notification,
    ) -> Result<(), Error> {
        let listening_mode = self.client.chain_mode(notification.chain_id);
        let relevant = listening_mode
            .as_ref()
            .is_some_and(|mode| mode.is_relevant(&notification.reason));
        if !relevant {
            trace!(
                chain_id = %notification.chain_id,
                reason = ?notification.reason,
                ?listening_mode,
                "Ignoring notification due to listening mode"
            );
            return Ok(());
        }
        match notification.reason {
            Reason::NewIncomingBundle { origin, height } => {
                if self.options.message_policy.ignores_origin(&origin) {
                    trace!(
                        chain_id = %self.chain_id,
                        %origin,
                        %height,
                        "Skipping NewIncomingBundle notification: origin filtered by message_policy"
                    );
                    return Ok(());
                }
                if self.local_next_height_to_receive(origin).await? > height {
                    debug!(
                        chain_id = %self.chain_id,
                        "Skipping redundant notification for new message"
                    );
                    return Ok(());
                }
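                // Pull the sender's block at `height`, together with any earlier
                // blocks from that sender that we are still missing, so the inbox
                // catches up in one pass.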
                self.client
                    .download_sender_block_with_sending_ancestors(
                        self.chain_id,
                        origin,
                        height,
                        &remote_node,
                    )
                    .await?;
                if self.local_next_height_to_receive(origin).await? <= height {
                    info!(
                        chain_id = %self.chain_id,
                        "NewIncomingBundle: failed to synchronize new message after notification"
                    );
                }
            }
            Reason::NewBlock { height, .. } => {
                let chain_id = notification.chain_id;
                let local_height = self.local_next_block_height(chain_id, &local_node).await?;
                if local_height > height {
                    debug!(
                        chain_id = %self.chain_id,
                        "Skipping redundant notification for new block"
                    );
                    return Ok(());
                }
                // This branch is only reached in modes other than `EventsOnly`, as the
                // `NewBlock` notification is not relevant in that mode.
                self.client
                    .synchronize_chain_state_from(&remote_node, chain_id)
                    .await?;
                if self.local_next_block_height(chain_id, &local_node).await? <= height {
                    error!("NewBlock: failed to synchronize new block after notification");
                }
                trace!(
                    chain_id = %self.chain_id,
                    %height,
                    "NewBlock: processed notification",
                );
            }
            Reason::NewEvents {
                height, block_hash, ..
            } => {
                let chain_id = notification.chain_id;
                let local_height = self.local_next_block_height(chain_id, &local_node).await?;
                if local_height > height {
                    debug!(
                        chain_id = %self.chain_id,
                        "Skipping redundant notification for new events"
                    );
                    return Ok(());
                }
                trace!(
                    chain_id = %self.chain_id,
                    %height,
                    "NewEvents: processing notification"
                );
                // Use the subscribed streams from EventsOnly mode to figure out which
                // streams we are interested in.
                let relevant_streams = match self.listening_mode() {
                    Some(ListeningMode::EventsOnly(subscribed)) => subscribed,
                    // Other cases are unreachable, as the `NewEvents` notification is
                    // only relevant in the `EventsOnly` mode.
                    _ => unreachable!(),
                };
                self.client
                    .download_event_bearing_blocks(
                        self.chain_id,
                        BTreeSet::from([(height, block_hash)]),
                        local_height,
                        &relevant_streams,
                        &remote_node,
                    )
                    .await?;
            }
            Reason::NewRound { height, round } => {
                let chain_id = notification.chain_id;
                if let Some(info) = self.local_chain_info(chain_id, &local_node).await? {
                    if (info.next_block_height, info.manager.current_round) >= (height, round) {
                        debug!(
                            chain_id = %self.chain_id,
                            "Skipping redundant notification for new round"
                        );
                        return Ok(());
                    }
                }
                self.client
                    .synchronize_chain_state_from(&remote_node, chain_id)
                    .await?;
                let Some(info) = self.local_chain_info(chain_id, &local_node).await? else {
                    error!(
                        chain_id = %self.chain_id,
                        "NewRound: failed to read local chain info for {chain_id}"
                    );
                    return Ok(());
                };
                if (info.next_block_height, info.manager.current_round) < (height, round) {
                    info!(
                        chain_id = %self.chain_id,
                        "NewRound: failed to synchronize new round after notification"
                    );
                }
            }
            Reason::BlockExecuted { .. } => {
                // No action needed.
            }
        }
        Ok(())
    }

    /// Returns whether this chain is tracked by the client, i.e., we are updating its inbox.
    pub fn is_tracked(&self) -> bool {
        self.client.is_tracked(self.chain_id)
    }

    /// Returns the listening mode for this chain, if it is tracked.
    pub fn listening_mode(&self) -> Option<ListeningMode> {
        self.client.chain_mode(self.chain_id)
    }

    /// Spawns a task that listens to notifications about the current chain from all validators,
    /// and synchronizes the local state accordingly.
    ///
    /// The listening mode must be set in `Client::chain_modes` before calling this method.
    #[instrument(level = "trace", fields(chain_id = ?self.chain_id))]
    pub async fn listen(
        &self,
    ) -> Result<(impl Future<Output = ()>, AbortOnDrop, NotificationStream), Error> {
        use future::FutureExt as _;

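        // Drives `future` to completion while also polling `background_work`, so
        // that notification processing keeps making progress while we wait.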
        async fn await_while_polling<F: FusedFuture>(
            future: F,
            background_work: impl FusedStream<Item = ()>,
        ) -> F::Output {
            tokio::pin!(future);
            tokio::pin!(background_work);
            loop {
                futures::select! {
                    _ = background_work.next() => (),
                    result = future => return result,
                }
            }
        }

        let mut senders = HashMap::new();
        let mut circuit_breakers: HashMap<ValidatorPublicKey, CircuitBreakerState> = HashMap::new();
        let notifications = self.subscribe()?;
        let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);

        // Beware: if this future ceases to make progress, notification processing will
        // deadlock, because of the issue described in
        // https://github.com/linera-io/linera-protocol/pull/1173.

        // TODO(#2013): replace this lock with an asynchronous communication channel

        let mut process_notifications = FuturesUnordered::new();

        match self
            .update_notification_streams(&mut senders, &mut circuit_breakers)
            .await
        {
            Ok(handler) => process_notifications.push(handler),
            Err(error) => error!("Failed to update notification streams: {error}"),
        };

        let this = self.clone();
        let update_streams = async move {
            let mut abortable_notifications = abortable_notifications.fuse();

            while let Some(notification) =
                await_while_polling(abortable_notifications.next(), &mut process_notifications)
                    .await
            {
                if let Reason::NewBlock { .. } = notification.reason {
                    match Box::pin(await_while_polling(
                        this.update_notification_streams(&mut senders, &mut circuit_breakers)
                            .fuse(),
                        &mut process_notifications,
                    ))
                    .await
                    {
                        Ok(handler) => process_notifications.push(handler),
                        Err(error) => error!("Failed to update notification streams: {error}"),
                    }
                }
            }

            for abort in senders.into_values() {
                abort.abort();
            }

            let () = process_notifications.collect().await;
        }
        .in_current_span();

        Ok((update_streams, AbortOnDrop(abort), notifications))
    }
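
    // Typical driver loop (a sketch; assumes the chain's listening mode has
    // already been registered in `Client::chain_modes` as documented above):
    //
    //     let (worker, _abort_guard, mut notifications) = chain_client.listen().await?;
    //     tokio::spawn(worker);
    //     while let Some(notification) = notifications.next().await {
    //         // React to `notification.reason` as needed.
    //     }
    //
    // Dropping `_abort_guard` aborts the validator subscriptions.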

    #[instrument(level = "trace", skip(senders, circuit_breakers))]
    async fn update_notification_streams(
        &self,
        senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
        circuit_breakers: &mut HashMap<ValidatorPublicKey, CircuitBreakerState>,
    ) -> Result<impl Future<Output = ()>, Error> {
        let initial_probe_interval = self
            .options
            .notification_circuit_breaker_initial_probe_interval;
        let max_probe_interval = self.options.notification_circuit_breaker_max_probe_interval;
        let (nodes, local_node) = {
            // For EventsOnly chains we may not have the chain's own committee locally,
            // and attempting to fetch it would trigger a full sync. Use the admin
            // committee instead; we only need it to know which validators to connect to.
            let committee = if self
                .listening_mode()
                .is_some_and(|m| m.should_sync_chain_state())
            {
                self.local_committee().await?
            } else {
                self.client.admin_committee().await?.1
            };
            let nodes: HashMap<_, _> = self
                .client
                .validator_node_provider()
                .make_nodes(&committee)?
                .collect();
            (nodes, self.client.local_node.clone())
        };
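        // Backoff sketch: with an initial probe interval of 1s and a cap of 60s
        // (the actual values come from `Options`), consecutive failed probes for a
        // validator wait 1s, 2s, 4s, ..., 32s, 60s, 60s, ... before reconnecting.
        // One successful probe (the stream is still alive on the next update)
        // removes the validator from `circuit_breakers` entirely.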
        // Detect circuit breaker state transitions before cleaning up senders.
        for (validator, abort) in senders.iter() {
            if abort.is_aborted() && nodes.contains_key(validator) {
                if let Some(state) = circuit_breakers.get_mut(validator) {
                    // Was probing -> probe failed -> escalate interval.
                    state.probe_interval = (state.probe_interval * 2).min(max_probe_interval);
                    state.next_probe_at = Instant::now() + state.probe_interval;
                    warn!(
                        %validator,
                        chain_id = %self.chain_id,
                        next_probe_in = ?state.probe_interval,
                        "Validator still unhealthy after probe; increasing probe interval"
                    );
                } else {
                    // First failure -> enter circuit breaker.
                    circuit_breakers.insert(
                        *validator,
                        CircuitBreakerState {
                            next_probe_at: Instant::now() + initial_probe_interval,
                            probe_interval: initial_probe_interval,
                        },
                    );
                    error!(
                        %validator,
                        chain_id = %self.chain_id,
                        next_probe_in = ?initial_probe_interval,
                        "Validator notification stream ended; entering circuit breaker"
                    );
                }
            } else if !abort.is_aborted() && circuit_breakers.contains_key(validator) {
                // Stream alive while in circuit breaker -> probe succeeded -> recovered.
                info!(
                    %validator,
                    chain_id = %self.chain_id,
                    "Validator recovered from circuit breaker"
                );
                circuit_breakers.remove(validator);
            }
        }

        senders.retain(|validator, abort| {
            if !nodes.contains_key(validator) {
                abort.abort();
            }
            !abort.is_aborted()
        });
        circuit_breakers.retain(|validator, _| nodes.contains_key(validator));

        let validator_tasks = FuturesUnordered::new();
        for (public_key, node) in nodes {
            let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
                continue;
            };

            // Circuit breaker: skip if not time to probe yet.
            if let Some(state) = circuit_breakers.get(&public_key) {
                if Instant::now() < state.next_probe_at {
                    continue;
                }
                debug!(
                    validator = %public_key,
                    chain_id = %self.chain_id,
                    "Probing unhealthy validator"
                );
            }

            let address = node.address();
            let this = self.clone();
            let listening_mode_for_sync = self.listening_mode();
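            // Build the notification stream lazily: the `stream::once` future
            // performs the subscription and the initial sync, and `filter_map`
            // drops validators we failed to connect to instead of failing the
            // whole update.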
            let stream = stream::once({
                let node = node.clone();
                async move {
                    let stream = node.subscribe(vec![this.chain_id]).await?;
                    // Only now is the notification stream established. We may have missed
                    // notifications since the last time we synchronized.
                    let remote_node = RemoteNode { public_key, node };
                    if listening_mode_for_sync
                        .as_ref()
                        .is_some_and(|mode| mode.should_sync_chain_state())
                    {
                        this.client
                            .synchronize_chain_state_from(&remote_node, this.chain_id)
                            .await?;
                    } else {
                        // For EventsOnly chains, do a lightweight initial sync:
                        // query the validator for the latest event-bearing blocks
                        // for our subscribed streams, then download them sparsely.
                        if let Some(ListeningMode::EventsOnly(subscribed)) =
                            listening_mode_for_sync.as_ref()
                        {
                            if let Err(error) = this
                                .client
                                .sync_events_from_node(this.chain_id, subscribed, &remote_node)
                                .await
                            {
                                debug!(
                                    chain_id = %this.chain_id,
                                    %error,
                                    "Failed initial sparse sync for EventsOnly chain"
                                );
                            }
                        }
                    }
                    Ok::<_, Error>(stream)
                }
            })
            .filter_map(move |result| {
                let address = address.clone();
                async move {
                    if let Err(error) = &result {
                        info!(?error, address, "could not connect to validator");
                    } else {
                        debug!(address, "connected to validator");
                    }
                    result.ok()
                }
            })
            .flatten();
            let (stream, abort) = stream::abortable(stream);
            let mut stream = Box::pin(stream);
            let abort_on_exit = abort.clone();
            let this = self.clone();
            let local_node = local_node.clone();
            let remote_node = RemoteNode { public_key, node };
            validator_tasks.push(async move {
                while let Some(notification) = stream.next().await {
                    if let Err(error) = this
                        .process_notification(
                            remote_node.clone(),
                            local_node.clone(),
                            notification.clone(),
                        )
                        .await
                    {
                        info!(
                            chain_id = %this.chain_id,
                            address = remote_node.address(),
                            ?notification,
                            %error,
                            "failed to process notification",
                        );
                    }
                }
                warn!(
                    chain_id = %this.chain_id,
                    address = remote_node.address(),
                    "Validator notification stream ended"
                );
                abort_on_exit.abort();
            });
            entry.insert(abort);
        }
        Ok(validator_tasks.collect())
    }

    /// Attempts to bring a validator up to date with this chain's local state.
    #[instrument(level = "trace", skip(remote_node))]
    pub async fn sync_validator(&self, remote_node: Env::ValidatorNode) -> Result<(), Error> {
        let validator_next_block_height = match remote_node
            .handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
            .await
        {
            Ok(info) => info.info.next_block_height,
            Err(NodeError::BlobsNotFound(_)) => BlockHeight::ZERO,
            Err(err) => return Err(err.into()),
        };
        let local_next_block_height = self.chain_info().await?.next_block_height;

        if validator_next_block_height >= local_next_block_height {
            debug!("Validator is up-to-date with local state");
            return Ok(());
        }

        let heights: Vec<_> = (validator_next_block_height.0..local_next_block_height.0)
            .map(BlockHeight)
            .collect();

        let certificates = self
            .client
            .storage_client()
            .read_certificates_by_heights(self.chain_id, &heights)
            .await?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        for certificate in certificates {
            match remote_node
                .handle_confirmed_certificate(
                    certificate.clone(),
                    CrossChainMessageDelivery::NonBlocking,
                )
                .await
            {
                Ok(_) => (),
                Err(NodeError::BlobsNotFound(missing_blob_ids)) => {
                    // Upload the missing blobs we have and retry.
                    let missing_blobs: Vec<_> = self
                        .client
                        .storage_client()
                        .read_blobs(&missing_blob_ids)
                        .await?
                        .into_iter()
                        .flatten()
                        .collect();
                    remote_node.upload_blobs(missing_blobs).await?;
                    remote_node
                        .handle_confirmed_certificate(
                            certificate,
                            CrossChainMessageDelivery::NonBlocking,
                        )
                        .await?;
                }
                Err(err) => return Err(err.into()),
            }
        }

        Ok(())
    }
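
    // Illustrative call site (a sketch; `committee` is assumed to be the chain's
    // current committee, e.g. from `local_committee`):
    //
    //     for (_public_key, node) in client.validator_node_provider().make_nodes(&committee)? {
    //         chain_client.sync_validator(node).await?;
    //     }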
}

#[cfg(with_testing)]
impl<Env: Environment> ChainClient<Env> {
    pub async fn process_notification_from(
        &self,
        notification: Notification,
        validator: (ValidatorPublicKey, &str),
    ) {
        let mut node_list = self
            .client
            .validator_node_provider()
            .make_nodes_from_list(vec![validator])
            .unwrap();
        let (public_key, node) = node_list.next().unwrap();
        let remote_node = RemoteNode { node, public_key };
        let local_node = self.client.local_node.clone();
        self.process_notification(remote_node, local_node, notification)
            .await
            .unwrap();
    }
}