linera_client/
client_context.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use futures::Future;
7use linera_base::{
8    crypto::{CryptoHash, ValidatorPublicKey},
9    data_types::{BlockHeight, Epoch, Timestamp},
10    identifiers::{Account, AccountOwner, ChainId},
11    ownership::ChainOwnership,
12    time::{Duration, Instant},
13};
14use linera_chain::{manager::LockingBlock, types::ConfirmedBlockCertificate};
15use linera_core::{
16    client::{ChainClient, Client, ListeningMode},
17    data_types::{ChainInfo, ChainInfoQuery, ClientOutcome},
18    join_set_ext::JoinSet,
19    node::ValidatorNode,
20    Environment, JoinSetExt as _,
21};
22use linera_persistent::{Persist, PersistExt as _};
23use linera_rpc::node_provider::{NodeOptions, NodeProvider};
24use linera_version::VersionInfo;
25use thiserror_context::Context;
26use tracing::{debug, info, warn};
27#[cfg(not(web))]
28use {
29    crate::{
30        benchmark::{Benchmark, BenchmarkError},
31        client_metrics::ClientMetrics,
32    },
33    futures::{stream, StreamExt, TryStreamExt},
34    linera_base::{
35        crypto::AccountPublicKey,
36        data_types::Amount,
37        identifiers::{ApplicationId, BlobType},
38    },
39    linera_core::client::chain_client,
40    linera_execution::{
41        system::{OpenChainConfig, SystemOperation},
42        Operation,
43    },
44    std::{collections::HashSet, iter, path::Path},
45    tokio::{sync::mpsc, task},
46};
47#[cfg(feature = "fs")]
48use {
49    linera_base::{
50        data_types::{BlobContent, Bytecode},
51        identifiers::ModuleId,
52        vm::VmRuntime,
53    },
54    linera_core::client::create_bytecode_blobs,
55    std::{fs, path::PathBuf},
56};
57
58use crate::{
59    chain_listener::{self, ClientContext as _},
60    client_options::{ChainOwnershipConfig, ClientContextOptions},
61    error, util,
62    wallet::{UserChain, Wallet},
63    Error,
64};
65
66/// Results from querying a validator about version, network description, and chain info.
67pub struct ValidatorQueryResults {
68    /// The validator's version information.
69    pub version_info: Result<VersionInfo, Error>,
70    /// The validator's genesis config hash.
71    pub genesis_config_hash: Result<CryptoHash, Error>,
72    /// The validator's chain info (if valid and signature check passed).
73    pub chain_info: Result<ChainInfo, Error>,
74}
75
76impl ValidatorQueryResults {
77    /// Returns a vector of references to all errors in the query results.
78    pub fn errors(&self) -> Vec<&Error> {
79        let mut errors = Vec::new();
80        if let Err(e) = &self.version_info {
81            errors.push(e);
82        }
83        if let Err(e) = &self.genesis_config_hash {
84            errors.push(e);
85        }
86        if let Err(e) = &self.chain_info {
87            errors.push(e);
88        }
89        errors
90    }
91
92    /// Prints validator information to stdout.
93    ///
94    /// Prints public key, address, and optionally weight, version info, and chain info.
95    /// If `reference` is provided, only prints fields that differ from the reference.
96    pub fn print(
97        &self,
98        public_key: Option<&ValidatorPublicKey>,
99        address: Option<&str>,
100        weight: Option<u64>,
101        reference: Option<&ValidatorQueryResults>,
102    ) {
103        if let Some(key) = public_key {
104            println!("Public key: {}", key);
105        }
106        if let Some(address) = address {
107            println!("Address: {}", address);
108        }
109        if let Some(w) = weight {
110            println!("Weight: {}", w);
111        }
112
113        let ref_version = reference.and_then(|ref_results| ref_results.version_info.as_ref().ok());
114        match &self.version_info {
115            Ok(version_info) => {
116                if ref_version.is_none_or(|ref_v| ref_v.crate_version != version_info.crate_version)
117                {
118                    println!("Linera protocol: v{}", version_info.crate_version);
119                }
120                if ref_version.is_none_or(|ref_v| ref_v.rpc_hash != version_info.rpc_hash) {
121                    println!("RPC API hash: {}", version_info.rpc_hash);
122                }
123                if ref_version.is_none_or(|ref_v| ref_v.graphql_hash != version_info.graphql_hash) {
124                    println!("GraphQL API hash: {}", version_info.graphql_hash);
125                }
126                if ref_version.is_none_or(|ref_v| ref_v.wit_hash != version_info.wit_hash) {
127                    println!("WIT API hash: v{}", version_info.wit_hash);
128                }
129                if ref_version.is_none_or(|ref_v| {
130                    (&ref_v.git_commit, ref_v.git_dirty)
131                        != (&version_info.git_commit, version_info.git_dirty)
132                }) {
133                    println!(
134                        "Source code: {}/tree/{}{}",
135                        env!("CARGO_PKG_REPOSITORY"),
136                        version_info.git_commit,
137                        if version_info.git_dirty {
138                            " (dirty)"
139                        } else {
140                            ""
141                        }
142                    );
143                }
144            }
145            Err(err) => println!("Error getting version info: {err}"),
146        }
147
148        let ref_genesis_hash =
149            reference.and_then(|ref_results| ref_results.genesis_config_hash.as_ref().ok());
150        match &self.genesis_config_hash {
151            Ok(hash) if ref_genesis_hash.is_some_and(|ref_hash| ref_hash == hash) => {}
152            Ok(hash) => println!("Genesis config hash: {hash}"),
153            Err(err) => println!("Error getting genesis config: {err}"),
154        }
155
156        let ref_info = reference.and_then(|ref_results| ref_results.chain_info.as_ref().ok());
157        match &self.chain_info {
158            Ok(info) => {
159                if ref_info.is_none_or(|ref_info| info.block_hash != ref_info.block_hash) {
160                    if let Some(hash) = info.block_hash {
161                        println!("Block hash: {}", hash);
162                    } else {
163                        println!("Block hash: None");
164                    }
165                }
166                if ref_info
167                    .is_none_or(|ref_info| info.next_block_height != ref_info.next_block_height)
168                {
169                    println!("Next height: {}", info.next_block_height);
170                }
171                if ref_info.is_none_or(|ref_info| info.timestamp != ref_info.timestamp) {
172                    println!("Timestamp: {}", info.timestamp);
173                }
174                if ref_info.is_none_or(|ref_info| info.epoch != ref_info.epoch) {
175                    println!("Epoch: {}", info.epoch);
176                }
177                if ref_info.is_none_or(|ref_info| {
178                    info.manager.current_round != ref_info.manager.current_round
179                }) {
180                    println!("Round: {}", info.manager.current_round);
181                }
182                if let Some(locking) = &info.manager.requested_locking {
183                    match &**locking {
184                        LockingBlock::Fast(proposal) => {
185                            println!(
186                                "Locking fast block from {}",
187                                proposal.content.block.timestamp
188                            );
189                        }
190                        LockingBlock::Regular(validated) => {
191                            println!(
192                                "Locking block {} in {} from {}",
193                                validated.hash(),
194                                validated.round,
195                                validated.block().header.timestamp
196                            );
197                        }
198                    }
199                }
200            }
201            Err(err) => println!("Error getting chain info: {err}"),
202        }
203    }
204}
205
206pub struct ClientContext<Env: Environment, W> {
207    pub wallet: W,
208    pub client: Arc<Client<Env>>,
209    pub send_timeout: Duration,
210    pub recv_timeout: Duration,
211    pub retry_delay: Duration,
212    pub max_retries: u32,
213    pub chain_listeners: JoinSet,
214    #[cfg(not(web))]
215    pub client_metrics: Option<ClientMetrics>,
216}
217
218impl<Env: Environment, W> chain_listener::ClientContext for ClientContext<Env, W>
219where
220    W: Persist<Target = Wallet>,
221{
222    type Environment = Env;
223
224    fn wallet(&self) -> &Wallet {
225        &self.wallet
226    }
227
228    fn storage(&self) -> &Env::Storage {
229        self.client.storage_client()
230    }
231
232    fn client(&self) -> &Arc<Client<Env>> {
233        &self.client
234    }
235
236    #[cfg(not(web))]
237    fn timing_sender(
238        &self,
239    ) -> Option<mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
240        self.client_metrics
241            .as_ref()
242            .map(|metrics| metrics.timing_sender.clone())
243    }
244
245    async fn update_wallet_for_new_chain(
246        &mut self,
247        chain_id: ChainId,
248        owner: Option<AccountOwner>,
249        timestamp: Timestamp,
250        epoch: Epoch,
251    ) -> Result<(), Error> {
252        self.update_wallet_for_new_chain(chain_id, owner, timestamp, epoch)
253            .await
254    }
255
256    async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
257        self.update_wallet_from_client(client).await
258    }
259}
260
261impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si>, W>
262where
263    S: linera_core::environment::Storage,
264    Si: linera_core::environment::Signer,
265    W: Persist<Target = Wallet>,
266{
267    pub fn new(
268        storage: S,
269        options: ClientContextOptions,
270        wallet: W,
271        signer: Si,
272        block_cache_size: usize,
273        execution_state_cache_size: usize,
274    ) -> Self {
275        #[cfg(not(web))]
276        let timing_config = options.to_timing_config();
277        let node_provider = NodeProvider::new(NodeOptions {
278            send_timeout: options.send_timeout,
279            recv_timeout: options.recv_timeout,
280            retry_delay: options.retry_delay,
281            max_retries: options.max_retries,
282        });
283        let chain_ids = wallet.chain_ids();
284        let name = match chain_ids.len() {
285            0 => "Client node".to_string(),
286            1 => format!("Client node for {:.8}", chain_ids[0]),
287            n => format!("Client node for {:.8} and {} others", chain_ids[0], n - 1),
288        };
289        let client = Client::new(
290            linera_core::environment::Impl {
291                network: node_provider,
292                storage,
293                signer,
294            },
295            wallet.genesis_admin_chain(),
296            options.long_lived_services,
297            chain_ids,
298            name,
299            options.chain_worker_ttl,
300            options.sender_chain_worker_ttl,
301            options.to_chain_client_options(),
302            block_cache_size,
303            execution_state_cache_size,
304        );
305
306        #[cfg(not(web))]
307        let client_metrics = if timing_config.enabled {
308            Some(ClientMetrics::new(timing_config))
309        } else {
310            None
311        };
312
313        ClientContext {
314            client: Arc::new(client),
315            wallet,
316            send_timeout: options.send_timeout,
317            recv_timeout: options.recv_timeout,
318            retry_delay: options.retry_delay,
319            max_retries: options.max_retries,
320            chain_listeners: JoinSet::default(),
321            #[cfg(not(web))]
322            client_metrics,
323        }
324    }
325
326    #[cfg(with_testing)]
327    pub fn new_test_client_context(
328        storage: S,
329        wallet: W,
330        signer: Si,
331        block_cache_size: usize,
332        execution_state_cache_size: usize,
333    ) -> Self {
334        use linera_core::{client::chain_client, node::CrossChainMessageDelivery};
335
336        let send_recv_timeout = Duration::from_millis(4000);
337        let retry_delay = Duration::from_millis(1000);
338        let max_retries = 10;
339        let chain_worker_ttl = Duration::from_secs(30);
340        let sender_chain_worker_ttl = Duration::from_secs(1);
341
342        let node_options = NodeOptions {
343            send_timeout: send_recv_timeout,
344            recv_timeout: send_recv_timeout,
345            retry_delay,
346            max_retries,
347        };
348        let chain_ids = wallet.chain_ids();
349        let name = match chain_ids.len() {
350            0 => "Client node".to_string(),
351            1 => format!("Client node for {:.8}", chain_ids[0]),
352            n => format!("Client node for {:.8} and {} others", chain_ids[0], n - 1),
353        };
354        let client = Client::new(
355            linera_core::environment::Impl {
356                storage,
357                network: NodeProvider::new(node_options),
358                signer,
359            },
360            wallet.genesis_admin_chain(),
361            false,
362            chain_ids,
363            name,
364            chain_worker_ttl,
365            sender_chain_worker_ttl,
366            chain_client::Options {
367                cross_chain_message_delivery: CrossChainMessageDelivery::Blocking,
368                ..chain_client::Options::test_default()
369            },
370            block_cache_size,
371            execution_state_cache_size,
372        );
373
374        ClientContext {
375            client: Arc::new(client),
376            wallet,
377            send_timeout: send_recv_timeout,
378            recv_timeout: send_recv_timeout,
379            retry_delay,
380            max_retries,
381            chain_listeners: JoinSet::default(),
382            client_metrics: None,
383        }
384    }
385}
386
387impl<Env: Environment, W: Persist<Target = Wallet>> ClientContext<Env, W> {
388    /// Returns a reference to the wallet.
389    pub fn wallet(&self) -> &Wallet {
390        &self.wallet
391    }
392
393    /// Returns the wallet as a mutable reference.
394    pub fn wallet_mut(&mut self) -> &mut W {
395        &mut self.wallet
396    }
397
398    pub async fn mutate_wallet<R: Send>(
399        &mut self,
400        mutation: impl FnOnce(&mut Wallet) -> R + Send,
401    ) -> Result<R, Error> {
402        self.wallet
403            .mutate(mutation)
404            .await
405            .map_err(|e| error::Inner::Persistence(Box::new(e)).into())
406    }
407
408    /// Retrieve the default account. Current this is the common account of the default
409    /// chain.
410    pub fn default_account(&self) -> Account {
411        Account::chain(self.default_chain())
412    }
413
414    /// Retrieve the default chain.
415    pub fn default_chain(&self) -> ChainId {
416        self.wallet
417            .default_chain()
418            .expect("No chain specified in wallet with no default chain")
419    }
420
421    pub fn first_non_admin_chain(&self) -> ChainId {
422        self.wallet
423            .first_non_admin_chain()
424            .expect("No non-admin chain specified in wallet with no non-admin chain")
425    }
426
427    pub fn make_node_provider(&self) -> NodeProvider {
428        NodeProvider::new(self.make_node_options())
429    }
430
431    fn make_node_options(&self) -> NodeOptions {
432        NodeOptions {
433            send_timeout: self.send_timeout,
434            recv_timeout: self.recv_timeout,
435            retry_delay: self.retry_delay,
436            max_retries: self.max_retries,
437        }
438    }
439
440    #[cfg(not(web))]
441    pub fn client_metrics(&self) -> Option<&ClientMetrics> {
442        self.client_metrics.as_ref()
443    }
444
445    pub async fn save_wallet(&mut self) -> Result<(), Error> {
446        self.wallet
447            .persist()
448            .await
449            .map_err(|e| error::Inner::Persistence(Box::new(e)).into())
450    }
451
452    pub async fn update_wallet_from_client<Env_: Environment>(
453        &mut self,
454        client: &ChainClient<Env_>,
455    ) -> Result<(), Error> {
456        let info = client.chain_info().await?;
457        let client_owner = client.preferred_owner();
458        let pending_proposal = client.pending_proposal().clone();
459        self.wallet
460            .as_mut()
461            .update_from_info(pending_proposal, client_owner, &info);
462        self.save_wallet().await
463    }
464
465    /// Remembers the new chain and its owner (if any) in the wallet.
466    pub async fn update_wallet_for_new_chain(
467        &mut self,
468        chain_id: ChainId,
469        owner: Option<AccountOwner>,
470        timestamp: Timestamp,
471        epoch: Epoch,
472    ) -> Result<(), Error> {
473        if self.wallet.get(chain_id).is_none() {
474            self.mutate_wallet(|w| {
475                w.insert(UserChain {
476                    chain_id,
477                    owner,
478                    block_hash: None,
479                    timestamp,
480                    next_block_height: BlockHeight::ZERO,
481                    pending_proposal: None,
482                    epoch: Some(epoch),
483                })
484            })
485            .await?;
486        }
487
488        Ok(())
489    }
490
491    pub async fn process_inbox(
492        &mut self,
493        chain_client: &ChainClient<Env>,
494    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
495        let mut certificates = Vec::new();
496        // Try processing the inbox optimistically without waiting for validator notifications.
497        let (new_certificates, maybe_timeout) = {
498            chain_client.synchronize_from_validators().await?;
499            let result = chain_client.process_inbox_without_prepare().await;
500            self.update_wallet_from_client(chain_client).await?;
501            if result.is_err() {
502                self.save_wallet().await?;
503            }
504            result?
505        };
506        certificates.extend(new_certificates);
507        if maybe_timeout.is_none() {
508            self.save_wallet().await?;
509            return Ok(certificates);
510        }
511
512        // Start listening for notifications, so we learn about new rounds and blocks.
513        let (listener, _listen_handle, mut notification_stream) =
514            chain_client.listen(ListeningMode::FullChain).await?;
515        self.chain_listeners.spawn_task(listener);
516
517        loop {
518            let (new_certificates, maybe_timeout) = {
519                let result = chain_client.process_inbox().await;
520                self.update_wallet_from_client(chain_client).await?;
521                if result.is_err() {
522                    self.save_wallet().await?;
523                }
524                result?
525            };
526            certificates.extend(new_certificates);
527            if let Some(timestamp) = maybe_timeout {
528                util::wait_for_next_round(&mut notification_stream, timestamp).await
529            } else {
530                self.save_wallet().await?;
531                return Ok(certificates);
532            }
533        }
534    }
535
536    pub async fn assign_new_chain_to_key(
537        &mut self,
538        chain_id: ChainId,
539        owner: AccountOwner,
540    ) -> Result<(), Error> {
541        self.client.track_chain(chain_id);
542        let client = self.make_chain_client(chain_id);
543        let chain_description = client.get_chain_description().await?;
544        let config = chain_description.config();
545
546        if !config.ownership.verify_owner(&owner) {
547            tracing::error!(
548                "The chain with the ID returned by the faucet is not owned by you. \
549                Please make sure you are connecting to a genuine faucet."
550            );
551            return Err(error::Inner::ChainOwnership.into());
552        }
553
554        self.wallet_mut()
555            .mutate(|w| {
556                w.assign_new_chain_to_owner(
557                    owner,
558                    chain_id,
559                    chain_description.timestamp(),
560                    chain_description.config().epoch,
561                )
562            })
563            .await
564            .map_err(|e| error::Inner::Persistence(Box::new(e)))?
565            .context("assigning new chain")?;
566        Ok(())
567    }
568
569    /// Applies the given function to the chain client.
570    ///
571    /// Updates the wallet regardless of the outcome. As long as the function returns a round
572    /// timeout, it will wait and retry.
573    pub async fn apply_client_command<E, F, Fut, T>(
574        &mut self,
575        client: &ChainClient<Env>,
576        mut f: F,
577    ) -> Result<T, Error>
578    where
579        F: FnMut(&ChainClient<Env>) -> Fut,
580        Fut: Future<Output = Result<ClientOutcome<T>, E>>,
581        Error: From<E>,
582    {
583        client.prepare_chain().await?;
584        // Try applying f optimistically without validator notifications. Return if committed.
585        let result = f(client).await;
586        self.update_wallet_from_client(client).await?;
587        if let ClientOutcome::Committed(t) = result? {
588            return Ok(t);
589        }
590
591        // Start listening for notifications, so we learn about new rounds and blocks.
592        let (listener, _listen_handle, mut notification_stream) =
593            client.listen(ListeningMode::FullChain).await?;
594        self.chain_listeners.spawn_task(listener);
595
596        loop {
597            // Try applying f. Return if committed.
598            let result = f(client).await;
599            self.update_wallet_from_client(client).await?;
600            let timeout = match result? {
601                ClientOutcome::Committed(t) => return Ok(t),
602                ClientOutcome::WaitForTimeout(timeout) => timeout,
603            };
604            // Otherwise wait and try again in the next round.
605            util::wait_for_next_round(&mut notification_stream, timeout).await;
606        }
607    }
608
609    pub async fn ownership(&mut self, chain_id: Option<ChainId>) -> Result<ChainOwnership, Error> {
610        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
611        let client = self.make_chain_client(chain_id);
612        let info = client.chain_info().await?;
613        Ok(info.manager.ownership)
614    }
615
616    pub async fn change_ownership(
617        &mut self,
618        chain_id: Option<ChainId>,
619        ownership_config: ChainOwnershipConfig,
620    ) -> Result<(), Error> {
621        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
622        let chain_client = self.make_chain_client(chain_id);
623        info!(
624            ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
625            "Changing ownership of a chain"
626        );
627        let time_start = Instant::now();
628        let ownership = ChainOwnership::try_from(ownership_config)?;
629
630        let certificate = self
631            .apply_client_command(&chain_client, |chain_client| {
632                let ownership = ownership.clone();
633                let chain_client = chain_client.clone();
634                async move {
635                    chain_client
636                        .change_ownership(ownership)
637                        .await
638                        .map_err(Error::from)
639                        .context("Failed to change ownership")
640                }
641            })
642            .await?;
643        let time_total = time_start.elapsed();
644        info!("Operation confirmed after {} ms", time_total.as_millis());
645        debug!("{:?}", certificate);
646        Ok(())
647    }
648
649    pub async fn set_preferred_owner(
650        &mut self,
651        chain_id: Option<ChainId>,
652        preferred_owner: AccountOwner,
653    ) -> Result<(), Error> {
654        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
655        let mut chain_client = self.make_chain_client(chain_id);
656        let old_owner = chain_client.preferred_owner();
657        info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
658        chain_client.set_preferred_owner(preferred_owner);
659        self.update_wallet_from_client(&chain_client).await?;
660        info!("New preferred owner set");
661        Ok(())
662    }
663
664    pub async fn check_compatible_version_info(
665        &self,
666        address: &str,
667        node: &impl ValidatorNode,
668    ) -> Result<VersionInfo, Error> {
669        match node.get_version_info().await {
670            Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
671                debug!(
672                    "Version information for validator {address}: {}",
673                    version_info
674                );
675                Ok(version_info)
676            }
677            Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
678                remote: Box::new(version_info),
679                local: Box::new(linera_version::VERSION_INFO.clone()),
680            }
681            .into()),
682            Err(error) => Err(error::Inner::UnavailableVersionInfo {
683                address: address.to_string(),
684                error: Box::new(error),
685            }
686            .into()),
687        }
688    }
689
690    pub async fn check_matching_network_description(
691        &self,
692        address: &str,
693        node: &impl ValidatorNode,
694    ) -> Result<CryptoHash, Error> {
695        let network_description = self.wallet().genesis_config().network_description();
696        match node.get_network_description().await {
697            Ok(description) => {
698                if description == network_description {
699                    Ok(description.genesis_config_hash)
700                } else {
701                    Err(error::Inner::UnexpectedNetworkDescription {
702                        remote: Box::new(description),
703                        local: Box::new(network_description),
704                    }
705                    .into())
706                }
707            }
708            Err(error) => Err(error::Inner::UnavailableNetworkDescription {
709                address: address.to_string(),
710                error: Box::new(error),
711            }
712            .into()),
713        }
714    }
715
716    pub async fn check_validator_chain_info_response(
717        &self,
718        public_key: Option<&ValidatorPublicKey>,
719        address: &str,
720        node: &impl ValidatorNode,
721        chain_id: ChainId,
722    ) -> Result<ChainInfo, Error> {
723        let query = ChainInfoQuery::new(chain_id).with_manager_values();
724        match node.handle_chain_info_query(query).await {
725            Ok(response) => {
726                debug!(
727                    "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
728                    response.info.next_block_height, response.info.epoch,
729                );
730                if let Some(public_key) = public_key {
731                    if response.check(*public_key).is_ok() {
732                        debug!("Signature for public key {public_key} is OK.");
733                    } else {
734                        return Err(error::Inner::InvalidSignature {
735                            public_key: *public_key,
736                        }
737                        .into());
738                    }
739                } else {
740                    warn!("Not checking signature as public key was not given");
741                }
742                Ok(*response.info)
743            }
744            Err(error) => Err(error::Inner::UnavailableChainInfo {
745                address: address.to_string(),
746                chain_id,
747                error: Box::new(error),
748            }
749            .into()),
750        }
751    }
752
753    /// Query a validator for version info, network description, and chain info.
754    ///
755    /// Returns a `ValidatorQueryResults` struct with the results of all three queries.
756    pub async fn query_validator(
757        &self,
758        address: &str,
759        node: &impl ValidatorNode,
760        chain_id: ChainId,
761        public_key: Option<&ValidatorPublicKey>,
762    ) -> ValidatorQueryResults {
763        let version_info = self.check_compatible_version_info(address, node).await;
764        let genesis_config_hash = self.check_matching_network_description(address, node).await;
765        let chain_info = self
766            .check_validator_chain_info_response(public_key, address, node, chain_id)
767            .await;
768
769        ValidatorQueryResults {
770            version_info,
771            genesis_config_hash,
772            chain_info,
773        }
774    }
775
776    /// Query the local node for version info, network description, and chain info.
777    ///
778    /// Returns a `ValidatorQueryResults` struct with the local node's information.
779    pub async fn query_local_node(&self, chain_id: ChainId) -> ValidatorQueryResults {
780        let version_info = Ok(linera_version::VERSION_INFO.clone());
781        let genesis_config_hash = Ok(self
782            .wallet()
783            .genesis_config()
784            .network_description()
785            .genesis_config_hash);
786        let chain_info = self
787            .make_chain_client(chain_id)
788            .chain_info_with_manager_values()
789            .await
790            .map(|info| *info)
791            .map_err(|e| e.into());
792
793        ValidatorQueryResults {
794            version_info,
795            genesis_config_hash,
796            chain_info,
797        }
798    }
799}
800
801#[cfg(feature = "fs")]
802impl<Env: Environment, W> ClientContext<Env, W>
803where
804    W: Persist<Target = Wallet>,
805{
806    pub async fn publish_module(
807        &mut self,
808        chain_client: &ChainClient<Env>,
809        contract: PathBuf,
810        service: PathBuf,
811        vm_runtime: VmRuntime,
812    ) -> Result<ModuleId, Error> {
813        info!("Loading bytecode files");
814        let contract_bytecode = Bytecode::load_from_file(&contract)
815            .with_context(|| format!("failed to load contract bytecode from {:?}", &contract))?;
816        let service_bytecode = Bytecode::load_from_file(&service)
817            .with_context(|| format!("failed to load service bytecode from {:?}", &service))?;
818
819        info!("Publishing module");
820        let (blobs, module_id) =
821            create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
822        let (module_id, _) = self
823            .apply_client_command(chain_client, |chain_client| {
824                let blobs = blobs.clone();
825                let chain_client = chain_client.clone();
826                async move {
827                    chain_client
828                        .publish_module_blobs(blobs, module_id)
829                        .await
830                        .context("Failed to publish module")
831                }
832            })
833            .await?;
834
835        info!("{}", "Module published successfully!");
836
837        info!("Synchronizing client and processing inbox");
838        self.process_inbox(chain_client).await?;
839        Ok(module_id)
840    }
841
842    pub async fn publish_data_blob(
843        &mut self,
844        chain_client: &ChainClient<Env>,
845        blob_path: PathBuf,
846    ) -> Result<CryptoHash, Error> {
847        info!("Loading data blob file");
848        let blob_bytes = fs::read(&blob_path).context(format!(
849            "failed to load data blob bytes from {:?}",
850            &blob_path
851        ))?;
852
853        info!("Publishing data blob");
854        self.apply_client_command(chain_client, |chain_client| {
855            let blob_bytes = blob_bytes.clone();
856            let chain_client = chain_client.clone();
857            async move {
858                chain_client
859                    .publish_data_blob(blob_bytes)
860                    .await
861                    .context("Failed to publish data blob")
862            }
863        })
864        .await?;
865
866        info!("{}", "Data blob published successfully!");
867        Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
868    }
869
870    // TODO(#2490): Consider removing or renaming this.
871    pub async fn read_data_blob(
872        &mut self,
873        chain_client: &ChainClient<Env>,
874        hash: CryptoHash,
875    ) -> Result<(), Error> {
876        info!("Verifying data blob");
877        self.apply_client_command(chain_client, |chain_client| {
878            let chain_client = chain_client.clone();
879            async move {
880                chain_client
881                    .read_data_blob(hash)
882                    .await
883                    .context("Failed to verify data blob")
884            }
885        })
886        .await?;
887
888        info!("{}", "Data blob verified successfully!");
889        Ok(())
890    }
891}
892
893#[cfg(not(web))]
894impl<Env: Environment, W> ClientContext<Env, W>
895where
896    W: Persist<Target = Wallet>,
897{
898    pub async fn prepare_for_benchmark(
899        &mut self,
900        num_chains: usize,
901        tokens_per_chain: Amount,
902        fungible_application_id: Option<ApplicationId>,
903        pub_keys: Vec<AccountPublicKey>,
904        chains_config_path: Option<&Path>,
905    ) -> Result<(Vec<ChainClient<Env>>, Vec<ChainId>), Error> {
906        let start = Instant::now();
907        // Below all block proposals are supposed to succeed without retries, we
908        // must make sure that all incoming payments have been accepted on-chain
909        // and that no validator is missing user certificates.
910        self.process_inboxes_and_force_validator_updates().await;
911        info!(
912            "Processed inboxes and forced validator updates in {} ms",
913            start.elapsed().as_millis()
914        );
915
916        let start = Instant::now();
917        let (benchmark_chains, chain_clients) = self
918            .make_benchmark_chains(
919                num_chains,
920                tokens_per_chain,
921                pub_keys,
922                chains_config_path.is_some(),
923            )
924            .await?;
925        info!(
926            "Got {} chains in {} ms",
927            num_chains,
928            start.elapsed().as_millis()
929        );
930
931        if let Some(id) = fungible_application_id {
932            let start = Instant::now();
933            self.supply_fungible_tokens(&benchmark_chains, id).await?;
934            info!(
935                "Supplied fungible tokens in {} ms",
936                start.elapsed().as_millis()
937            );
938            // Need to process inboxes to make sure the chains receive the supplied tokens.
939            let start = Instant::now();
940            for chain_client in &chain_clients {
941                chain_client.process_inbox().await?;
942            }
943            info!(
944                "Processed inboxes after supplying fungible tokens in {} ms",
945                start.elapsed().as_millis()
946            );
947        }
948
949        let all_chains = Benchmark::<Env>::get_all_chains(chains_config_path, &benchmark_chains)?;
950        let known_chain_ids: HashSet<_> = benchmark_chains.iter().map(|(id, _)| *id).collect();
951        let unknown_chain_ids: Vec<_> = all_chains
952            .iter()
953            .filter(|id| !known_chain_ids.contains(id))
954            .copied()
955            .collect();
956        if !unknown_chain_ids.is_empty() {
957            // The current client won't have the blobs for the chains in the other wallets. Even
958            // though it will eventually get those blobs, we're getting a head start here and
959            // fetching those blobs in advance.
960            for chain_id in &unknown_chain_ids {
961                self.client.get_chain_description(*chain_id).await?;
962            }
963        }
964
965        Ok((chain_clients, all_chains))
966    }
967
968    pub async fn wrap_up_benchmark(
969        &mut self,
970        chain_clients: Vec<ChainClient<Env>>,
971        close_chains: bool,
972        wrap_up_max_in_flight: usize,
973    ) -> Result<(), Error> {
974        if close_chains {
975            info!("Closing chains...");
976            let stream = stream::iter(chain_clients)
977                .map(|chain_client| async move {
978                    Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
979                    info!("Closed chain {:?}", chain_client.chain_id());
980                    Ok::<(), BenchmarkError>(())
981                })
982                .buffer_unordered(wrap_up_max_in_flight);
983            stream.try_collect::<Vec<_>>().await?;
984        } else {
985            info!("Processing inbox for all chains...");
986            let stream = stream::iter(chain_clients.clone())
987                .map(|chain_client| async move {
988                    chain_client.process_inbox().await?;
989                    info!("Processed inbox for chain {:?}", chain_client.chain_id());
990                    Ok::<(), chain_client::Error>(())
991                })
992                .buffer_unordered(wrap_up_max_in_flight);
993            stream.try_collect::<Vec<_>>().await?;
994
995            info!("Updating wallet from chain clients...");
996            for chain_client in chain_clients {
997                let info = chain_client.chain_info().await?;
998                let client_owner = chain_client.preferred_owner();
999                let pending_proposal = chain_client.pending_proposal().clone();
1000                self.wallet
1001                    .as_mut()
1002                    .update_from_info(pending_proposal, client_owner, &info);
1003            }
1004            self.save_wallet().await?;
1005        }
1006
1007        Ok(())
1008    }
1009
1010    async fn process_inboxes_and_force_validator_updates(&mut self) {
1011        let mut chain_clients = vec![];
1012        for chain_id in &self.wallet.owned_chain_ids() {
1013            chain_clients.push(self.make_chain_client(*chain_id));
1014        }
1015
1016        let mut join_set = task::JoinSet::new();
1017        for chain_client in chain_clients {
1018            join_set.spawn(async move {
1019                Self::process_inbox_without_updating_wallet(&chain_client)
1020                    .await
1021                    .expect("Processing inbox should not fail!");
1022                chain_client
1023            });
1024        }
1025
1026        let chain_clients = join_set.join_all().await;
1027        for chain_client in &chain_clients {
1028            self.update_wallet_from_client(chain_client).await.unwrap();
1029        }
1030    }
1031
1032    async fn process_inbox_without_updating_wallet(
1033        chain_client: &ChainClient<Env>,
1034    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
1035        // Try processing the inbox optimistically without waiting for validator notifications.
1036        chain_client.synchronize_from_validators().await?;
1037        let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
1038        assert!(
1039            maybe_timeout.is_none(),
1040            "Should not timeout within benchmark!"
1041        );
1042
1043        Ok(certificates)
1044    }
1045
1046    /// Creates chains if necessary, and returns a map of exactly `num_chains` chain IDs
1047    /// with key pairs, as well as a map of the chain clients.
1048    async fn make_benchmark_chains(
1049        &mut self,
1050        num_chains: usize,
1051        balance: Amount,
1052        pub_keys: Vec<AccountPublicKey>,
1053        wallet_only: bool,
1054    ) -> Result<(Vec<(ChainId, AccountOwner)>, Vec<ChainClient<Env>>), Error> {
1055        let mut chains_found_in_wallet = 0;
1056        let mut benchmark_chains = Vec::with_capacity(num_chains);
1057        let mut chain_clients = Vec::with_capacity(num_chains);
1058        let start = Instant::now();
1059        for chain_id in self.wallet.owned_chain_ids() {
1060            if chains_found_in_wallet == num_chains {
1061                break;
1062            }
1063            let chain_client = self.make_chain_client(chain_id);
1064            let ownership = chain_client.chain_info().await?.manager.ownership;
1065            if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
1066                continue;
1067            }
1068            chain_client.process_inbox().await?;
1069            benchmark_chains.push((
1070                chain_id,
1071                *ownership
1072                    .super_owners
1073                    .first()
1074                    .expect("should have a super owner"),
1075            ));
1076            chain_clients.push(chain_client);
1077            chains_found_in_wallet += 1;
1078        }
1079        info!(
1080            "Got {} chains from the wallet in {} ms",
1081            benchmark_chains.len(),
1082            start.elapsed().as_millis()
1083        );
1084
1085        let num_chains_to_create = num_chains - chains_found_in_wallet;
1086
1087        let default_chain_id = self
1088            .wallet
1089            .default_chain()
1090            .expect("should have default chain");
1091        let default_chain_client = self.make_chain_client(default_chain_id);
1092
1093        if num_chains_to_create > 0 {
1094            if wallet_only {
1095                return Err(
1096                    error::Inner::Benchmark(BenchmarkError::NotEnoughChainsInWallet(
1097                        num_chains,
1098                        chains_found_in_wallet,
1099                    ))
1100                    .into(),
1101                );
1102            }
1103            let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
1104            let operations_per_block = 900; // Over this we seem to hit the block size limits.
1105            for i in (0..num_chains_to_create).step_by(operations_per_block) {
1106                let num_new_chains = operations_per_block.min(num_chains_to_create - i);
1107                let pub_key = pub_keys_iter.next().unwrap();
1108                let owner = pub_key.into();
1109
1110                let certificate = Self::execute_open_chains_operations(
1111                    num_new_chains,
1112                    &default_chain_client,
1113                    balance,
1114                    owner,
1115                )
1116                .await?;
1117                info!("Block executed successfully");
1118
1119                let block = certificate.block();
1120                for i in 0..num_new_chains {
1121                    let chain_id = block.body.blobs[i]
1122                        .iter()
1123                        .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
1124                        .map(|blob| ChainId(blob.id().hash))
1125                        .expect("failed to create a new chain");
1126                    self.client.track_chain(chain_id);
1127
1128                    let mut chain_client = self.client.create_chain_client(
1129                        chain_id,
1130                        None,
1131                        BlockHeight::ZERO,
1132                        None,
1133                        Some(owner),
1134                        self.timing_sender(),
1135                    );
1136                    chain_client.set_preferred_owner(owner);
1137                    chain_client.process_inbox().await?;
1138                    benchmark_chains.push((chain_id, owner));
1139                    chain_clients.push(chain_client);
1140                }
1141            }
1142
1143            info!(
1144                "Created {} chains in {} ms",
1145                num_chains_to_create,
1146                start.elapsed().as_millis()
1147            );
1148        }
1149
1150        info!("Updating wallet from client");
1151        self.update_wallet_from_client(&default_chain_client)
1152            .await?;
1153        info!("Retrying pending outgoing messages");
1154        default_chain_client
1155            .retry_pending_outgoing_messages()
1156            .await
1157            .context("outgoing messages to create the new chains should be delivered")?;
1158        info!("Processing default chain inbox");
1159        default_chain_client.process_inbox().await?;
1160
1161        assert_eq!(
1162            benchmark_chains.len(),
1163            chain_clients.len(),
1164            "benchmark_chains and chain_clients must have the same size"
1165        );
1166
1167        Ok((benchmark_chains, chain_clients))
1168    }
1169
1170    async fn execute_open_chains_operations(
1171        num_new_chains: usize,
1172        chain_client: &ChainClient<Env>,
1173        balance: Amount,
1174        owner: AccountOwner,
1175    ) -> Result<ConfirmedBlockCertificate, Error> {
1176        let config = OpenChainConfig {
1177            ownership: ChainOwnership::single_super(owner),
1178            balance,
1179            application_permissions: Default::default(),
1180        };
1181        let operations = iter::repeat_n(
1182            Operation::system(SystemOperation::OpenChain(config)),
1183            num_new_chains,
1184        )
1185        .collect();
1186        info!("Executing {} OpenChain operations", num_new_chains);
1187        Ok(chain_client
1188            .execute_operations(operations, vec![])
1189            .await?
1190            .expect("should execute block with OpenChain operations"))
1191    }
1192
1193    /// Supplies fungible tokens to the chains.
1194    async fn supply_fungible_tokens(
1195        &mut self,
1196        key_pairs: &[(ChainId, AccountOwner)],
1197        application_id: ApplicationId,
1198    ) -> Result<(), Error> {
1199        let default_chain_id = self
1200            .wallet
1201            .default_chain()
1202            .expect("should have default chain");
1203        let default_key = self.wallet.get(default_chain_id).unwrap().owner.unwrap();
1204        // This should be enough to run the benchmark at 1M TPS for an hour.
1205        let amount = Amount::from_nanos(4);
1206        let operations: Vec<Operation> = key_pairs
1207            .iter()
1208            .map(|(chain_id, owner)| {
1209                Benchmark::<Env>::fungible_transfer(
1210                    application_id,
1211                    *chain_id,
1212                    default_key,
1213                    *owner,
1214                    amount,
1215                )
1216            })
1217            .collect();
1218        let chain_client = self.make_chain_client(default_chain_id);
1219        // Put at most 1000 fungible token operations in each block.
1220        for operation_chunk in operations.chunks(1000) {
1221            chain_client
1222                .execute_operations(operation_chunk.to_vec(), vec![])
1223                .await?
1224                .expect("should execute block with Transfer operations");
1225        }
1226        self.update_wallet_from_client(&chain_client).await?;
1227
1228        Ok(())
1229    }
1230}