linera_client/
client_context.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use futures::{Future, StreamExt as _, TryStreamExt as _};
7use linera_base::{
8    crypto::{CryptoHash, ValidatorPublicKey},
9    data_types::{Epoch, Timestamp},
10    identifiers::{Account, AccountOwner, ChainId},
11    ownership::ChainOwnership,
12    time::{Duration, Instant},
13    util::future::FutureSyncExt as _,
14};
15use linera_chain::{manager::LockingBlock, types::ConfirmedBlockCertificate};
16use linera_core::{
17    client::{chain_client, ChainClient, Client, ListeningMode},
18    data_types::{ChainInfo, ChainInfoQuery, ClientOutcome},
19    join_set_ext::JoinSet,
20    node::ValidatorNode,
21    wallet, Environment, JoinSetExt as _, Wallet as _,
22};
23use linera_rpc::node_provider::{NodeOptions, NodeProvider};
24use linera_version::VersionInfo;
25use thiserror_context::Context;
26use tracing::{debug, info, warn};
27#[cfg(not(web))]
28use {
29    crate::{
30        benchmark::{Benchmark, BenchmarkError},
31        client_metrics::ClientMetrics,
32    },
33    futures::stream,
34    linera_base::{
35        crypto::AccountPublicKey,
36        data_types::{Amount, BlockHeight},
37        identifiers::{ApplicationId, BlobType},
38    },
39    linera_execution::{
40        system::{OpenChainConfig, SystemOperation},
41        Operation,
42    },
43    std::{collections::HashSet, iter, path::Path},
44    tokio::{sync::mpsc, task},
45};
46#[cfg(feature = "fs")]
47use {
48    linera_base::{
49        data_types::{BlobContent, Bytecode},
50        identifiers::ModuleId,
51        vm::VmRuntime,
52    },
53    linera_core::client::create_bytecode_blobs,
54    std::{fs, path::PathBuf},
55};
56
57use crate::{
58    chain_listener::{self, ClientContext as _},
59    client_options::{ChainOwnershipConfig, Options},
60    config::GenesisConfig,
61    error, util, Error,
62};
63
/// Results from querying a validator about version, network description, and chain info.
///
/// Each field holds the outcome of one query on its own, so a failed query is recorded
/// as an `Err` in its field without hiding the results of the other queries.
pub struct ValidatorQueryResults {
    /// The validator's version information.
    pub version_info: Result<VersionInfo, Error>,
    /// The validator's genesis config hash.
    pub genesis_config_hash: Result<CryptoHash, Error>,
    /// The validator's chain info (if valid and signature check passed).
    pub chain_info: Result<ChainInfo, Error>,
}
73
74impl ValidatorQueryResults {
75    /// Returns a vector of references to all errors in the query results.
76    pub fn errors(&self) -> Vec<&Error> {
77        let mut errors = Vec::new();
78        if let Err(e) = &self.version_info {
79            errors.push(e);
80        }
81        if let Err(e) = &self.genesis_config_hash {
82            errors.push(e);
83        }
84        if let Err(e) = &self.chain_info {
85            errors.push(e);
86        }
87        errors
88    }
89
90    /// Prints validator information to stdout.
91    ///
92    /// Prints public key, address, and optionally weight, version info, and chain info.
93    /// If `reference` is provided, only prints fields that differ from the reference.
94    pub fn print(
95        &self,
96        public_key: Option<&ValidatorPublicKey>,
97        address: Option<&str>,
98        weight: Option<u64>,
99        reference: Option<&ValidatorQueryResults>,
100    ) {
101        if let Some(key) = public_key {
102            println!("Public key: {}", key);
103        }
104        if let Some(address) = address {
105            println!("Address: {}", address);
106        }
107        if let Some(w) = weight {
108            println!("Weight: {}", w);
109        }
110
111        let ref_version = reference.and_then(|ref_results| ref_results.version_info.as_ref().ok());
112        match &self.version_info {
113            Ok(version_info) => {
114                if ref_version.is_none_or(|ref_v| ref_v.crate_version != version_info.crate_version)
115                {
116                    println!("Linera protocol: v{}", version_info.crate_version);
117                }
118                if ref_version.is_none_or(|ref_v| ref_v.rpc_hash != version_info.rpc_hash) {
119                    println!("RPC API hash: {}", version_info.rpc_hash);
120                }
121                if ref_version.is_none_or(|ref_v| ref_v.graphql_hash != version_info.graphql_hash) {
122                    println!("GraphQL API hash: {}", version_info.graphql_hash);
123                }
124                if ref_version.is_none_or(|ref_v| ref_v.wit_hash != version_info.wit_hash) {
125                    println!("WIT API hash: {}", version_info.wit_hash);
126                }
127                if ref_version.is_none_or(|ref_v| {
128                    (&ref_v.git_commit, ref_v.git_dirty)
129                        != (&version_info.git_commit, version_info.git_dirty)
130                }) {
131                    println!(
132                        "Source code: {}/tree/{}{}",
133                        env!("CARGO_PKG_REPOSITORY"),
134                        version_info.git_commit,
135                        if version_info.git_dirty {
136                            " (dirty)"
137                        } else {
138                            ""
139                        }
140                    );
141                }
142            }
143            Err(err) => println!("Error getting version info: {err}"),
144        }
145
146        let ref_genesis_hash =
147            reference.and_then(|ref_results| ref_results.genesis_config_hash.as_ref().ok());
148        match &self.genesis_config_hash {
149            Ok(hash) if ref_genesis_hash.is_some_and(|ref_hash| ref_hash == hash) => {}
150            Ok(hash) => println!("Genesis config hash: {hash}"),
151            Err(err) => println!("Error getting genesis config: {err}"),
152        }
153
154        let ref_info = reference.and_then(|ref_results| ref_results.chain_info.as_ref().ok());
155        match &self.chain_info {
156            Ok(info) => {
157                if ref_info.is_none_or(|ref_info| info.block_hash != ref_info.block_hash) {
158                    if let Some(hash) = info.block_hash {
159                        println!("Block hash: {}", hash);
160                    } else {
161                        println!("Block hash: None");
162                    }
163                }
164                if ref_info
165                    .is_none_or(|ref_info| info.next_block_height != ref_info.next_block_height)
166                {
167                    println!("Next height: {}", info.next_block_height);
168                }
169                if ref_info.is_none_or(|ref_info| info.timestamp != ref_info.timestamp) {
170                    println!("Timestamp: {}", info.timestamp);
171                }
172                if ref_info.is_none_or(|ref_info| info.epoch != ref_info.epoch) {
173                    println!("Epoch: {}", info.epoch);
174                }
175                if ref_info.is_none_or(|ref_info| {
176                    info.manager.current_round != ref_info.manager.current_round
177                }) {
178                    println!("Round: {}", info.manager.current_round);
179                }
180                if let Some(locking) = &info.manager.requested_locking {
181                    match &**locking {
182                        LockingBlock::Fast(proposal) => {
183                            println!(
184                                "Locking fast block from {}",
185                                proposal.content.block.timestamp
186                            );
187                        }
188                        LockingBlock::Regular(validated) => {
189                            println!(
190                                "Locking block {} in {} from {}",
191                                validated.hash(),
192                                validated.round,
193                                validated.block().header.timestamp
194                            );
195                        }
196                    }
197                }
198            }
199            Err(err) => println!("Error getting chain info: {err}"),
200        }
201    }
202}
203
/// Client-side state built around a core [`Client`]: the network options, the spawned
/// chain listener tasks, the default chain and (outside of web builds) optional metrics.
pub struct ClientContext<Env: Environment> {
    /// The underlying core client, shared via `Arc`.
    pub client: Arc<Client<Env>>,
    // TODO(#5083): this doesn't really need to be stored
    /// The genesis configuration; its network description is what validators are
    /// compared against in `check_matching_network_description`.
    pub genesis_config: crate::config::GenesisConfig,
    /// Send timeout passed on to `NodeOptions` by `make_node_options`.
    pub send_timeout: Duration,
    /// Receive timeout passed on to `NodeOptions` by `make_node_options`.
    pub recv_timeout: Duration,
    /// Retry delay passed on to `NodeOptions` by `make_node_options`.
    pub retry_delay: Duration,
    /// Maximum number of retries passed on to `NodeOptions` by `make_node_options`.
    pub max_retries: u32,
    /// The listener tasks spawned while waiting for new rounds (see `process_inbox`
    /// and `apply_client_command`).
    pub chain_listeners: JoinSet,
    // TODO(#5082): move this into the upstream UI layers (maybe just the CLI)
    /// The chain used when a command does not name one explicitly, if any.
    pub default_chain: Option<ChainId>,
    /// Client metrics; `new` only sets this when timing is enabled in the options.
    #[cfg(not(web))]
    pub client_metrics: Option<ClientMetrics>,
}
218
219impl<Env: Environment> chain_listener::ClientContext for ClientContext<Env> {
220    type Environment = Env;
221
222    fn wallet(&self) -> &Env::Wallet {
223        self.client.wallet()
224    }
225
226    fn storage(&self) -> &Env::Storage {
227        self.client.storage_client()
228    }
229
230    fn client(&self) -> &Arc<Client<Env>> {
231        &self.client
232    }
233
234    #[cfg(not(web))]
235    fn timing_sender(
236        &self,
237    ) -> Option<mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
238        self.client_metrics
239            .as_ref()
240            .map(|metrics| metrics.timing_sender.clone())
241    }
242
243    async fn update_wallet_for_new_chain(
244        &mut self,
245        chain_id: ChainId,
246        owner: Option<AccountOwner>,
247        timestamp: Timestamp,
248        epoch: Epoch,
249    ) -> Result<(), Error> {
250        self.update_wallet_for_new_chain(chain_id, owner, timestamp, epoch)
251            .make_sync()
252            .await
253    }
254
255    async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
256        self.update_wallet_from_client(client).make_sync().await
257    }
258}
259
260impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si, W>>
261where
262    S: linera_core::environment::Storage,
263    Si: linera_core::environment::Signer,
264    W: linera_core::environment::Wallet,
265{
266    // not worth refactoring this because
267    // https://github.com/linera-io/linera-protocol/issues/5082
268    // https://github.com/linera-io/linera-protocol/issues/5083
269    #[allow(clippy::too_many_arguments)]
270    pub async fn new(
271        storage: S,
272        wallet: W,
273        signer: Si,
274        options: &Options,
275        default_chain: Option<ChainId>,
276        genesis_config: GenesisConfig,
277        block_cache_size: usize,
278        execution_state_cache_size: usize,
279    ) -> Result<Self, Error> {
280        #[cfg(not(web))]
281        let timing_config = options.to_timing_config();
282        let node_provider = NodeProvider::new(NodeOptions {
283            send_timeout: options.send_timeout,
284            recv_timeout: options.recv_timeout,
285            retry_delay: options.retry_delay,
286            max_retries: options.max_retries,
287        });
288        let chain_modes: Vec<_> = wallet
289            .items()
290            .map_ok(|(id, chain)| {
291                let mode = if chain.is_follow_only() {
292                    ListeningMode::FollowChain
293                } else {
294                    ListeningMode::FullChain
295                };
296                (id, mode)
297            })
298            .try_collect()
299            .await
300            .map_err(error::Inner::wallet)?;
301        let name = match chain_modes.len() {
302            0 => "Client node".to_string(),
303            1 => format!("Client node for {:.8}", chain_modes[0].0),
304            n => format!(
305                "Client node for {:.8} and {} others",
306                chain_modes[0].0,
307                n - 1
308            ),
309        };
310
311        let client = Client::new(
312            linera_core::environment::Impl {
313                network: node_provider,
314                storage,
315                signer,
316                wallet,
317            },
318            genesis_config.admin_chain_id(),
319            options.long_lived_services,
320            chain_modes,
321            name,
322            options.chain_worker_ttl,
323            options.sender_chain_worker_ttl,
324            options.to_chain_client_options(),
325            block_cache_size,
326            execution_state_cache_size,
327            options.to_requests_scheduler_config(),
328        );
329
330        #[cfg(not(web))]
331        let client_metrics = if timing_config.enabled {
332            Some(ClientMetrics::new(timing_config))
333        } else {
334            None
335        };
336
337        Ok(ClientContext {
338            client: Arc::new(client),
339            default_chain,
340            genesis_config,
341            send_timeout: options.send_timeout,
342            recv_timeout: options.recv_timeout,
343            retry_delay: options.retry_delay,
344            max_retries: options.max_retries,
345            chain_listeners: JoinSet::default(),
346            #[cfg(not(web))]
347            client_metrics,
348        })
349    }
350}
351
352impl<Env: Environment> ClientContext<Env> {
    // TODO(#5084) this (and other injected dependencies) should not be re-exposed by the
    // client interface
    /// Returns a reference to the wallet held by the underlying client.
    pub fn wallet(&self) -> &Env::Wallet {
        self.client.wallet()
    }
359
    /// Returns the ID of the admin chain, as reported by the underlying client.
    pub fn admin_chain_id(&self) -> ChainId {
        self.client.admin_chain_id()
    }
364
365    /// Retrieve the default account. Current this is the common account of the default
366    /// chain.
367    pub fn default_account(&self) -> Account {
368        Account::chain(self.default_chain())
369    }
370
    /// Retrieves the default chain.
    ///
    /// # Panics
    ///
    /// Panics if no default chain is set.
    pub fn default_chain(&self) -> ChainId {
        self.default_chain
            .expect("default chain requested but none set")
    }
376
377    pub async fn first_non_admin_chain(&self) -> Result<ChainId, Error> {
378        let admin_chain_id = self.admin_chain_id();
379        std::pin::pin!(self
380            .wallet()
381            .chain_ids()
382            .try_filter(|chain_id| futures::future::ready(*chain_id != admin_chain_id)))
383        .next()
384        .await
385        .expect("No non-admin chain specified in wallet with no non-admin chain")
386        .map_err(Error::wallet)
387    }
388
389    // TODO(#5084) this should match the `NodeProvider` from the `Environment`
390    pub fn make_node_provider(&self) -> NodeProvider {
391        NodeProvider::new(self.make_node_options())
392    }
393
394    fn make_node_options(&self) -> NodeOptions {
395        NodeOptions {
396            send_timeout: self.send_timeout,
397            recv_timeout: self.recv_timeout,
398            retry_delay: self.retry_delay,
399            max_retries: self.max_retries,
400        }
401    }
402
    /// Returns the client metrics, or `None` if this context has none.
    #[cfg(not(web))]
    pub fn client_metrics(&self) -> Option<&ClientMetrics> {
        self.client_metrics.as_ref()
    }
407
408    pub async fn update_wallet_from_client<Env_: Environment>(
409        &self,
410        client: &ChainClient<Env_>,
411    ) -> Result<(), Error> {
412        let info = client.chain_info().await?;
413        let chain_id = info.chain_id;
414        let new_chain = wallet::Chain {
415            pending_proposal: client.pending_proposal().clone(),
416            owner: client.preferred_owner(),
417            ..info.as_ref().into()
418        };
419
420        self.wallet()
421            .insert(chain_id, new_chain)
422            .await
423            .map_err(error::Inner::wallet)?;
424
425        Ok(())
426    }
427
428    /// Remembers the new chain and its owner (if any) in the wallet.
429    pub async fn update_wallet_for_new_chain(
430        &mut self,
431        chain_id: ChainId,
432        owner: Option<AccountOwner>,
433        timestamp: Timestamp,
434        epoch: Epoch,
435    ) -> Result<(), Error> {
436        self.wallet()
437            .try_insert(
438                chain_id,
439                linera_core::wallet::Chain::new(owner, epoch, timestamp),
440            )
441            .await
442            .map_err(error::Inner::wallet)?;
443        Ok(())
444    }
445
446    pub async fn process_inbox(
447        &mut self,
448        chain_client: &ChainClient<Env>,
449    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
450        let mut certificates = Vec::new();
451        // Try processing the inbox optimistically without waiting for validator notifications.
452        let (new_certificates, maybe_timeout) = {
453            chain_client.synchronize_from_validators().await?;
454            let result = chain_client.process_inbox_without_prepare().await;
455            self.update_wallet_from_client(chain_client).await?;
456            result?
457        };
458        certificates.extend(new_certificates);
459        if maybe_timeout.is_none() {
460            return Ok(certificates);
461        }
462
463        // Start listening for notifications, so we learn about new rounds and blocks.
464        let (listener, _listen_handle, mut notification_stream) = chain_client.listen().await?;
465        self.chain_listeners.spawn_task(listener);
466
467        loop {
468            let (new_certificates, maybe_timeout) = {
469                let result = chain_client.process_inbox().await;
470                self.update_wallet_from_client(chain_client).await?;
471                result?
472            };
473            certificates.extend(new_certificates);
474            if let Some(timestamp) = maybe_timeout {
475                util::wait_for_next_round(&mut notification_stream, timestamp).await
476            } else {
477                return Ok(certificates);
478            }
479        }
480    }
481
482    pub async fn assign_new_chain_to_key(
483        &mut self,
484        chain_id: ChainId,
485        owner: AccountOwner,
486    ) -> Result<(), Error> {
487        self.client
488            .extend_chain_mode(chain_id, ListeningMode::FullChain);
489        let client = self.make_chain_client(chain_id).await?;
490        let chain_description = client.get_chain_description().await?;
491        let config = chain_description.config();
492
493        if !config.ownership.is_owner(&owner) {
494            tracing::error!(
495                "The chain with the ID returned by the faucet is not owned by you. \
496                Please make sure you are connecting to a genuine faucet."
497            );
498            return Err(error::Inner::ChainOwnership.into());
499        }
500
501        // Try to modify existing chain entry, setting the owner.
502        let modified = self
503            .wallet()
504            .modify(chain_id, |chain| chain.owner = Some(owner))
505            .await
506            .map_err(error::Inner::wallet)?;
507        // If the chain didn't exist, insert a new entry.
508        if modified.is_none() {
509            let timestamp = chain_description.timestamp();
510            let epoch = chain_description.config().epoch;
511            self.wallet()
512                .insert(
513                    chain_id,
514                    wallet::Chain {
515                        owner: Some(owner),
516                        timestamp,
517                        epoch: Some(epoch),
518                        ..Default::default()
519                    },
520                )
521                .await
522                .map_err(error::Inner::wallet)
523                .context("assigning new chain")?;
524        }
525        Ok(())
526    }
527
528    /// Applies the given function to the chain client.
529    ///
530    /// Updates the wallet regardless of the outcome. As long as the function returns a round
531    /// timeout, it will wait and retry.
532    pub async fn apply_client_command<E, F, Fut, T>(
533        &mut self,
534        client: &ChainClient<Env>,
535        mut f: F,
536    ) -> Result<T, Error>
537    where
538        F: FnMut(&ChainClient<Env>) -> Fut,
539        Fut: Future<Output = Result<ClientOutcome<T>, E>>,
540        Error: From<E>,
541    {
542        client.prepare_chain().await?;
543        // Try applying f optimistically without validator notifications. Return if committed.
544        let result = f(client).await;
545        self.update_wallet_from_client(client).await?;
546        match result? {
547            ClientOutcome::Committed(t) => return Ok(t),
548            ClientOutcome::Conflict(certificate) => {
549                return Err(chain_client::Error::Conflict(certificate.hash()).into());
550            }
551            ClientOutcome::WaitForTimeout(_) => {}
552        }
553
554        // Start listening for notifications, so we learn about new rounds and blocks.
555        let (listener, _listen_handle, mut notification_stream) = client.listen().await?;
556        self.chain_listeners.spawn_task(listener);
557
558        loop {
559            // Try applying f. Return if committed.
560            let result = f(client).await;
561            self.update_wallet_from_client(client).await?;
562            let timeout = match result? {
563                ClientOutcome::Committed(t) => return Ok(t),
564                ClientOutcome::Conflict(certificate) => {
565                    return Err(chain_client::Error::Conflict(certificate.hash()).into());
566                }
567                ClientOutcome::WaitForTimeout(timeout) => timeout,
568            };
569            // Otherwise wait and try again in the next round.
570            util::wait_for_next_round(&mut notification_stream, timeout).await;
571        }
572    }
573
574    pub async fn ownership(&mut self, chain_id: Option<ChainId>) -> Result<ChainOwnership, Error> {
575        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
576        let client = self.make_chain_client(chain_id).await?;
577        let info = client.chain_info().await?;
578        Ok(info.manager.ownership)
579    }
580
581    pub async fn change_ownership(
582        &mut self,
583        chain_id: Option<ChainId>,
584        ownership_config: ChainOwnershipConfig,
585    ) -> Result<(), Error> {
586        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
587        let chain_client = self.make_chain_client(chain_id).await?;
588        info!(
589            ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
590            "Changing ownership of a chain"
591        );
592        let time_start = Instant::now();
593        let mut ownership = chain_client.query_chain_ownership().await?;
594        ownership_config.update(&mut ownership)?;
595
596        if ownership.super_owners.is_empty() && ownership.owners.is_empty() {
597            tracing::error!("At least one owner or super owner of the chain has to be set.");
598            return Err(error::Inner::ChainOwnership.into());
599        }
600
601        let certificate = self
602            .apply_client_command(&chain_client, |chain_client| {
603                let ownership = ownership.clone();
604                let chain_client = chain_client.clone();
605                async move {
606                    chain_client
607                        .change_ownership(ownership)
608                        .await
609                        .map_err(Error::from)
610                        .context("Failed to change ownership")
611                }
612            })
613            .await?;
614        let time_total = time_start.elapsed();
615        info!("Operation confirmed after {} ms", time_total.as_millis());
616        debug!("{:?}", certificate);
617        Ok(())
618    }
619
620    pub async fn set_preferred_owner(
621        &mut self,
622        chain_id: Option<ChainId>,
623        preferred_owner: AccountOwner,
624    ) -> Result<(), Error> {
625        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
626        let mut chain_client = self.make_chain_client(chain_id).await?;
627        let old_owner = chain_client.preferred_owner();
628        info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
629        chain_client.set_preferred_owner(preferred_owner);
630        self.update_wallet_from_client(&chain_client).await?;
631        info!("New preferred owner set");
632        Ok(())
633    }
634
635    pub async fn check_compatible_version_info(
636        &self,
637        address: &str,
638        node: &impl ValidatorNode,
639    ) -> Result<VersionInfo, Error> {
640        match node.get_version_info().await {
641            Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
642                debug!(
643                    "Version information for validator {address}: {}",
644                    version_info
645                );
646                Ok(version_info)
647            }
648            Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
649                remote: Box::new(version_info),
650                local: Box::new(linera_version::VERSION_INFO.clone()),
651            }
652            .into()),
653            Err(error) => Err(error::Inner::UnavailableVersionInfo {
654                address: address.to_string(),
655                error: Box::new(error),
656            }
657            .into()),
658        }
659    }
660
661    pub async fn check_matching_network_description(
662        &self,
663        address: &str,
664        node: &impl ValidatorNode,
665    ) -> Result<CryptoHash, Error> {
666        let network_description = self.genesis_config.network_description();
667        match node.get_network_description().await {
668            Ok(description) => {
669                if description == network_description {
670                    Ok(description.genesis_config_hash)
671                } else {
672                    Err(error::Inner::UnexpectedNetworkDescription {
673                        remote: Box::new(description),
674                        local: Box::new(network_description),
675                    }
676                    .into())
677                }
678            }
679            Err(error) => Err(error::Inner::UnavailableNetworkDescription {
680                address: address.to_string(),
681                error: Box::new(error),
682            }
683            .into()),
684        }
685    }
686
687    pub async fn check_validator_chain_info_response(
688        &self,
689        public_key: Option<&ValidatorPublicKey>,
690        address: &str,
691        node: &impl ValidatorNode,
692        chain_id: ChainId,
693    ) -> Result<ChainInfo, Error> {
694        let query = ChainInfoQuery::new(chain_id).with_manager_values();
695        match node.handle_chain_info_query(query).await {
696            Ok(response) => {
697                debug!(
698                    "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
699                    response.info.next_block_height, response.info.epoch,
700                );
701                if let Some(public_key) = public_key {
702                    if response.check(*public_key).is_ok() {
703                        debug!("Signature for public key {public_key} is OK.");
704                    } else {
705                        return Err(error::Inner::InvalidSignature {
706                            public_key: *public_key,
707                        }
708                        .into());
709                    }
710                } else {
711                    warn!("Not checking signature as public key was not given");
712                }
713                Ok(*response.info)
714            }
715            Err(error) => Err(error::Inner::UnavailableChainInfo {
716                address: address.to_string(),
717                chain_id,
718                error: Box::new(error),
719            }
720            .into()),
721        }
722    }
723
724    /// Query a validator for version info, network description, and chain info.
725    ///
726    /// Returns a `ValidatorQueryResults` struct with the results of all three queries.
727    pub async fn query_validator(
728        &self,
729        address: &str,
730        node: &impl ValidatorNode,
731        chain_id: ChainId,
732        public_key: Option<&ValidatorPublicKey>,
733    ) -> ValidatorQueryResults {
734        let version_info = self.check_compatible_version_info(address, node).await;
735        let genesis_config_hash = self.check_matching_network_description(address, node).await;
736        let chain_info = self
737            .check_validator_chain_info_response(public_key, address, node, chain_id)
738            .await;
739
740        ValidatorQueryResults {
741            version_info,
742            genesis_config_hash,
743            chain_info,
744        }
745    }
746
747    /// Query the local node for version info, network description, and chain info.
748    ///
749    /// Returns a `ValidatorQueryResults` struct with the local node's information.
750    pub async fn query_local_node(
751        &self,
752        chain_id: ChainId,
753    ) -> Result<ValidatorQueryResults, Error> {
754        let version_info = Ok(linera_version::VERSION_INFO.clone());
755        let genesis_config_hash = Ok(self
756            .genesis_config
757            .network_description()
758            .genesis_config_hash);
759        let chain_info = self
760            .make_chain_client(chain_id)
761            .await?
762            .chain_info_with_manager_values()
763            .await
764            .map(|info| *info)
765            .map_err(|e| e.into());
766
767        Ok(ValidatorQueryResults {
768            version_info,
769            genesis_config_hash,
770            chain_info,
771        })
772    }
773}
774
775#[cfg(feature = "fs")]
776impl<Env: Environment> ClientContext<Env> {
777    pub async fn publish_module(
778        &mut self,
779        chain_client: &ChainClient<Env>,
780        contract: PathBuf,
781        service: PathBuf,
782        vm_runtime: VmRuntime,
783    ) -> Result<ModuleId, Error> {
784        info!("Loading bytecode files");
785        let contract_bytecode = Bytecode::load_from_file(&contract)
786            .await
787            .with_context(|| format!("failed to load contract bytecode from {:?}", &contract))?;
788        let service_bytecode = Bytecode::load_from_file(&service)
789            .await
790            .with_context(|| format!("failed to load service bytecode from {:?}", &service))?;
791
792        info!("Publishing module");
793        let (blobs, module_id) =
794            create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
795        let (module_id, _) = self
796            .apply_client_command(chain_client, |chain_client| {
797                let blobs = blobs.clone();
798                let chain_client = chain_client.clone();
799                async move {
800                    chain_client
801                        .publish_module_blobs(blobs, module_id)
802                        .await
803                        .context("Failed to publish module")
804                }
805            })
806            .await?;
807
808        info!("{}", "Module published successfully!");
809
810        info!("Synchronizing client and processing inbox");
811        self.process_inbox(chain_client).await?;
812        Ok(module_id)
813    }
814
815    pub async fn publish_data_blob(
816        &mut self,
817        chain_client: &ChainClient<Env>,
818        blob_path: PathBuf,
819    ) -> Result<CryptoHash, Error> {
820        info!("Loading data blob file");
821        let blob_bytes = fs::read(&blob_path).context(format!(
822            "failed to load data blob bytes from {:?}",
823            &blob_path
824        ))?;
825
826        info!("Publishing data blob");
827        self.apply_client_command(chain_client, |chain_client| {
828            let blob_bytes = blob_bytes.clone();
829            let chain_client = chain_client.clone();
830            async move {
831                chain_client
832                    .publish_data_blob(blob_bytes)
833                    .await
834                    .context("Failed to publish data blob")
835            }
836        })
837        .await?;
838
839        info!("{}", "Data blob published successfully!");
840        Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
841    }
842
843    // TODO(#2490): Consider removing or renaming this.
844    pub async fn read_data_blob(
845        &mut self,
846        chain_client: &ChainClient<Env>,
847        hash: CryptoHash,
848    ) -> Result<(), Error> {
849        info!("Verifying data blob");
850        self.apply_client_command(chain_client, |chain_client| {
851            let chain_client = chain_client.clone();
852            async move {
853                chain_client
854                    .read_data_blob(hash)
855                    .await
856                    .context("Failed to verify data blob")
857            }
858        })
859        .await?;
860
861        info!("{}", "Data blob verified successfully!");
862        Ok(())
863    }
864}
865
866#[cfg(not(web))]
867impl<Env: Environment> ClientContext<Env> {
868    pub async fn prepare_for_benchmark(
869        &mut self,
870        num_chains: usize,
871        tokens_per_chain: Amount,
872        fungible_application_id: Option<ApplicationId>,
873        pub_keys: Vec<AccountPublicKey>,
874        chains_config_path: Option<&Path>,
875    ) -> Result<(Vec<ChainClient<Env>>, Vec<ChainId>), Error> {
876        let start = Instant::now();
877        // Below all block proposals are supposed to succeed without retries, we
878        // must make sure that all incoming payments have been accepted on-chain
879        // and that no validator is missing user certificates.
880        self.process_inboxes_and_force_validator_updates().await;
881        info!(
882            "Processed inboxes and forced validator updates in {} ms",
883            start.elapsed().as_millis()
884        );
885
886        let start = Instant::now();
887        let (benchmark_chains, chain_clients) = self
888            .make_benchmark_chains(
889                num_chains,
890                tokens_per_chain,
891                pub_keys,
892                chains_config_path.is_some(),
893            )
894            .await?;
895        info!(
896            "Got {} chains in {} ms",
897            num_chains,
898            start.elapsed().as_millis()
899        );
900
901        if let Some(id) = fungible_application_id {
902            let start = Instant::now();
903            self.supply_fungible_tokens(&benchmark_chains, id).await?;
904            info!(
905                "Supplied fungible tokens in {} ms",
906                start.elapsed().as_millis()
907            );
908            // Need to process inboxes to make sure the chains receive the supplied tokens.
909            let start = Instant::now();
910            for chain_client in &chain_clients {
911                chain_client.process_inbox().await?;
912            }
913            info!(
914                "Processed inboxes after supplying fungible tokens in {} ms",
915                start.elapsed().as_millis()
916            );
917        }
918
919        let all_chains = Benchmark::<Env>::get_all_chains(chains_config_path, &benchmark_chains)?;
920        let known_chain_ids: HashSet<_> = benchmark_chains.iter().map(|(id, _)| *id).collect();
921        let unknown_chain_ids: Vec<_> = all_chains
922            .iter()
923            .filter(|id| !known_chain_ids.contains(id))
924            .copied()
925            .collect();
926        if !unknown_chain_ids.is_empty() {
927            // The current client won't have the blobs for the chains in the other wallets. Even
928            // though it will eventually get those blobs, we're getting a head start here and
929            // fetching those blobs in advance.
930            for chain_id in &unknown_chain_ids {
931                self.client.get_chain_description(*chain_id).await?;
932            }
933        }
934
935        Ok((chain_clients, all_chains))
936    }
937
938    pub async fn wrap_up_benchmark(
939        &mut self,
940        chain_clients: Vec<ChainClient<Env>>,
941        close_chains: bool,
942        wrap_up_max_in_flight: usize,
943    ) -> Result<(), Error> {
944        if close_chains {
945            info!("Closing chains...");
946            let stream = stream::iter(chain_clients)
947                .map(|chain_client| async move {
948                    Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
949                    info!("Closed chain {:?}", chain_client.chain_id());
950                    Ok::<(), BenchmarkError>(())
951                })
952                .buffer_unordered(wrap_up_max_in_flight);
953            stream.try_collect::<Vec<_>>().await?;
954        } else {
955            info!("Processing inbox for all chains...");
956            let stream = stream::iter(chain_clients.clone())
957                .map(|chain_client| async move {
958                    chain_client.process_inbox().await?;
959                    info!("Processed inbox for chain {:?}", chain_client.chain_id());
960                    Ok::<(), chain_client::Error>(())
961                })
962                .buffer_unordered(wrap_up_max_in_flight);
963            stream.try_collect::<Vec<_>>().await?;
964
965            info!("Updating wallet from chain clients...");
966            for chain_client in chain_clients {
967                let info = chain_client.chain_info().await?;
968                let client_owner = chain_client.preferred_owner();
969                let pending_proposal = chain_client.pending_proposal().clone();
970                self.wallet()
971                    .insert(
972                        info.chain_id,
973                        wallet::Chain {
974                            pending_proposal,
975                            owner: client_owner,
976                            ..info.as_ref().into()
977                        },
978                    )
979                    .await
980                    .map_err(error::Inner::wallet)?;
981            }
982        }
983
984        Ok(())
985    }
986
987    async fn process_inboxes_and_force_validator_updates(&mut self) {
988        let mut join_set = task::JoinSet::new();
989
990        let chain_clients: Vec<_> = self
991            .wallet()
992            .owned_chain_ids()
993            .map_err(|e| error::Inner::wallet(e).into())
994            .and_then(|id| self.make_chain_client(id))
995            .try_collect()
996            .await
997            .unwrap();
998
999        for chain_client in chain_clients {
1000            join_set.spawn(async move {
1001                Self::process_inbox_without_updating_wallet(&chain_client)
1002                    .await
1003                    .expect("Processing inbox should not fail!");
1004                chain_client
1005            });
1006        }
1007
1008        for chain_client in join_set.join_all().await {
1009            self.update_wallet_from_client(&chain_client).await.unwrap();
1010        }
1011    }
1012
1013    async fn process_inbox_without_updating_wallet(
1014        chain_client: &ChainClient<Env>,
1015    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
1016        // Try processing the inbox optimistically without waiting for validator notifications.
1017        chain_client.synchronize_from_validators().await?;
1018        let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
1019        assert!(
1020            maybe_timeout.is_none(),
1021            "Should not timeout within benchmark!"
1022        );
1023
1024        Ok(certificates)
1025    }
1026
1027    /// Creates chains if necessary, and returns a map of exactly `num_chains` chain IDs
1028    /// with key pairs, as well as a map of the chain clients.
1029    async fn make_benchmark_chains(
1030        &mut self,
1031        num_chains: usize,
1032        balance: Amount,
1033        pub_keys: Vec<AccountPublicKey>,
1034        wallet_only: bool,
1035    ) -> Result<(Vec<(ChainId, AccountOwner)>, Vec<ChainClient<Env>>), Error> {
1036        let mut chains_found_in_wallet = 0;
1037        let mut benchmark_chains = Vec::with_capacity(num_chains);
1038        let mut chain_clients = Vec::with_capacity(num_chains);
1039        let start = Instant::now();
1040        let mut owned_chain_ids = std::pin::pin!(self.wallet().owned_chain_ids());
1041        while let Some(chain_id) = owned_chain_ids.next().await {
1042            let chain_id = chain_id.map_err(error::Inner::wallet)?;
1043            if chains_found_in_wallet == num_chains {
1044                break;
1045            }
1046            let chain_client = self.make_chain_client(chain_id).await?;
1047            let ownership = chain_client.chain_info().await?.manager.ownership;
1048            if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
1049                continue;
1050            }
1051            chain_client.process_inbox().await?;
1052            benchmark_chains.push((
1053                chain_id,
1054                *ownership
1055                    .super_owners
1056                    .first()
1057                    .expect("should have a super owner"),
1058            ));
1059            chain_clients.push(chain_client);
1060            chains_found_in_wallet += 1;
1061        }
1062        info!(
1063            "Got {} chains from the wallet in {} ms",
1064            benchmark_chains.len(),
1065            start.elapsed().as_millis()
1066        );
1067
1068        let num_chains_to_create = num_chains - chains_found_in_wallet;
1069
1070        let default_chain_client = self.make_chain_client(self.default_chain()).await?;
1071
1072        if num_chains_to_create > 0 {
1073            if wallet_only {
1074                return Err(
1075                    error::Inner::Benchmark(BenchmarkError::NotEnoughChainsInWallet(
1076                        num_chains,
1077                        chains_found_in_wallet,
1078                    ))
1079                    .into(),
1080                );
1081            }
1082            let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
1083            let operations_per_block = 900; // Over this we seem to hit the block size limits.
1084            for i in (0..num_chains_to_create).step_by(operations_per_block) {
1085                let num_new_chains = operations_per_block.min(num_chains_to_create - i);
1086                let pub_key = pub_keys_iter.next().unwrap();
1087                let owner = pub_key.into();
1088
1089                let certificate = Self::execute_open_chains_operations(
1090                    num_new_chains,
1091                    &default_chain_client,
1092                    balance,
1093                    owner,
1094                )
1095                .await?;
1096                info!("Block executed successfully");
1097
1098                let block = certificate.block();
1099                for i in 0..num_new_chains {
1100                    let chain_id = block.body.blobs[i]
1101                        .iter()
1102                        .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
1103                        .map(|blob| ChainId(blob.id().hash))
1104                        .expect("failed to create a new chain");
1105                    self.client
1106                        .extend_chain_mode(chain_id, ListeningMode::FullChain);
1107
1108                    let mut chain_client = self.client.create_chain_client(
1109                        chain_id,
1110                        None,
1111                        BlockHeight::ZERO,
1112                        None,
1113                        Some(owner),
1114                        self.timing_sender(),
1115                        false,
1116                    );
1117                    chain_client.set_preferred_owner(owner);
1118                    chain_client.process_inbox().await?;
1119                    benchmark_chains.push((chain_id, owner));
1120                    chain_clients.push(chain_client);
1121                }
1122            }
1123
1124            info!(
1125                "Created {} chains in {} ms",
1126                num_chains_to_create,
1127                start.elapsed().as_millis()
1128            );
1129        }
1130
1131        info!("Updating wallet from client");
1132        self.update_wallet_from_client(&default_chain_client)
1133            .await?;
1134        info!("Retrying pending outgoing messages");
1135        default_chain_client
1136            .retry_pending_outgoing_messages()
1137            .await
1138            .context("outgoing messages to create the new chains should be delivered")?;
1139        info!("Processing default chain inbox");
1140        default_chain_client.process_inbox().await?;
1141
1142        assert_eq!(
1143            benchmark_chains.len(),
1144            chain_clients.len(),
1145            "benchmark_chains and chain_clients must have the same size"
1146        );
1147
1148        Ok((benchmark_chains, chain_clients))
1149    }
1150
1151    async fn execute_open_chains_operations(
1152        num_new_chains: usize,
1153        chain_client: &ChainClient<Env>,
1154        balance: Amount,
1155        owner: AccountOwner,
1156    ) -> Result<ConfirmedBlockCertificate, Error> {
1157        let config = OpenChainConfig {
1158            ownership: ChainOwnership::single_super(owner),
1159            balance,
1160            application_permissions: Default::default(),
1161        };
1162        let operations = iter::repeat_n(
1163            Operation::system(SystemOperation::OpenChain(config)),
1164            num_new_chains,
1165        )
1166        .collect();
1167        info!("Executing {} OpenChain operations", num_new_chains);
1168        Ok(chain_client
1169            .execute_operations(operations, vec![])
1170            .await?
1171            .expect("should execute block with OpenChain operations"))
1172    }
1173
1174    /// Supplies fungible tokens to the chains.
1175    async fn supply_fungible_tokens(
1176        &mut self,
1177        key_pairs: &[(ChainId, AccountOwner)],
1178        application_id: ApplicationId,
1179    ) -> Result<(), Error> {
1180        let default_chain_id = self.default_chain();
1181        let default_key = self
1182            .wallet()
1183            .get(default_chain_id)
1184            .await
1185            .unwrap()
1186            .unwrap()
1187            .owner
1188            .unwrap();
1189        // This should be enough to run the benchmark at 1M TPS for an hour.
1190        let amount = Amount::from_nanos(4);
1191        let operations: Vec<Operation> = key_pairs
1192            .iter()
1193            .map(|(chain_id, owner)| {
1194                Benchmark::<Env>::fungible_transfer(
1195                    application_id,
1196                    *chain_id,
1197                    default_key,
1198                    *owner,
1199                    amount,
1200                )
1201            })
1202            .collect();
1203        let chain_client = self.make_chain_client(default_chain_id).await?;
1204        // Put at most 1000 fungible token operations in each block.
1205        for operation_chunk in operations.chunks(1000) {
1206            chain_client
1207                .execute_operations(operation_chunk.to_vec(), vec![])
1208                .await?
1209                .expect("should execute block with Transfer operations");
1210        }
1211        self.update_wallet_from_client(&chain_client).await?;
1212
1213        Ok(())
1214    }
1215}