Skip to main content

linera_client/
client_context.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6#[cfg(not(web))]
7use futures::StreamExt as _;
8use futures::{Future, TryStreamExt as _};
9use linera_base::{
10    crypto::{CryptoHash, ValidatorPublicKey},
11    data_types::{ChainDescription, Epoch, Timestamp},
12    identifiers::{Account, AccountOwner, ChainId},
13    ownership::ChainOwnership,
14    time::{Duration, Instant},
15    util::future::FutureSyncExt as _,
16};
17use linera_chain::{manager::LockingBlock, types::ConfirmedBlockCertificate};
18use linera_core::{
19    client::{chain_client, ChainClient, Client, ListeningMode},
20    data_types::{ChainInfo, ChainInfoQuery, ClientOutcome},
21    join_set_ext::JoinSet,
22    node::ValidatorNode,
23    wallet, Environment, JoinSetExt as _, Wallet as _,
24};
25use linera_rpc::node_provider::{NodeOptions, NodeProvider};
26use linera_storage::Storage as _;
27use linera_version::VersionInfo;
28use thiserror_context::Context;
29use tracing::{debug, info, warn};
30#[cfg(not(web))]
31use {
32    crate::{
33        benchmark::{fungible_transfer, Benchmark, BenchmarkError},
34        client_metrics::ClientMetrics,
35    },
36    futures::stream,
37    linera_base::{
38        crypto::AccountPublicKey,
39        data_types::{Amount, BlockHeight},
40        identifiers::{ApplicationId, BlobType},
41    },
42    linera_execution::{
43        system::{OpenChainConfig, SystemOperation},
44        Operation,
45    },
46    std::{collections::HashSet, path::Path},
47    tokio::{sync::mpsc, task},
48};
49#[cfg(feature = "fs")]
50use {
51    linera_base::{
52        data_types::{BlobContent, Bytecode},
53        identifiers::ModuleId,
54        vm::VmRuntime,
55    },
56    linera_core::client::create_bytecode_blobs,
57    std::{fs, path::PathBuf},
58};
59
60use crate::{
61    chain_listener::{self, ClientContext as _, ClientContextExt as _},
62    client_options::{ChainOwnershipConfig, Options},
63    config::GenesisConfig,
64    error, util, Error,
65};
66
/// Results from querying a validator about version, network description, and chain info.
///
/// Each field stores the outcome of one query separately, so a failure of one query
/// is reported alongside the results of the others instead of hiding them.
pub struct ValidatorQueryResults {
    /// The validator's version information, or the error from the version check.
    pub version_info: Result<VersionInfo, Error>,
    /// The validator's genesis config hash, or the error from the network
    /// description check.
    pub genesis_config_hash: Result<CryptoHash, Error>,
    /// The validator's chain info (if valid and signature check passed).
    pub chain_info: Result<ChainInfo, Error>,
}
76
77impl ValidatorQueryResults {
78    /// Returns a vector of references to all errors in the query results.
79    pub fn errors(&self) -> Vec<&Error> {
80        let mut errors = Vec::new();
81        if let Err(e) = &self.version_info {
82            errors.push(e);
83        }
84        if let Err(e) = &self.genesis_config_hash {
85            errors.push(e);
86        }
87        if let Err(e) = &self.chain_info {
88            errors.push(e);
89        }
90        errors
91    }
92
93    /// Prints validator information to stdout.
94    ///
95    /// Prints public key, address, and optionally weight, version info, and chain info.
96    /// If `reference` is provided, only prints fields that differ from the reference.
97    pub fn print(
98        &self,
99        public_key: Option<&ValidatorPublicKey>,
100        address: Option<&str>,
101        weight: Option<u64>,
102        reference: Option<&ValidatorQueryResults>,
103    ) {
104        if let Some(key) = public_key {
105            println!("Public key: {key}");
106        }
107        if let Some(address) = address {
108            println!("Address: {address}");
109        }
110        if let Some(w) = weight {
111            println!("Weight: {w}");
112        }
113
114        let ref_version = reference.and_then(|ref_results| ref_results.version_info.as_ref().ok());
115        match &self.version_info {
116            Ok(version_info) => {
117                if ref_version.is_none_or(|ref_v| ref_v.crate_version != version_info.crate_version)
118                {
119                    println!("Linera protocol: v{}", version_info.crate_version);
120                }
121                if ref_version.is_none_or(|ref_v| ref_v.rpc_hash != version_info.rpc_hash) {
122                    println!("RPC API hash: {}", version_info.rpc_hash);
123                }
124                if ref_version.is_none_or(|ref_v| ref_v.graphql_hash != version_info.graphql_hash) {
125                    println!("GraphQL API hash: {}", version_info.graphql_hash);
126                }
127                if ref_version.is_none_or(|ref_v| ref_v.wit_hash != version_info.wit_hash) {
128                    println!("WIT API hash: {}", version_info.wit_hash);
129                }
130                if ref_version.is_none_or(|ref_v| {
131                    (&ref_v.git_commit, ref_v.git_dirty)
132                        != (&version_info.git_commit, version_info.git_dirty)
133                }) {
134                    println!(
135                        "Source code: {}/tree/{}{}",
136                        env!("CARGO_PKG_REPOSITORY"),
137                        version_info.git_commit,
138                        if version_info.git_dirty {
139                            " (dirty)"
140                        } else {
141                            ""
142                        }
143                    );
144                }
145            }
146            Err(err) => println!("Error getting version info: {err}"),
147        }
148
149        let ref_genesis_hash =
150            reference.and_then(|ref_results| ref_results.genesis_config_hash.as_ref().ok());
151        match &self.genesis_config_hash {
152            Ok(hash) if ref_genesis_hash.is_some_and(|ref_hash| ref_hash == hash) => {}
153            Ok(hash) => println!("Genesis config hash: {hash}"),
154            Err(err) => println!("Error getting genesis config: {err}"),
155        }
156
157        let ref_info = reference.and_then(|ref_results| ref_results.chain_info.as_ref().ok());
158        match &self.chain_info {
159            Ok(info) => {
160                if ref_info.is_none_or(|ref_info| info.block_hash != ref_info.block_hash) {
161                    if let Some(hash) = info.block_hash {
162                        println!("Block hash: {hash}");
163                    } else {
164                        println!("Block hash: None");
165                    }
166                }
167                if ref_info
168                    .is_none_or(|ref_info| info.next_block_height != ref_info.next_block_height)
169                {
170                    println!("Next height: {}", info.next_block_height);
171                }
172                if ref_info.is_none_or(|ref_info| info.timestamp != ref_info.timestamp) {
173                    println!("Timestamp: {}", info.timestamp);
174                }
175                if ref_info.is_none_or(|ref_info| info.epoch != ref_info.epoch) {
176                    println!("Epoch: {}", info.epoch);
177                }
178                if ref_info.is_none_or(|ref_info| {
179                    info.manager.current_round != ref_info.manager.current_round
180                }) {
181                    println!("Round: {}", info.manager.current_round);
182                }
183                if let Some(leader) = info.manager.leader {
184                    println!("Leader: {leader}");
185                }
186                if let Some(locking) = &info.manager.requested_locking {
187                    match &**locking {
188                        LockingBlock::Fast(proposal) => {
189                            println!(
190                                "Locking fast block from {}",
191                                proposal.content.block.timestamp
192                            );
193                        }
194                        LockingBlock::Regular(validated) => {
195                            println!(
196                                "Locking block {} in {} from {}",
197                                validated.hash(),
198                                validated.round,
199                                validated.block().header.timestamp
200                            );
201                        }
202                    }
203                }
204            }
205            Err(err) => println!("Error getting chain info: {err}"),
206        }
207        println!();
208    }
209}
210
/// Bundles a shared [`Client`] with the settings and bookkeeping used by the
/// higher-level client operations implemented on this type.
pub struct ClientContext<Env: Environment> {
    /// The underlying client, shared behind an `Arc`.
    pub client: Arc<Client<Env>>,
    // TODO(#5083): this doesn't really need to be stored
    /// The genesis configuration; its network description is compared against the
    /// one reported by validators.
    pub genesis_config: crate::config::GenesisConfig,
    /// Send timeout used when building `NodeOptions`.
    pub send_timeout: Duration,
    /// Receive timeout used when building `NodeOptions`.
    pub recv_timeout: Duration,
    /// Retry delay used when building `NodeOptions`.
    pub retry_delay: Duration,
    /// Maximum number of retries used when building `NodeOptions`.
    pub max_retries: u32,
    /// Maximum backoff used when building `NodeOptions`.
    pub max_backoff: Duration,
    /// The set of tasks onto which chain listeners are spawned.
    pub chain_listeners: JoinSet,
    // TODO(#5082): move this into the upstream UI layers (maybe just the CLI)
    /// The chain used by operations when no chain ID is given explicitly, if any.
    pub default_chain: Option<ChainId>,
    /// Client metrics, if enabled; the source of the timing sender.
    #[cfg(not(web))]
    pub client_metrics: Option<ClientMetrics>,
}
226
/// Implements the chain listener's context interface by delegating to the wrapped
/// [`Client`] and to the inherent methods of [`ClientContext`].
impl<Env: Environment> chain_listener::ClientContext for ClientContext<Env> {
    type Environment = Env;

    /// Returns the wallet of the underlying client.
    fn wallet(&self) -> &Env::Wallet {
        self.client.wallet()
    }

    /// Returns the storage client of the underlying client.
    fn storage(&self) -> &Env::Storage {
        self.client.storage_client()
    }

    /// Returns the shared underlying client.
    fn client(&self) -> &Arc<Client<Env>> {
        &self.client
    }

    /// Returns a clone of the metrics' timing sender, or `None` if no client metrics
    /// are set.
    #[cfg(not(web))]
    fn timing_sender(
        &self,
    ) -> Option<mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
        self.client_metrics
            .as_ref()
            .map(|metrics| metrics.timing_sender.clone())
    }

    /// Records a newly created chain and its owner (if any) in the wallet.
    ///
    /// Method-call resolution prefers the inherent
    /// `ClientContext::update_wallet_for_new_chain` over this trait method, so the
    /// call below delegates to the inherent method instead of recursing.
    async fn update_wallet_for_new_chain(
        &mut self,
        chain_id: ChainId,
        owner: Option<AccountOwner>,
        timestamp: Timestamp,
        epoch: Epoch,
    ) -> Result<(), Error> {
        self.update_wallet_for_new_chain(chain_id, owner, timestamp, epoch)
            .make_sync()
            .await
    }

    /// Updates the wallet entry of the chain tracked by `client` by delegating to
    /// `update_wallet_from_client`.
    async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
        self.update_wallet_from_client(client).make_sync().await
    }
}
267
268impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si, W>>
269where
270    S: linera_core::environment::Storage,
271    Si: linera_core::environment::Signer,
272    W: linera_core::environment::Wallet,
273{
274    // not worth refactoring this because
275    // https://github.com/linera-io/linera-protocol/issues/5082
276    // https://github.com/linera-io/linera-protocol/issues/5083
277    #[expect(clippy::too_many_arguments)]
278    pub async fn new(
279        storage: S,
280        wallet: W,
281        signer: Si,
282        options: &Options,
283        default_chain: Option<ChainId>,
284        genesis_config: GenesisConfig,
285        block_cache_size: usize,
286        execution_state_cache_size: usize,
287    ) -> Result<Self, Error> {
288        #[cfg(not(web))]
289        let timing_config = options.to_timing_config();
290        let node_provider = NodeProvider::new(NodeOptions {
291            send_timeout: options.send_timeout,
292            recv_timeout: options.recv_timeout,
293            retry_delay: options.retry_delay,
294            max_retries: options.max_retries,
295            max_backoff: options.max_backoff,
296        });
297        let chain_modes: Vec<_> = wallet
298            .items()
299            .map_ok(|(id, chain)| {
300                let mode = if chain.is_follow_only() {
301                    ListeningMode::FollowChain
302                } else {
303                    ListeningMode::FullChain
304                };
305                (id, mode)
306            })
307            .try_collect()
308            .await
309            .map_err(error::Inner::wallet)?;
310        let name = match chain_modes.len() {
311            0 => "Client node".to_string(),
312            1 => format!("Client node for {:.8}", chain_modes[0].0),
313            n => format!(
314                "Client node for {:.8} and {} others",
315                chain_modes[0].0,
316                n - 1
317            ),
318        };
319
320        let client = Client::new(
321            linera_core::environment::Impl {
322                network: node_provider,
323                storage,
324                signer,
325                wallet,
326            },
327            genesis_config.admin_chain_id(),
328            options.long_lived_services,
329            chain_modes,
330            name,
331            util::non_zero_duration(options.chain_worker_ttl),
332            util::non_zero_duration(options.sender_chain_worker_ttl),
333            options.cross_chain_batch_size_limit,
334            options.to_chain_client_options(),
335            block_cache_size,
336            execution_state_cache_size,
337            &options.to_requests_scheduler_config(),
338        );
339
340        #[cfg(not(web))]
341        let client_metrics = if timing_config.enabled {
342            Some(ClientMetrics::new(timing_config))
343        } else {
344            None
345        };
346
347        Ok(ClientContext {
348            client: Arc::new(client),
349            default_chain,
350            genesis_config,
351            send_timeout: options.send_timeout,
352            recv_timeout: options.recv_timeout,
353            retry_delay: options.retry_delay,
354            max_retries: options.max_retries,
355            max_backoff: options.max_backoff,
356            chain_listeners: JoinSet::default(),
357            #[cfg(not(web))]
358            client_metrics,
359        })
360    }
361}
362
363impl<Env: Environment> ClientContext<Env> {
    // TODO(#5084) this (and other injected dependencies) should not be re-exposed by the
    // client interface
    /// Returns a reference to the wallet held by the underlying client.
    pub fn wallet(&self) -> &Env::Wallet {
        self.client.wallet()
    }
370
    /// Returns the ID of the admin chain, as reported by the underlying client.
    pub fn admin_chain_id(&self) -> ChainId {
        self.client.admin_chain_id()
    }
375
    /// Retrieves the default account. Currently this is the common account of the
    /// default chain.
    ///
    /// # Panics
    ///
    /// Panics if no default chain is set.
    pub fn default_account(&self) -> Account {
        Account::chain(self.default_chain())
    }
381
    /// Retrieves the default chain.
    ///
    /// # Panics
    ///
    /// Panics if no default chain is set.
    pub fn default_chain(&self) -> ChainId {
        self.default_chain
            .expect("default chain requested but none set")
    }
387
388    pub async fn first_non_admin_chain(&self) -> Result<ChainId, Error> {
389        let admin_chain_id = self.admin_chain_id();
390        let chain_ids = self
391            .wallet()
392            .chain_ids()
393            .try_filter(|chain_id| futures::future::ready(*chain_id != admin_chain_id))
394            .try_collect::<Vec<ChainId>>()
395            .await
396            .map_err(Error::wallet)?;
397        Ok(chain_ids
398            .into_iter()
399            .min()
400            .expect("No non-admin chain specified in wallet with no non-admin chain"))
401    }
402
    // TODO(#5084) this should match the `NodeProvider` from the `Environment`
    /// Creates a new `NodeProvider` configured with this context's network settings.
    pub fn make_node_provider(&self) -> NodeProvider {
        NodeProvider::new(self.make_node_options())
    }
407
    /// Builds `NodeOptions` from the timeout, retry and backoff settings stored in
    /// this context.
    fn make_node_options(&self) -> NodeOptions {
        NodeOptions {
            send_timeout: self.send_timeout,
            recv_timeout: self.recv_timeout,
            retry_delay: self.retry_delay,
            max_retries: self.max_retries,
            max_backoff: self.max_backoff,
        }
    }
417
    /// Returns the client metrics, or `None` if none are set.
    #[cfg(not(web))]
    pub fn client_metrics(&self) -> Option<&ClientMetrics> {
        self.client_metrics.as_ref()
    }
422
423    pub async fn update_wallet_from_client<Env_: Environment>(
424        &self,
425        client: &ChainClient<Env_>,
426    ) -> Result<(), Error> {
427        let info = client.chain_info().await?;
428        let chain_id = info.chain_id;
429        let existing_owner = self
430            .wallet()
431            .get(chain_id)
432            .await
433            .map_err(error::Inner::wallet)?
434            .and_then(|chain| chain.owner);
435
436        // Only persist proposals that were made in the fast round: they need to be
437        // remembered across sessions to make sure there are no conflicting fast proposals.
438        let pending_fast_proposal = client
439            .pending_proposal()
440            .await
441            .filter(|p| p.round.is_some_and(|r| r.is_fast()));
442        let new_chain = wallet::Chain {
443            pending_fast_proposal,
444            owner: existing_owner,
445            ..info.as_ref().into()
446        };
447
448        self.wallet()
449            .insert(chain_id, new_chain)
450            .await
451            .map_err(error::Inner::wallet)?;
452
453        Ok(())
454    }
455
456    /// Remembers the new chain and its owner (if any) in the wallet.
457    pub async fn update_wallet_for_new_chain(
458        &mut self,
459        chain_id: ChainId,
460        owner: Option<AccountOwner>,
461        timestamp: Timestamp,
462        epoch: Epoch,
463    ) -> Result<(), Error> {
464        self.wallet()
465            .try_insert(
466                chain_id,
467                linera_core::wallet::Chain::new(owner, epoch, timestamp),
468            )
469            .await
470            .map_err(error::Inner::wallet)?;
471        Ok(())
472    }
473
474    /// Registers a chain from its description: initializes local storage, adds to
475    /// wallet, and starts tracking it for cross-chain message delivery.
476    pub async fn extend_with_chain(
477        &mut self,
478        description: ChainDescription,
479        owner: Option<AccountOwner>,
480    ) -> Result<(), Error> {
481        let chain_id = description.id();
482        self.client
483            .storage_client()
484            .create_chain(description.clone())
485            .await?;
486        self.wallet()
487            .try_insert(
488                chain_id,
489                linera_core::wallet::Chain::new(
490                    owner,
491                    description.config().epoch,
492                    description.timestamp(),
493                ),
494            )
495            .await
496            .map_err(error::Inner::wallet)?;
497        self.client
498            .extend_chain_mode(chain_id, ListeningMode::FullChain);
499        Ok(())
500    }
501
502    pub async fn process_inbox(
503        &mut self,
504        chain_client: &ChainClient<Env>,
505    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
506        let mut certificates = Vec::new();
507        // Try processing the inbox optimistically without waiting for validator notifications.
508        let (new_certificates, maybe_timeout) = {
509            chain_client.synchronize_from_validators().await?;
510            let result = chain_client.process_inbox_without_prepare().await;
511            self.update_wallet_from_client(chain_client).await?;
512            result?
513        };
514        certificates.extend(new_certificates);
515        if maybe_timeout.is_none() {
516            return Ok(certificates);
517        }
518
519        // Start listening for notifications, so we learn about new rounds and blocks.
520        let (listener, _listen_handle, mut notification_stream) = chain_client.listen().await?;
521        self.chain_listeners.spawn_task(listener);
522
523        loop {
524            let (new_certificates, maybe_timeout) = {
525                let result = chain_client.process_inbox().await;
526                self.update_wallet_from_client(chain_client).await?;
527                result?
528            };
529            certificates.extend(new_certificates);
530            if let Some(timestamp) = maybe_timeout {
531                util::wait_for_next_round(&mut notification_stream, timestamp).await
532            } else {
533                return Ok(certificates);
534            }
535        }
536    }
537
538    pub async fn assign_new_chain_to_key(
539        &mut self,
540        chain_id: ChainId,
541        owner: AccountOwner,
542    ) -> Result<(), Error> {
543        self.client
544            .extend_chain_mode(chain_id, ListeningMode::FullChain);
545        let client = self.make_chain_client(chain_id).await?;
546        let info = client.prepare_for_owner(owner).await.map_err(|error| {
547            tracing::error!(%chain_id, %owner, %error, "Chain is not owned");
548            error::Inner::ChainOwnership
549        })?;
550
551        // Try to modify existing chain entry, setting the owner.
552        let modified = self
553            .wallet()
554            .modify(chain_id, |chain| chain.owner = Some(owner))
555            .await
556            .map_err(error::Inner::wallet)?;
557        // If the chain didn't exist, insert a new entry.
558        if modified.is_none() {
559            self.wallet()
560                .insert(
561                    chain_id,
562                    wallet::Chain {
563                        owner: Some(owner),
564                        timestamp: info.timestamp,
565                        epoch: Some(info.epoch),
566                        ..Default::default()
567                    },
568                )
569                .await
570                .map_err(error::Inner::wallet)
571                .context("assigning new chain")?;
572        }
573        Ok(())
574    }
575
576    /// Applies the given function to the chain client.
577    ///
578    /// Updates the wallet regardless of the outcome. As long as the function returns a round
579    /// timeout, it will wait and retry.
580    pub async fn apply_client_command<E, F, Fut, T>(
581        &mut self,
582        client: &ChainClient<Env>,
583        mut f: F,
584    ) -> Result<T, Error>
585    where
586        F: FnMut(&ChainClient<Env>) -> Fut,
587        Fut: Future<Output = Result<ClientOutcome<T>, E>>,
588        Error: From<E>,
589    {
590        client.prepare_chain().await?;
591        // Try applying f optimistically without validator notifications. Return if committed.
592        let result = f(client).await;
593        self.update_wallet_from_client(client).await?;
594        match result? {
595            ClientOutcome::Committed(t) => return Ok(t),
596            ClientOutcome::Conflict(certificate) => {
597                return Err(chain_client::Error::Conflict(certificate.hash()).into());
598            }
599            ClientOutcome::WaitForTimeout(_) => {}
600        }
601
602        // Start listening for notifications, so we learn about new rounds and blocks.
603        let (listener, _listen_handle, mut notification_stream) = client.listen().await?;
604        self.chain_listeners.spawn_task(listener);
605
606        loop {
607            // Try applying f. Return if committed.
608            let result = f(client).await;
609            self.update_wallet_from_client(client).await?;
610            let timeout = match result? {
611                ClientOutcome::Committed(t) => return Ok(t),
612                ClientOutcome::Conflict(certificate) => {
613                    return Err(chain_client::Error::Conflict(certificate.hash()).into());
614                }
615                ClientOutcome::WaitForTimeout(timeout) => timeout,
616            };
617            // Otherwise wait and try again in the next round.
618            util::wait_for_next_round(&mut notification_stream, timeout).await;
619        }
620    }
621
622    pub async fn ownership(&mut self, chain_id: Option<ChainId>) -> Result<ChainOwnership, Error> {
623        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
624        let client = self.make_chain_client(chain_id).await?;
625        let info = client.chain_info().await?;
626        Ok(info.manager.ownership)
627    }
628
629    pub async fn change_ownership(
630        &mut self,
631        chain_id: Option<ChainId>,
632        ownership_config: ChainOwnershipConfig,
633    ) -> Result<(), Error> {
634        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
635        let mut chain_client = self.make_chain_client(chain_id).await?;
636        info!(
637            ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
638            "Changing ownership of a chain"
639        );
640        let time_start = Instant::now();
641        let mut ownership = chain_client.query_chain_ownership().await?;
642        ownership_config.update(&mut ownership)?;
643
644        if ownership.super_owners.is_empty() && ownership.owners.is_empty() {
645            tracing::error!("At least one owner or super owner of the chain has to be set.");
646            return Err(error::Inner::ChainOwnership.into());
647        }
648
649        let certificate = self
650            .apply_client_command(&chain_client, |chain_client| {
651                let ownership = ownership.clone();
652                let chain_client = chain_client.clone();
653                async move {
654                    chain_client
655                        .change_ownership(ownership)
656                        .await
657                        .map_err(Error::from)
658                        .context("Failed to change ownership")
659                }
660            })
661            .await?;
662        let time_total = time_start.elapsed();
663        info!("Operation confirmed after {} ms", time_total.as_millis());
664        debug!("{:?}", certificate);
665        self.maybe_auto_assign_preferred_owner(&mut chain_client, &ownership)
666            .await?;
667        Ok(())
668    }
669
670    pub async fn set_preferred_owner(
671        &mut self,
672        chain_id: Option<ChainId>,
673        preferred_owner: AccountOwner,
674    ) -> Result<(), Error> {
675        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
676        let mut chain_client = self.make_chain_client(chain_id).await?;
677        let old_owner = chain_client.preferred_owner();
678        info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
679        chain_client.set_preferred_owner(preferred_owner);
680        self.update_wallet_from_client(&chain_client).await?;
681        info!("New preferred owner set");
682        Ok(())
683    }
684
685    pub async fn check_compatible_version_info(
686        &self,
687        address: &str,
688        node: &impl ValidatorNode,
689    ) -> Result<VersionInfo, Error> {
690        match node.get_version_info().await {
691            Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
692                debug!(
693                    "Version information for validator {address}: {}",
694                    version_info
695                );
696                Ok(version_info)
697            }
698            Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
699                remote: Box::new(version_info),
700                local: Box::new(linera_version::VERSION_INFO.clone()),
701            }
702            .into()),
703            Err(error) => Err(error::Inner::UnavailableVersionInfo {
704                address: address.to_string(),
705                error: Box::new(error),
706            }
707            .into()),
708        }
709    }
710
711    pub async fn check_matching_network_description(
712        &self,
713        address: &str,
714        node: &impl ValidatorNode,
715    ) -> Result<CryptoHash, Error> {
716        let network_description = self.genesis_config.network_description();
717        match node.get_network_description().await {
718            Ok(description) => {
719                if description == network_description {
720                    Ok(description.genesis_config_hash)
721                } else {
722                    Err(error::Inner::UnexpectedNetworkDescription {
723                        remote: Box::new(description),
724                        local: Box::new(network_description),
725                    }
726                    .into())
727                }
728            }
729            Err(error) => Err(error::Inner::UnavailableNetworkDescription {
730                address: address.to_string(),
731                error: Box::new(error),
732            }
733            .into()),
734        }
735    }
736
737    pub async fn check_validator_chain_info_response(
738        &self,
739        public_key: Option<&ValidatorPublicKey>,
740        address: &str,
741        node: &impl ValidatorNode,
742        chain_id: ChainId,
743    ) -> Result<ChainInfo, Error> {
744        let query = ChainInfoQuery::new(chain_id).with_manager_values();
745        match node.handle_chain_info_query(query).await {
746            Ok(response) => {
747                debug!(
748                    "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
749                    response.info.next_block_height, response.info.epoch,
750                );
751                if let Some(public_key) = public_key {
752                    if response.check(*public_key).is_ok() {
753                        debug!("Signature for public key {public_key} is OK.");
754                    } else {
755                        return Err(error::Inner::InvalidSignature {
756                            public_key: *public_key,
757                        }
758                        .into());
759                    }
760                } else {
761                    warn!("Not checking signature as public key was not given");
762                }
763                Ok(*response.info)
764            }
765            Err(error) => Err(error::Inner::UnavailableChainInfo {
766                address: address.to_string(),
767                chain_id,
768                error: Box::new(error),
769            }
770            .into()),
771        }
772    }
773
774    /// Query a validator for version info, network description, and chain info.
775    ///
776    /// Returns a `ValidatorQueryResults` struct with the results of all three queries.
777    pub async fn query_validator(
778        &self,
779        address: &str,
780        node: &impl ValidatorNode,
781        chain_id: ChainId,
782        public_key: Option<&ValidatorPublicKey>,
783    ) -> ValidatorQueryResults {
784        let version_info = self.check_compatible_version_info(address, node).await;
785        let genesis_config_hash = self.check_matching_network_description(address, node).await;
786        let chain_info = self
787            .check_validator_chain_info_response(public_key, address, node, chain_id)
788            .await;
789
790        ValidatorQueryResults {
791            version_info,
792            genesis_config_hash,
793            chain_info,
794        }
795    }
796
797    /// Query the local node for version info, network description, and chain info.
798    ///
799    /// Returns a `ValidatorQueryResults` struct with the local node's information.
800    pub async fn query_local_node(
801        &self,
802        chain_id: ChainId,
803    ) -> Result<ValidatorQueryResults, Error> {
804        let version_info = Ok(linera_version::VERSION_INFO.clone());
805        let genesis_config_hash = Ok(self
806            .genesis_config
807            .network_description()
808            .genesis_config_hash);
809        let chain_info = self
810            .make_chain_client(chain_id)
811            .await?
812            .chain_info_with_manager_values()
813            .await
814            .map(|info| *info)
815            .map_err(|e| e.into());
816
817        Ok(ValidatorQueryResults {
818            version_info,
819            genesis_config_hash,
820            chain_info,
821        })
822    }
823}
824
#[cfg(feature = "fs")]
impl<Env: Environment> ClientContext<Env> {
    /// Publishes a module built from the bytecode files `contract` and `service` and
    /// returns its `ModuleId`.
    ///
    /// When `formats` is given, the file is read with `load_formats_from_snap` and the
    /// resulting bytes are passed to `create_bytecode_blobs` together with the bytecode.
    /// After the blobs are published, the inbox of `chain_client` is processed.
    ///
    /// # Errors
    ///
    /// Fails if a bytecode file or the formats file cannot be loaded, if publishing the
    /// blobs fails, or if processing the inbox fails.
    pub async fn publish_module(
        &mut self,
        chain_client: &ChainClient<Env>,
        contract: PathBuf,
        service: PathBuf,
        vm_runtime: VmRuntime,
        formats: Option<PathBuf>,
    ) -> Result<ModuleId, Error> {
        info!("Loading bytecode files");
        let contract_bytecode = Bytecode::load_from_file(&contract)
            .await
            .map_err(|e| {
                let message = format!("failed to load contract bytecode from {contract:?}: {e}");
                std::io::Error::new(e.kind(), message)
            })?;
        let service_bytecode = Bytecode::load_from_file(&service)
            .await
            .map_err(|e| {
                let message = format!("failed to load service bytecode from {service:?}: {e}");
                std::io::Error::new(e.kind(), message)
            })?;

        // The formats blob is optional: only read the SNAP file when a path was supplied.
        let formats_bytes = formats
            .as_deref()
            .map(load_formats_from_snap)
            .transpose()?;

        info!("Publishing module");
        let (blobs, module_id) = create_bytecode_blobs(
            contract_bytecode,
            service_bytecode,
            vm_runtime,
            formats_bytes,
        )
        .await;
        let (published_module_id, _) = self
            .apply_client_command(chain_client, |client| {
                // The closure hands an owned copy of the client and the blobs to each
                // future it creates.
                let client = client.clone();
                let blobs = blobs.clone();
                async move {
                    client
                        .publish_module_blobs(blobs, module_id)
                        .await
                        .context("Failed to publish module")
                }
            })
            .await?;

        info!("{}", "Module published successfully!");

        info!("Synchronizing client and processing inbox");
        self.process_inbox(chain_client).await?;
        Ok(published_module_id)
    }

    /// Reads the file at `blob_path`, publishes its contents as a data blob through
    /// `chain_client`, and returns the hash of the blob content.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be read or if publishing the blob fails.
    pub async fn publish_data_blob(
        &mut self,
        chain_client: &ChainClient<Env>,
        blob_path: PathBuf,
    ) -> Result<CryptoHash, Error> {
        info!("Loading data blob file");
        let blob_bytes = fs::read(&blob_path).map_err(|e| {
            let message = format!("failed to load data blob bytes from {blob_path:?}: {e}");
            std::io::Error::new(e.kind(), message)
        })?;

        info!("Publishing data blob");
        self.apply_client_command(chain_client, |client| {
            let client = client.clone();
            let bytes = blob_bytes.clone();
            async move {
                client
                    .publish_data_blob(bytes)
                    .await
                    .context("Failed to publish data blob")
            }
        })
        .await?;

        info!("{}", "Data blob published successfully!");
        // The hash is computed locally from the bytes that were just published.
        let blob_content = BlobContent::new_data(blob_bytes);
        Ok(CryptoHash::new(&blob_content))
    }

    // TODO(#2490): Consider removing or renaming this.
    /// Reads the data blob identified by `hash` through `chain_client` and discards the
    /// result; only success or failure is reported.
    ///
    /// # Errors
    ///
    /// Fails if the blob cannot be read.
    pub async fn read_data_blob(
        &mut self,
        chain_client: &ChainClient<Env>,
        hash: CryptoHash,
    ) -> Result<(), Error> {
        info!("Verifying data blob");
        self.apply_client_command(chain_client, |client| {
            let client = client.clone();
            async move {
                client
                    .read_data_blob(hash)
                    .await
                    .context("Failed to verify data blob")
            }
        })
        .await?;

        info!("{}", "Data blob verified successfully!");
        Ok(())
    }
}
934
/// Loads the application formats from an insta SNAP file.
///
/// The file is expected to start with a `---`-delimited frontmatter followed by a
/// YAML-encoded `Formats` value. The YAML body is parsed and re-encoded as JSON; the
/// JSON bytes are what gets published as the application formats blob.
///
/// # Errors
///
/// Returns an I/O error (converted into `Error`) if the file cannot be read, and an
/// `InvalidData` I/O error if the frontmatter delimiters are missing, the body is not a
/// valid `Formats` value, or the value cannot be serialized to JSON.
#[cfg(feature = "fs")]
fn load_formats_from_snap(path: &std::path::Path) -> Result<Vec<u8>, Error> {
    // Every failure past the initial read is reported as `InvalidData`.
    let invalid_data =
        |message: String| std::io::Error::new(std::io::ErrorKind::InvalidData, message);

    let content = fs::read_to_string(path).map_err(|e| {
        let message = format!("failed to read SNAP file {path:?}: {e}");
        std::io::Error::new(e.kind(), message)
    })?;

    let Some(body) = strip_snap_frontmatter(&content) else {
        let message = format!("SNAP file {path:?} is missing the `---` frontmatter delimiters");
        return Err(invalid_data(message).into());
    };

    let formats: linera_sdk::formats::Formats = serde_yaml_08::from_str(body)
        .map_err(|e| invalid_data(format!("failed to parse SNAP body in {path:?} as Formats: {e}")))?;

    let json = serde_json::to_vec(&formats)
        .map_err(|e| invalid_data(format!("failed to serialize Formats as JSON: {e}")))?;
    Ok(json)
}
963
964#[cfg(feature = "fs")]
/// Returns the part of `content` that follows the `---`-delimited frontmatter, or
/// `None` if the frontmatter delimiters are missing.
///
/// `content` must start with a `---` line; the body begins right after the next line
/// that consists of exactly `---`. Both `\n` and `\r\n` line endings are accepted, and
/// the opening delimiter decides which one is expected for the closing delimiter. A
/// closing `---` that is the very last line, without a trailing newline, yields an
/// empty body.
fn strip_snap_frontmatter(content: &str) -> Option<&str> {
    let (rest, closing) = match content.strip_prefix("---\n") {
        Some(rest) => (rest, "\n---\n"),
        None => (content.strip_prefix("---\r\n")?, "\r\n---\r\n"),
    };
    if let Some(start) = rest.find(closing) {
        return Some(&rest[start + closing.len()..]);
    }
    // No newline-terminated closing delimiter: still accept a closing `---` that ends
    // the file, in which case there is no body at all.
    rest.ends_with(closing.trim_end()).then_some("")
}
970
971#[cfg(not(web))]
972impl<Env: Environment> ClientContext<Env> {
973    pub async fn prepare_for_benchmark(
974        &mut self,
975        num_chains: usize,
976        tokens_per_chain: Amount,
977        fungible_application_id: Option<ApplicationId>,
978        pub_keys: Vec<AccountPublicKey>,
979        chains_config_path: Option<&Path>,
980        close_chains: bool,
981    ) -> Result<Vec<ChainClient<Env>>, Error> {
982        let start = Instant::now();
983        // Below all block proposals are supposed to succeed without retries, we
984        // must make sure that all incoming payments have been accepted on-chain
985        // and that no validator is missing user certificates.
986        self.process_inboxes_and_force_validator_updates().await;
987        info!(
988            "Processed inboxes and forced validator updates in {} ms",
989            start.elapsed().as_millis()
990        );
991
992        let start = Instant::now();
993        let (benchmark_chains, chain_clients) = self
994            .make_benchmark_chains(
995                num_chains,
996                tokens_per_chain,
997                pub_keys,
998                chains_config_path.is_some(),
999                close_chains,
1000            )
1001            .await?;
1002        info!(
1003            "Got {} chains in {} ms",
1004            num_chains,
1005            start.elapsed().as_millis()
1006        );
1007
1008        if let Some(id) = fungible_application_id {
1009            let start = Instant::now();
1010            self.supply_fungible_tokens(&benchmark_chains, id).await?;
1011            info!(
1012                "Supplied fungible tokens in {} ms",
1013                start.elapsed().as_millis()
1014            );
1015            // Need to process inboxes to make sure the chains receive the supplied tokens.
1016            let start = Instant::now();
1017            for chain_client in &chain_clients {
1018                chain_client.process_inbox().await?;
1019            }
1020            info!(
1021                "Processed inboxes after supplying fungible tokens in {} ms",
1022                start.elapsed().as_millis()
1023            );
1024        }
1025
1026        let all_chains = Benchmark::<Env>::get_all_chains(chains_config_path, &benchmark_chains)?;
1027        let known_chain_ids: HashSet<_> = benchmark_chains.iter().map(|(id, _)| *id).collect();
1028        let unknown_chain_ids: Vec<_> = all_chains
1029            .iter()
1030            .filter(|id| !known_chain_ids.contains(id))
1031            .copied()
1032            .collect();
1033        if !unknown_chain_ids.is_empty() {
1034            // The current client won't have the blobs for the chains in the other wallets. Even
1035            // though it will eventually get those blobs, we're getting a head start here and
1036            // fetching those blobs in advance.
1037            for chain_id in &unknown_chain_ids {
1038                self.client.get_chain_description(*chain_id).await?;
1039            }
1040        }
1041
1042        Ok(chain_clients)
1043    }
1044
1045    pub async fn wrap_up_benchmark(
1046        &mut self,
1047        chain_clients: Vec<ChainClient<Env>>,
1048        close_chains: bool,
1049        wrap_up_max_in_flight: usize,
1050    ) -> Result<(), Error> {
1051        if close_chains {
1052            info!("Closing chains...");
1053            let stream = stream::iter(chain_clients)
1054                .map(|chain_client| async move {
1055                    Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
1056                    info!("Closed chain {:?}", chain_client.chain_id());
1057                    Ok::<(), BenchmarkError>(())
1058                })
1059                .buffer_unordered(wrap_up_max_in_flight);
1060            stream.try_collect::<Vec<_>>().await?;
1061        } else {
1062            info!("Processing inbox for all chains...");
1063            let stream = stream::iter(chain_clients.clone())
1064                .map(|chain_client| async move {
1065                    chain_client.process_inbox().await?;
1066                    info!("Processed inbox for chain {:?}", chain_client.chain_id());
1067                    Ok::<(), chain_client::Error>(())
1068                })
1069                .buffer_unordered(wrap_up_max_in_flight);
1070            stream.try_collect::<Vec<_>>().await?;
1071
1072            info!("Updating wallet from chain clients...");
1073            for chain_client in chain_clients {
1074                let info = chain_client.chain_info().await?;
1075                let client_owner = chain_client.preferred_owner();
1076                let pending_fast_proposal = chain_client
1077                    .pending_proposal()
1078                    .await
1079                    .filter(|p| p.round.is_some_and(|r| r.is_fast()));
1080                self.wallet()
1081                    .insert(
1082                        info.chain_id,
1083                        wallet::Chain {
1084                            pending_fast_proposal,
1085                            owner: client_owner,
1086                            ..info.as_ref().into()
1087                        },
1088                    )
1089                    .await
1090                    .map_err(error::Inner::wallet)?;
1091            }
1092        }
1093
1094        Ok(())
1095    }
1096
1097    async fn process_inboxes_and_force_validator_updates(&mut self) {
1098        let mut join_set = task::JoinSet::new();
1099
1100        let chain_clients: Vec<_> = self
1101            .wallet()
1102            .owned_chain_ids()
1103            .map_err(|e| error::Inner::wallet(e).into())
1104            .and_then(|id| self.make_chain_client(id))
1105            .try_collect()
1106            .await
1107            .unwrap();
1108
1109        for chain_client in chain_clients {
1110            join_set.spawn(async move {
1111                Self::process_inbox_without_updating_wallet(&chain_client)
1112                    .await
1113                    .expect("Processing inbox should not fail!");
1114                chain_client
1115            });
1116        }
1117
1118        for chain_client in join_set.join_all().await {
1119            self.update_wallet_from_client(&chain_client).await.unwrap();
1120        }
1121    }
1122
1123    async fn process_inbox_without_updating_wallet(
1124        chain_client: &ChainClient<Env>,
1125    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
1126        // Try processing the inbox optimistically without waiting for validator notifications.
1127        chain_client.synchronize_from_validators().await?;
1128        let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
1129        assert!(
1130            maybe_timeout.is_none(),
1131            "Should not timeout within benchmark!"
1132        );
1133
1134        Ok(certificates)
1135    }
1136
1137    /// Creates chains if necessary, and returns a map of exactly `num_chains` chain IDs
1138    /// with key pairs, as well as a map of the chain clients.
1139    ///
1140    /// If `close_chains` is true, chains are not looked up from or stored in the wallet,
1141    /// since they will be closed after the benchmark and shouldn't be reused.
1142    async fn make_benchmark_chains(
1143        &mut self,
1144        num_chains: usize,
1145        balance: Amount,
1146        pub_keys: Vec<AccountPublicKey>,
1147        wallet_only: bool,
1148        close_chains: bool,
1149    ) -> Result<(Vec<(ChainId, AccountOwner)>, Vec<ChainClient<Env>>), Error> {
1150        let mut chains_found_in_wallet = 0;
1151        let mut benchmark_chains = Vec::with_capacity(num_chains);
1152        let mut chain_clients = Vec::with_capacity(num_chains);
1153        let start = Instant::now();
1154
1155        // When close_chains is true and we're creating our own chains (not wallet_only),
1156        // skip wallet lookup to avoid picking up existing chains that would then be closed.
1157        // When wallet_only is true, chains were pre-created by the parent process and must
1158        // be read from the wallet.
1159        if !close_chains || wallet_only {
1160            let mut owned_chain_ids = std::pin::pin!(self.wallet().owned_chain_ids());
1161            while let Some(chain_id) = owned_chain_ids.next().await {
1162                let chain_id = chain_id.map_err(error::Inner::wallet)?;
1163                if chains_found_in_wallet == num_chains {
1164                    break;
1165                }
1166                let chain_client = self.make_chain_client(chain_id).await?;
1167                let ownership = chain_client.chain_info().await?.manager.ownership;
1168                if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
1169                    continue;
1170                }
1171                let owner = *ownership.super_owners.first().unwrap();
1172                chain_client.process_inbox().await?;
1173                benchmark_chains.push((chain_id, owner));
1174                chain_clients.push(chain_client);
1175                chains_found_in_wallet += 1;
1176            }
1177            info!(
1178                "Got {} chains from the wallet in {} ms",
1179                benchmark_chains.len(),
1180                start.elapsed().as_millis()
1181            );
1182        }
1183
1184        let num_chains_to_create = num_chains - chains_found_in_wallet;
1185
1186        let default_chain_client = self.make_chain_client(self.default_chain()).await?;
1187
1188        if num_chains_to_create > 0 {
1189            if wallet_only {
1190                return Err(
1191                    error::Inner::Benchmark(BenchmarkError::NotEnoughChainsInWallet(
1192                        num_chains,
1193                        chains_found_in_wallet,
1194                    ))
1195                    .into(),
1196                );
1197            }
1198            let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
1199            let operations_per_block = 900; // Over this we seem to hit the block size limits.
1200            for i in (0..num_chains_to_create).step_by(operations_per_block) {
1201                let num_new_chains = operations_per_block.min(num_chains_to_create - i);
1202                // Each chain gets its own unique owner (previously all chains in a batch
1203                // shared one owner, which could cause conflicts during benchmarking).
1204                let owners: Vec<AccountOwner> = (&mut pub_keys_iter)
1205                    .take(num_new_chains)
1206                    .map(|pk| pk.into())
1207                    .collect();
1208
1209                let certificate = Self::execute_open_chains_operations(
1210                    &default_chain_client,
1211                    balance,
1212                    owners.clone(),
1213                )
1214                .await?;
1215                info!("Block executed successfully");
1216
1217                let block = certificate.block();
1218                for (i, owner) in owners.into_iter().enumerate() {
1219                    let chain_id = block.body.blobs[i]
1220                        .iter()
1221                        .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
1222                        .map(|blob| ChainId(blob.id().hash))
1223                        .expect("failed to create a new chain");
1224                    self.client
1225                        .extend_chain_mode(chain_id, ListeningMode::FullChain);
1226
1227                    let mut chain_client = self.client.create_chain_client(
1228                        chain_id,
1229                        None,
1230                        BlockHeight::ZERO,
1231                        &None,
1232                        Some(owner),
1233                        self.timing_sender(),
1234                        false,
1235                    );
1236                    chain_client.set_preferred_owner(owner);
1237                    chain_client.process_inbox().await?;
1238                    benchmark_chains.push((chain_id, owner));
1239                    chain_clients.push(chain_client);
1240                }
1241            }
1242
1243            info!(
1244                "Created {} chains in {} ms",
1245                num_chains_to_create,
1246                start.elapsed().as_millis()
1247            );
1248        }
1249
1250        // Only update wallet if chains will be reused (not closed after benchmark)
1251        if !close_chains {
1252            info!("Updating wallet from client");
1253            self.update_wallet_from_client(&default_chain_client)
1254                .await?;
1255        }
1256        info!("Retrying pending outgoing messages");
1257        default_chain_client
1258            .retry_pending_outgoing_messages()
1259            .await
1260            .context("outgoing messages to create the new chains should be delivered")?;
1261        info!("Processing default chain inbox");
1262        default_chain_client.process_inbox().await?;
1263
1264        assert_eq!(
1265            benchmark_chains.len(),
1266            chain_clients.len(),
1267            "benchmark_chains and chain_clients must have the same size"
1268        );
1269
1270        Ok((benchmark_chains, chain_clients))
1271    }
1272
1273    async fn execute_open_chains_operations(
1274        chain_client: &ChainClient<Env>,
1275        balance: Amount,
1276        owners: Vec<AccountOwner>,
1277    ) -> Result<ConfirmedBlockCertificate, Error> {
1278        let operations: Vec<_> = owners
1279            .iter()
1280            .map(|owner| {
1281                let config = OpenChainConfig {
1282                    ownership: ChainOwnership::single_super(*owner),
1283                    balance,
1284                    application_permissions: Default::default(),
1285                };
1286                Operation::system(SystemOperation::OpenChain(config))
1287            })
1288            .collect();
1289        info!("Executing {} OpenChain operations", operations.len());
1290        Ok(chain_client
1291            .execute_operations(operations, vec![])
1292            .await?
1293            .expect("should execute block with OpenChain operations"))
1294    }
1295
1296    /// Supplies fungible tokens to the chains.
1297    async fn supply_fungible_tokens(
1298        &mut self,
1299        key_pairs: &[(ChainId, AccountOwner)],
1300        application_id: ApplicationId,
1301    ) -> Result<(), Error> {
1302        let default_chain_id = self.default_chain();
1303        let default_key = self
1304            .wallet()
1305            .get(default_chain_id)
1306            .await
1307            .unwrap()
1308            .unwrap()
1309            .owner
1310            .unwrap();
1311        // This should be enough to run the benchmark at 1M TPS for an hour.
1312        let amount = Amount::from_nanos(4);
1313        let operations: Vec<Operation> = key_pairs
1314            .iter()
1315            .map(|(chain_id, owner)| {
1316                fungible_transfer(application_id, *chain_id, default_key, *owner, amount)
1317            })
1318            .collect();
1319        let chain_client = self.make_chain_client(default_chain_id).await?;
1320        // Put at most 1000 fungible token operations in each block.
1321        for operation_chunk in operations.chunks(1000) {
1322            chain_client
1323                .execute_operations(operation_chunk.to_vec(), vec![])
1324                .await?
1325                .expect("should execute block with Transfer operations");
1326        }
1327        self.update_wallet_from_client(&chain_client).await?;
1328
1329        Ok(())
1330    }
1331}