linera_client/
client_context.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use futures::{Future, StreamExt as _, TryStreamExt as _};
7use linera_base::{
8    crypto::{CryptoHash, ValidatorPublicKey},
9    data_types::{Epoch, Timestamp},
10    identifiers::{Account, AccountOwner, ChainId},
11    ownership::ChainOwnership,
12    time::{Duration, Instant},
13    util::future::FutureSyncExt as _,
14};
15use linera_chain::{manager::LockingBlock, types::ConfirmedBlockCertificate};
16use linera_core::{
17    client::{ChainClient, Client, ListeningMode},
18    data_types::{ChainInfo, ChainInfoQuery, ClientOutcome},
19    join_set_ext::JoinSet,
20    node::ValidatorNode,
21    wallet, Environment, JoinSetExt as _, Wallet as _,
22};
23use linera_rpc::node_provider::{NodeOptions, NodeProvider};
24use linera_version::VersionInfo;
25use thiserror_context::Context;
26use tracing::{debug, info, warn};
27#[cfg(not(web))]
28use {
29    crate::{
30        benchmark::{Benchmark, BenchmarkError},
31        client_metrics::ClientMetrics,
32    },
33    futures::stream,
34    linera_base::{
35        crypto::AccountPublicKey,
36        data_types::{Amount, BlockHeight},
37        identifiers::{ApplicationId, BlobType},
38    },
39    linera_core::client::chain_client,
40    linera_execution::{
41        system::{OpenChainConfig, SystemOperation},
42        Operation,
43    },
44    std::{collections::HashSet, iter, path::Path},
45    tokio::{sync::mpsc, task},
46};
47#[cfg(feature = "fs")]
48use {
49    linera_base::{
50        data_types::{BlobContent, Bytecode},
51        identifiers::ModuleId,
52        vm::VmRuntime,
53    },
54    linera_core::client::create_bytecode_blobs,
55    std::{fs, path::PathBuf},
56};
57
58use crate::{
59    chain_listener::{self, ClientContext as _},
60    client_options::{ChainOwnershipConfig, ClientContextOptions},
61    config::GenesisConfig,
62    error, util, Error,
63};
64
65/// Results from querying a validator about version, network description, and chain info.
66pub struct ValidatorQueryResults {
67    /// The validator's version information.
68    pub version_info: Result<VersionInfo, Error>,
69    /// The validator's genesis config hash.
70    pub genesis_config_hash: Result<CryptoHash, Error>,
71    /// The validator's chain info (if valid and signature check passed).
72    pub chain_info: Result<ChainInfo, Error>,
73}
74
75impl ValidatorQueryResults {
76    /// Returns a vector of references to all errors in the query results.
77    pub fn errors(&self) -> Vec<&Error> {
78        let mut errors = Vec::new();
79        if let Err(e) = &self.version_info {
80            errors.push(e);
81        }
82        if let Err(e) = &self.genesis_config_hash {
83            errors.push(e);
84        }
85        if let Err(e) = &self.chain_info {
86            errors.push(e);
87        }
88        errors
89    }
90
91    /// Prints validator information to stdout.
92    ///
93    /// Prints public key, address, and optionally weight, version info, and chain info.
94    /// If `reference` is provided, only prints fields that differ from the reference.
95    pub fn print(
96        &self,
97        public_key: Option<&ValidatorPublicKey>,
98        address: Option<&str>,
99        weight: Option<u64>,
100        reference: Option<&ValidatorQueryResults>,
101    ) {
102        if let Some(key) = public_key {
103            println!("Public key: {}", key);
104        }
105        if let Some(address) = address {
106            println!("Address: {}", address);
107        }
108        if let Some(w) = weight {
109            println!("Weight: {}", w);
110        }
111
112        let ref_version = reference.and_then(|ref_results| ref_results.version_info.as_ref().ok());
113        match &self.version_info {
114            Ok(version_info) => {
115                if ref_version.is_none_or(|ref_v| ref_v.crate_version != version_info.crate_version)
116                {
117                    println!("Linera protocol: v{}", version_info.crate_version);
118                }
119                if ref_version.is_none_or(|ref_v| ref_v.rpc_hash != version_info.rpc_hash) {
120                    println!("RPC API hash: {}", version_info.rpc_hash);
121                }
122                if ref_version.is_none_or(|ref_v| ref_v.graphql_hash != version_info.graphql_hash) {
123                    println!("GraphQL API hash: {}", version_info.graphql_hash);
124                }
125                if ref_version.is_none_or(|ref_v| ref_v.wit_hash != version_info.wit_hash) {
126                    println!("WIT API hash: {}", version_info.wit_hash);
127                }
128                if ref_version.is_none_or(|ref_v| {
129                    (&ref_v.git_commit, ref_v.git_dirty)
130                        != (&version_info.git_commit, version_info.git_dirty)
131                }) {
132                    println!(
133                        "Source code: {}/tree/{}{}",
134                        env!("CARGO_PKG_REPOSITORY"),
135                        version_info.git_commit,
136                        if version_info.git_dirty {
137                            " (dirty)"
138                        } else {
139                            ""
140                        }
141                    );
142                }
143            }
144            Err(err) => println!("Error getting version info: {err}"),
145        }
146
147        let ref_genesis_hash =
148            reference.and_then(|ref_results| ref_results.genesis_config_hash.as_ref().ok());
149        match &self.genesis_config_hash {
150            Ok(hash) if ref_genesis_hash.is_some_and(|ref_hash| ref_hash == hash) => {}
151            Ok(hash) => println!("Genesis config hash: {hash}"),
152            Err(err) => println!("Error getting genesis config: {err}"),
153        }
154
155        let ref_info = reference.and_then(|ref_results| ref_results.chain_info.as_ref().ok());
156        match &self.chain_info {
157            Ok(info) => {
158                if ref_info.is_none_or(|ref_info| info.block_hash != ref_info.block_hash) {
159                    if let Some(hash) = info.block_hash {
160                        println!("Block hash: {}", hash);
161                    } else {
162                        println!("Block hash: None");
163                    }
164                }
165                if ref_info
166                    .is_none_or(|ref_info| info.next_block_height != ref_info.next_block_height)
167                {
168                    println!("Next height: {}", info.next_block_height);
169                }
170                if ref_info.is_none_or(|ref_info| info.timestamp != ref_info.timestamp) {
171                    println!("Timestamp: {}", info.timestamp);
172                }
173                if ref_info.is_none_or(|ref_info| info.epoch != ref_info.epoch) {
174                    println!("Epoch: {}", info.epoch);
175                }
176                if ref_info.is_none_or(|ref_info| {
177                    info.manager.current_round != ref_info.manager.current_round
178                }) {
179                    println!("Round: {}", info.manager.current_round);
180                }
181                if let Some(locking) = &info.manager.requested_locking {
182                    match &**locking {
183                        LockingBlock::Fast(proposal) => {
184                            println!(
185                                "Locking fast block from {}",
186                                proposal.content.block.timestamp
187                            );
188                        }
189                        LockingBlock::Regular(validated) => {
190                            println!(
191                                "Locking block {} in {} from {}",
192                                validated.hash(),
193                                validated.round,
194                                validated.block().header.timestamp
195                            );
196                        }
197                    }
198                }
199            }
200            Err(err) => println!("Error getting chain info: {err}"),
201        }
202    }
203}
204
205pub struct ClientContext<Env: Environment> {
206    pub client: Arc<Client<Env>>,
207    // TODO(#5083): this doesn't really need to be stored
208    pub genesis_config: crate::config::GenesisConfig,
209    pub send_timeout: Duration,
210    pub recv_timeout: Duration,
211    pub retry_delay: Duration,
212    pub max_retries: u32,
213    pub chain_listeners: JoinSet,
214    // TODO(#5082): move this into the upstream UI layers (maybe just the CLI)
215    pub default_chain: Option<ChainId>,
216    #[cfg(not(web))]
217    pub client_metrics: Option<ClientMetrics>,
218}
219
220impl<Env: Environment> chain_listener::ClientContext for ClientContext<Env> {
221    type Environment = Env;
222
223    fn wallet(&self) -> &Env::Wallet {
224        self.client.wallet()
225    }
226
227    fn storage(&self) -> &Env::Storage {
228        self.client.storage_client()
229    }
230
231    fn client(&self) -> &Arc<Client<Env>> {
232        &self.client
233    }
234
235    #[cfg(not(web))]
236    fn timing_sender(
237        &self,
238    ) -> Option<mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
239        self.client_metrics
240            .as_ref()
241            .map(|metrics| metrics.timing_sender.clone())
242    }
243
244    async fn update_wallet_for_new_chain(
245        &mut self,
246        chain_id: ChainId,
247        owner: Option<AccountOwner>,
248        timestamp: Timestamp,
249        epoch: Epoch,
250    ) -> Result<(), Error> {
251        self.update_wallet_for_new_chain(chain_id, owner, timestamp, epoch)
252            .make_sync()
253            .await
254    }
255
256    async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
257        self.update_wallet_from_client(client).make_sync().await
258    }
259}
260
261impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si, W>>
262where
263    S: linera_core::environment::Storage,
264    Si: linera_core::environment::Signer,
265    W: linera_core::environment::Wallet,
266{
267    // not worth refactoring this because
268    // https://github.com/linera-io/linera-protocol/issues/5082
269    // https://github.com/linera-io/linera-protocol/issues/5083
270    #[allow(clippy::too_many_arguments)]
271    pub async fn new(
272        storage: S,
273        wallet: W,
274        signer: Si,
275        options: ClientContextOptions,
276        default_chain: Option<ChainId>,
277        genesis_config: GenesisConfig,
278        block_cache_size: usize,
279        execution_state_cache_size: usize,
280    ) -> Result<Self, Error> {
281        #[cfg(not(web))]
282        let timing_config = options.to_timing_config();
283        let node_provider = NodeProvider::new(NodeOptions {
284            send_timeout: options.send_timeout,
285            recv_timeout: options.recv_timeout,
286            retry_delay: options.retry_delay,
287            max_retries: options.max_retries,
288        });
289        let chain_ids: Vec<_> = wallet
290            .chain_ids()
291            .try_collect()
292            .await
293            .map_err(error::Inner::wallet)?;
294        let name = match chain_ids.len() {
295            0 => "Client node".to_string(),
296            1 => format!("Client node for {:.8}", chain_ids[0]),
297            n => format!("Client node for {:.8} and {} others", chain_ids[0], n - 1),
298        };
299        let client = Client::new(
300            linera_core::environment::Impl {
301                network: node_provider,
302                storage,
303                signer,
304                wallet,
305            },
306            genesis_config.admin_id(),
307            options.long_lived_services,
308            chain_ids,
309            name,
310            options.chain_worker_ttl,
311            options.sender_chain_worker_ttl,
312            options.to_chain_client_options(),
313            block_cache_size,
314            execution_state_cache_size,
315            options.to_requests_scheduler_config(),
316        );
317
318        #[cfg(not(web))]
319        let client_metrics = if timing_config.enabled {
320            Some(ClientMetrics::new(timing_config))
321        } else {
322            None
323        };
324
325        Ok(ClientContext {
326            client: Arc::new(client),
327            default_chain,
328            genesis_config,
329            send_timeout: options.send_timeout,
330            recv_timeout: options.recv_timeout,
331            retry_delay: options.retry_delay,
332            max_retries: options.max_retries,
333            chain_listeners: JoinSet::default(),
334            #[cfg(not(web))]
335            client_metrics,
336        })
337    }
338}
339
340impl<Env: Environment> ClientContext<Env> {
341    // TODO(#5084) this (and other injected dependencies) should not be re-exposed by the
342    // client interface
343    /// Returns a reference to the wallet.
344    pub fn wallet(&self) -> &Env::Wallet {
345        self.client.wallet()
346    }
347
348    /// Returns the ID of the admin chain.
349    pub fn admin_chain(&self) -> ChainId {
350        self.client.admin_chain()
351    }
352
353    /// Retrieve the default account. Current this is the common account of the default
354    /// chain.
355    pub fn default_account(&self) -> Account {
356        Account::chain(self.default_chain())
357    }
358
359    /// Retrieve the default chain.
360    pub fn default_chain(&self) -> ChainId {
361        self.default_chain
362            .expect("default chain requested but none set")
363    }
364
365    pub async fn first_non_admin_chain(&self) -> Result<ChainId, Error> {
366        let admin_id = self.admin_chain();
367        std::pin::pin!(self
368            .wallet()
369            .chain_ids()
370            .try_filter(|chain_id| futures::future::ready(*chain_id != admin_id)))
371        .next()
372        .await
373        .expect("No non-admin chain specified in wallet with no non-admin chain")
374        .map_err(Error::wallet)
375    }
376
377    // TODO(#5084) this should match the `NodeProvider` from the `Environment`
378    pub fn make_node_provider(&self) -> NodeProvider {
379        NodeProvider::new(self.make_node_options())
380    }
381
382    fn make_node_options(&self) -> NodeOptions {
383        NodeOptions {
384            send_timeout: self.send_timeout,
385            recv_timeout: self.recv_timeout,
386            retry_delay: self.retry_delay,
387            max_retries: self.max_retries,
388        }
389    }
390
391    #[cfg(not(web))]
392    pub fn client_metrics(&self) -> Option<&ClientMetrics> {
393        self.client_metrics.as_ref()
394    }
395
396    pub async fn update_wallet_from_client<Env_: Environment>(
397        &self,
398        client: &ChainClient<Env_>,
399    ) -> Result<(), Error> {
400        let info = client.chain_info().await?;
401        let chain_id = info.chain_id;
402        let client_owner = client.preferred_owner();
403        let pending_proposal = client.pending_proposal().clone();
404        let new_chain: wallet::Chain = info.as_ref().into();
405        // Try to modify the existing entry, preserving `follow_only`.
406        let modified = self
407            .wallet()
408            .modify(chain_id, |chain| {
409                chain.block_hash = new_chain.block_hash;
410                chain.next_block_height = new_chain.next_block_height;
411                chain.timestamp = new_chain.timestamp;
412                chain.epoch = new_chain.epoch;
413                chain.owner = client_owner;
414                chain.pending_proposal = pending_proposal.clone();
415            })
416            .await
417            .map_err(error::Inner::wallet)?;
418        // If the chain didn't exist, insert a new entry.
419        if modified.is_none() {
420            self.wallet()
421                .insert(
422                    chain_id,
423                    wallet::Chain {
424                        pending_proposal,
425                        owner: client_owner,
426                        ..new_chain
427                    },
428                )
429                .await
430                .map_err(error::Inner::wallet)?;
431        }
432        Ok(())
433    }
434
435    /// Remembers the new chain and its owner (if any) in the wallet.
436    pub async fn update_wallet_for_new_chain(
437        &mut self,
438        chain_id: ChainId,
439        owner: Option<AccountOwner>,
440        timestamp: Timestamp,
441        epoch: Epoch,
442    ) -> Result<(), Error> {
443        let _ = self
444            .wallet()
445            .try_insert(
446                chain_id,
447                linera_core::wallet::Chain::new(owner, epoch, timestamp),
448            )
449            .await
450            .map_err(error::Inner::wallet)?;
451        Ok(())
452    }
453
454    /// Sets the `follow_only` flag for a chain in both the wallet and the in-memory client state.
455    pub async fn set_follow_only(&self, chain_id: ChainId, follow_only: bool) -> Result<(), Error> {
456        self.wallet()
457            .modify(chain_id, |chain| chain.follow_only = follow_only)
458            .await
459            .map_err(error::Inner::wallet)?
460            .ok_or_else(|| error::Inner::UnknownChainId(chain_id))?;
461        self.client.set_chain_follow_only(chain_id, follow_only);
462        Ok(())
463    }
464
465    pub async fn process_inbox(
466        &mut self,
467        chain_client: &ChainClient<Env>,
468    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
469        let mut certificates = Vec::new();
470        // Try processing the inbox optimistically without waiting for validator notifications.
471        let (new_certificates, maybe_timeout) = {
472            chain_client.synchronize_from_validators().await?;
473            let result = chain_client.process_inbox_without_prepare().await;
474            self.update_wallet_from_client(chain_client).await?;
475            result?
476        };
477        certificates.extend(new_certificates);
478        if maybe_timeout.is_none() {
479            return Ok(certificates);
480        }
481
482        // Start listening for notifications, so we learn about new rounds and blocks.
483        let (listener, _listen_handle, mut notification_stream) =
484            chain_client.listen(ListeningMode::FullChain).await?;
485        self.chain_listeners.spawn_task(listener);
486
487        loop {
488            let (new_certificates, maybe_timeout) = {
489                let result = chain_client.process_inbox().await;
490                self.update_wallet_from_client(chain_client).await?;
491                result?
492            };
493            certificates.extend(new_certificates);
494            if let Some(timestamp) = maybe_timeout {
495                util::wait_for_next_round(&mut notification_stream, timestamp).await
496            } else {
497                return Ok(certificates);
498            }
499        }
500    }
501
502    pub async fn assign_new_chain_to_key(
503        &mut self,
504        chain_id: ChainId,
505        owner: AccountOwner,
506    ) -> Result<(), Error> {
507        self.client.track_chain(chain_id);
508        let client = self.make_chain_client(chain_id).await?;
509        let chain_description = client.get_chain_description().await?;
510        let config = chain_description.config();
511
512        if !config.ownership.is_owner(&owner) {
513            tracing::error!(
514                "The chain with the ID returned by the faucet is not owned by you. \
515                Please make sure you are connecting to a genuine faucet."
516            );
517            return Err(error::Inner::ChainOwnership.into());
518        }
519
520        // Try to modify existing chain entry, setting the owner and disabling follow-only mode.
521        let timestamp = chain_description.timestamp();
522        let epoch = chain_description.config().epoch;
523        let modified = self
524            .wallet()
525            .modify(chain_id, |chain| {
526                chain.owner = Some(owner);
527                chain.follow_only = false;
528            })
529            .await
530            .map_err(error::Inner::wallet)?;
531        // If the chain didn't exist, insert a new entry.
532        if modified.is_none() {
533            self.wallet()
534                .insert(
535                    chain_id,
536                    wallet::Chain {
537                        owner: Some(owner),
538                        timestamp,
539                        epoch: Some(epoch),
540                        ..Default::default()
541                    },
542                )
543                .await
544                .map_err(error::Inner::wallet)
545                .context("assigning new chain")?;
546        }
547        Ok(())
548    }
549
550    /// Applies the given function to the chain client.
551    ///
552    /// Updates the wallet regardless of the outcome. As long as the function returns a round
553    /// timeout, it will wait and retry.
554    pub async fn apply_client_command<E, F, Fut, T>(
555        &mut self,
556        client: &ChainClient<Env>,
557        mut f: F,
558    ) -> Result<T, Error>
559    where
560        F: FnMut(&ChainClient<Env>) -> Fut,
561        Fut: Future<Output = Result<ClientOutcome<T>, E>>,
562        Error: From<E>,
563    {
564        client.prepare_chain().await?;
565        // Try applying f optimistically without validator notifications. Return if committed.
566        let result = f(client).await;
567        self.update_wallet_from_client(client).await?;
568        if let ClientOutcome::Committed(t) = result? {
569            return Ok(t);
570        }
571
572        // Start listening for notifications, so we learn about new rounds and blocks.
573        let (listener, _listen_handle, mut notification_stream) =
574            client.listen(ListeningMode::FullChain).await?;
575        self.chain_listeners.spawn_task(listener);
576
577        loop {
578            // Try applying f. Return if committed.
579            let result = f(client).await;
580            self.update_wallet_from_client(client).await?;
581            let timeout = match result? {
582                ClientOutcome::Committed(t) => return Ok(t),
583                ClientOutcome::WaitForTimeout(timeout) => timeout,
584            };
585            // Otherwise wait and try again in the next round.
586            util::wait_for_next_round(&mut notification_stream, timeout).await;
587        }
588    }
589
590    pub async fn ownership(&mut self, chain_id: Option<ChainId>) -> Result<ChainOwnership, Error> {
591        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
592        let client = self.make_chain_client(chain_id).await?;
593        let info = client.chain_info().await?;
594        Ok(info.manager.ownership)
595    }
596
597    pub async fn change_ownership(
598        &mut self,
599        chain_id: Option<ChainId>,
600        ownership_config: ChainOwnershipConfig,
601    ) -> Result<(), Error> {
602        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
603        let chain_client = self.make_chain_client(chain_id).await?;
604        info!(
605            ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
606            "Changing ownership of a chain"
607        );
608        let time_start = Instant::now();
609        let ownership = ChainOwnership::try_from(ownership_config)?;
610
611        let certificate = self
612            .apply_client_command(&chain_client, |chain_client| {
613                let ownership = ownership.clone();
614                let chain_client = chain_client.clone();
615                async move {
616                    chain_client
617                        .change_ownership(ownership)
618                        .await
619                        .map_err(Error::from)
620                        .context("Failed to change ownership")
621                }
622            })
623            .await?;
624        let time_total = time_start.elapsed();
625        info!("Operation confirmed after {} ms", time_total.as_millis());
626        debug!("{:?}", certificate);
627        Ok(())
628    }
629
630    pub async fn set_preferred_owner(
631        &mut self,
632        chain_id: Option<ChainId>,
633        preferred_owner: AccountOwner,
634    ) -> Result<(), Error> {
635        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
636        let mut chain_client = self.make_chain_client(chain_id).await?;
637        let old_owner = chain_client.preferred_owner();
638        info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
639        chain_client.set_preferred_owner(preferred_owner);
640        self.update_wallet_from_client(&chain_client).await?;
641        info!("New preferred owner set");
642        Ok(())
643    }
644
645    pub async fn check_compatible_version_info(
646        &self,
647        address: &str,
648        node: &impl ValidatorNode,
649    ) -> Result<VersionInfo, Error> {
650        match node.get_version_info().await {
651            Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
652                debug!(
653                    "Version information for validator {address}: {}",
654                    version_info
655                );
656                Ok(version_info)
657            }
658            Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
659                remote: Box::new(version_info),
660                local: Box::new(linera_version::VERSION_INFO.clone()),
661            }
662            .into()),
663            Err(error) => Err(error::Inner::UnavailableVersionInfo {
664                address: address.to_string(),
665                error: Box::new(error),
666            }
667            .into()),
668        }
669    }
670
671    pub async fn check_matching_network_description(
672        &self,
673        address: &str,
674        node: &impl ValidatorNode,
675    ) -> Result<CryptoHash, Error> {
676        let network_description = self.genesis_config.network_description();
677        match node.get_network_description().await {
678            Ok(description) => {
679                if description == network_description {
680                    Ok(description.genesis_config_hash)
681                } else {
682                    Err(error::Inner::UnexpectedNetworkDescription {
683                        remote: Box::new(description),
684                        local: Box::new(network_description),
685                    }
686                    .into())
687                }
688            }
689            Err(error) => Err(error::Inner::UnavailableNetworkDescription {
690                address: address.to_string(),
691                error: Box::new(error),
692            }
693            .into()),
694        }
695    }
696
697    pub async fn check_validator_chain_info_response(
698        &self,
699        public_key: Option<&ValidatorPublicKey>,
700        address: &str,
701        node: &impl ValidatorNode,
702        chain_id: ChainId,
703    ) -> Result<ChainInfo, Error> {
704        let query = ChainInfoQuery::new(chain_id).with_manager_values();
705        match node.handle_chain_info_query(query).await {
706            Ok(response) => {
707                debug!(
708                    "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
709                    response.info.next_block_height, response.info.epoch,
710                );
711                if let Some(public_key) = public_key {
712                    if response.check(*public_key).is_ok() {
713                        debug!("Signature for public key {public_key} is OK.");
714                    } else {
715                        return Err(error::Inner::InvalidSignature {
716                            public_key: *public_key,
717                        }
718                        .into());
719                    }
720                } else {
721                    warn!("Not checking signature as public key was not given");
722                }
723                Ok(*response.info)
724            }
725            Err(error) => Err(error::Inner::UnavailableChainInfo {
726                address: address.to_string(),
727                chain_id,
728                error: Box::new(error),
729            }
730            .into()),
731        }
732    }
733
734    /// Query a validator for version info, network description, and chain info.
735    ///
736    /// Returns a `ValidatorQueryResults` struct with the results of all three queries.
737    pub async fn query_validator(
738        &self,
739        address: &str,
740        node: &impl ValidatorNode,
741        chain_id: ChainId,
742        public_key: Option<&ValidatorPublicKey>,
743    ) -> ValidatorQueryResults {
744        let version_info = self.check_compatible_version_info(address, node).await;
745        let genesis_config_hash = self.check_matching_network_description(address, node).await;
746        let chain_info = self
747            .check_validator_chain_info_response(public_key, address, node, chain_id)
748            .await;
749
750        ValidatorQueryResults {
751            version_info,
752            genesis_config_hash,
753            chain_info,
754        }
755    }
756
757    /// Query the local node for version info, network description, and chain info.
758    ///
759    /// Returns a `ValidatorQueryResults` struct with the local node's information.
760    pub async fn query_local_node(
761        &self,
762        chain_id: ChainId,
763    ) -> Result<ValidatorQueryResults, Error> {
764        let version_info = Ok(linera_version::VERSION_INFO.clone());
765        let genesis_config_hash = Ok(self
766            .genesis_config
767            .network_description()
768            .genesis_config_hash);
769        let chain_info = self
770            .make_chain_client(chain_id)
771            .await?
772            .chain_info_with_manager_values()
773            .await
774            .map(|info| *info)
775            .map_err(|e| e.into());
776
777        Ok(ValidatorQueryResults {
778            version_info,
779            genesis_config_hash,
780            chain_info,
781        })
782    }
783}
784
785#[cfg(feature = "fs")]
786impl<Env: Environment> ClientContext<Env> {
787    pub async fn publish_module(
788        &mut self,
789        chain_client: &ChainClient<Env>,
790        contract: PathBuf,
791        service: PathBuf,
792        vm_runtime: VmRuntime,
793    ) -> Result<ModuleId, Error> {
794        info!("Loading bytecode files");
795        let contract_bytecode = Bytecode::load_from_file(&contract)
796            .with_context(|| format!("failed to load contract bytecode from {:?}", &contract))?;
797        let service_bytecode = Bytecode::load_from_file(&service)
798            .with_context(|| format!("failed to load service bytecode from {:?}", &service))?;
799
800        info!("Publishing module");
801        let (blobs, module_id) =
802            create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
803        let (module_id, _) = self
804            .apply_client_command(chain_client, |chain_client| {
805                let blobs = blobs.clone();
806                let chain_client = chain_client.clone();
807                async move {
808                    chain_client
809                        .publish_module_blobs(blobs, module_id)
810                        .await
811                        .context("Failed to publish module")
812                }
813            })
814            .await?;
815
816        info!("{}", "Module published successfully!");
817
818        info!("Synchronizing client and processing inbox");
819        self.process_inbox(chain_client).await?;
820        Ok(module_id)
821    }
822
823    pub async fn publish_data_blob(
824        &mut self,
825        chain_client: &ChainClient<Env>,
826        blob_path: PathBuf,
827    ) -> Result<CryptoHash, Error> {
828        info!("Loading data blob file");
829        let blob_bytes = fs::read(&blob_path).context(format!(
830            "failed to load data blob bytes from {:?}",
831            &blob_path
832        ))?;
833
834        info!("Publishing data blob");
835        self.apply_client_command(chain_client, |chain_client| {
836            let blob_bytes = blob_bytes.clone();
837            let chain_client = chain_client.clone();
838            async move {
839                chain_client
840                    .publish_data_blob(blob_bytes)
841                    .await
842                    .context("Failed to publish data blob")
843            }
844        })
845        .await?;
846
847        info!("{}", "Data blob published successfully!");
848        Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
849    }
850
851    // TODO(#2490): Consider removing or renaming this.
852    pub async fn read_data_blob(
853        &mut self,
854        chain_client: &ChainClient<Env>,
855        hash: CryptoHash,
856    ) -> Result<(), Error> {
857        info!("Verifying data blob");
858        self.apply_client_command(chain_client, |chain_client| {
859            let chain_client = chain_client.clone();
860            async move {
861                chain_client
862                    .read_data_blob(hash)
863                    .await
864                    .context("Failed to verify data blob")
865            }
866        })
867        .await?;
868
869        info!("{}", "Data blob verified successfully!");
870        Ok(())
871    }
872}
873
874#[cfg(not(web))]
875impl<Env: Environment> ClientContext<Env> {
876    pub async fn prepare_for_benchmark(
877        &mut self,
878        num_chains: usize,
879        tokens_per_chain: Amount,
880        fungible_application_id: Option<ApplicationId>,
881        pub_keys: Vec<AccountPublicKey>,
882        chains_config_path: Option<&Path>,
883    ) -> Result<(Vec<ChainClient<Env>>, Vec<ChainId>), Error> {
884        let start = Instant::now();
885        // Below all block proposals are supposed to succeed without retries, we
886        // must make sure that all incoming payments have been accepted on-chain
887        // and that no validator is missing user certificates.
888        self.process_inboxes_and_force_validator_updates().await;
889        info!(
890            "Processed inboxes and forced validator updates in {} ms",
891            start.elapsed().as_millis()
892        );
893
894        let start = Instant::now();
895        let (benchmark_chains, chain_clients) = self
896            .make_benchmark_chains(
897                num_chains,
898                tokens_per_chain,
899                pub_keys,
900                chains_config_path.is_some(),
901            )
902            .await?;
903        info!(
904            "Got {} chains in {} ms",
905            num_chains,
906            start.elapsed().as_millis()
907        );
908
909        if let Some(id) = fungible_application_id {
910            let start = Instant::now();
911            self.supply_fungible_tokens(&benchmark_chains, id).await?;
912            info!(
913                "Supplied fungible tokens in {} ms",
914                start.elapsed().as_millis()
915            );
916            // Need to process inboxes to make sure the chains receive the supplied tokens.
917            let start = Instant::now();
918            for chain_client in &chain_clients {
919                chain_client.process_inbox().await?;
920            }
921            info!(
922                "Processed inboxes after supplying fungible tokens in {} ms",
923                start.elapsed().as_millis()
924            );
925        }
926
927        let all_chains = Benchmark::<Env>::get_all_chains(chains_config_path, &benchmark_chains)?;
928        let known_chain_ids: HashSet<_> = benchmark_chains.iter().map(|(id, _)| *id).collect();
929        let unknown_chain_ids: Vec<_> = all_chains
930            .iter()
931            .filter(|id| !known_chain_ids.contains(id))
932            .copied()
933            .collect();
934        if !unknown_chain_ids.is_empty() {
935            // The current client won't have the blobs for the chains in the other wallets. Even
936            // though it will eventually get those blobs, we're getting a head start here and
937            // fetching those blobs in advance.
938            for chain_id in &unknown_chain_ids {
939                self.client.get_chain_description(*chain_id).await?;
940            }
941        }
942
943        Ok((chain_clients, all_chains))
944    }
945
946    pub async fn wrap_up_benchmark(
947        &mut self,
948        chain_clients: Vec<ChainClient<Env>>,
949        close_chains: bool,
950        wrap_up_max_in_flight: usize,
951    ) -> Result<(), Error> {
952        if close_chains {
953            info!("Closing chains...");
954            let stream = stream::iter(chain_clients)
955                .map(|chain_client| async move {
956                    Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
957                    info!("Closed chain {:?}", chain_client.chain_id());
958                    Ok::<(), BenchmarkError>(())
959                })
960                .buffer_unordered(wrap_up_max_in_flight);
961            stream.try_collect::<Vec<_>>().await?;
962        } else {
963            info!("Processing inbox for all chains...");
964            let stream = stream::iter(chain_clients.clone())
965                .map(|chain_client| async move {
966                    chain_client.process_inbox().await?;
967                    info!("Processed inbox for chain {:?}", chain_client.chain_id());
968                    Ok::<(), chain_client::Error>(())
969                })
970                .buffer_unordered(wrap_up_max_in_flight);
971            stream.try_collect::<Vec<_>>().await?;
972
973            info!("Updating wallet from chain clients...");
974            for chain_client in chain_clients {
975                let info = chain_client.chain_info().await?;
976                let client_owner = chain_client.preferred_owner();
977                let pending_proposal = chain_client.pending_proposal().clone();
978                self.wallet()
979                    .insert(
980                        info.chain_id,
981                        wallet::Chain {
982                            pending_proposal,
983                            owner: client_owner,
984                            ..info.as_ref().into()
985                        },
986                    )
987                    .await
988                    .map_err(error::Inner::wallet)?;
989            }
990        }
991
992        Ok(())
993    }
994
995    async fn process_inboxes_and_force_validator_updates(&mut self) {
996        let mut join_set = task::JoinSet::new();
997
998        let chain_clients: Vec<_> = self
999            .wallet()
1000            .owned_chain_ids()
1001            .map_err(|e| error::Inner::wallet(e).into())
1002            .and_then(|id| self.make_chain_client(id))
1003            .try_collect()
1004            .await
1005            .unwrap();
1006
1007        for chain_client in chain_clients {
1008            join_set.spawn(async move {
1009                Self::process_inbox_without_updating_wallet(&chain_client)
1010                    .await
1011                    .expect("Processing inbox should not fail!");
1012                chain_client
1013            });
1014        }
1015
1016        for chain_client in join_set.join_all().await {
1017            self.update_wallet_from_client(&chain_client).await.unwrap();
1018        }
1019    }
1020
1021    async fn process_inbox_without_updating_wallet(
1022        chain_client: &ChainClient<Env>,
1023    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
1024        // Try processing the inbox optimistically without waiting for validator notifications.
1025        chain_client.synchronize_from_validators().await?;
1026        let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
1027        assert!(
1028            maybe_timeout.is_none(),
1029            "Should not timeout within benchmark!"
1030        );
1031
1032        Ok(certificates)
1033    }
1034
1035    /// Creates chains if necessary, and returns a map of exactly `num_chains` chain IDs
1036    /// with key pairs, as well as a map of the chain clients.
1037    async fn make_benchmark_chains(
1038        &mut self,
1039        num_chains: usize,
1040        balance: Amount,
1041        pub_keys: Vec<AccountPublicKey>,
1042        wallet_only: bool,
1043    ) -> Result<(Vec<(ChainId, AccountOwner)>, Vec<ChainClient<Env>>), Error> {
1044        let mut chains_found_in_wallet = 0;
1045        let mut benchmark_chains = Vec::with_capacity(num_chains);
1046        let mut chain_clients = Vec::with_capacity(num_chains);
1047        let start = Instant::now();
1048        let mut owned_chain_ids = std::pin::pin!(self.wallet().owned_chain_ids());
1049        while let Some(chain_id) = owned_chain_ids.next().await {
1050            let chain_id = chain_id.map_err(error::Inner::wallet)?;
1051            if chains_found_in_wallet == num_chains {
1052                break;
1053            }
1054            let chain_client = self.make_chain_client(chain_id).await?;
1055            let ownership = chain_client.chain_info().await?.manager.ownership;
1056            if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
1057                continue;
1058            }
1059            chain_client.process_inbox().await?;
1060            benchmark_chains.push((
1061                chain_id,
1062                *ownership
1063                    .super_owners
1064                    .first()
1065                    .expect("should have a super owner"),
1066            ));
1067            chain_clients.push(chain_client);
1068            chains_found_in_wallet += 1;
1069        }
1070        info!(
1071            "Got {} chains from the wallet in {} ms",
1072            benchmark_chains.len(),
1073            start.elapsed().as_millis()
1074        );
1075
1076        let num_chains_to_create = num_chains - chains_found_in_wallet;
1077
1078        let default_chain_client = self.make_chain_client(self.default_chain()).await?;
1079
1080        if num_chains_to_create > 0 {
1081            if wallet_only {
1082                return Err(
1083                    error::Inner::Benchmark(BenchmarkError::NotEnoughChainsInWallet(
1084                        num_chains,
1085                        chains_found_in_wallet,
1086                    ))
1087                    .into(),
1088                );
1089            }
1090            let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
1091            let operations_per_block = 900; // Over this we seem to hit the block size limits.
1092            for i in (0..num_chains_to_create).step_by(operations_per_block) {
1093                let num_new_chains = operations_per_block.min(num_chains_to_create - i);
1094                let pub_key = pub_keys_iter.next().unwrap();
1095                let owner = pub_key.into();
1096
1097                let certificate = Self::execute_open_chains_operations(
1098                    num_new_chains,
1099                    &default_chain_client,
1100                    balance,
1101                    owner,
1102                )
1103                .await?;
1104                info!("Block executed successfully");
1105
1106                let block = certificate.block();
1107                for i in 0..num_new_chains {
1108                    let chain_id = block.body.blobs[i]
1109                        .iter()
1110                        .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
1111                        .map(|blob| ChainId(blob.id().hash))
1112                        .expect("failed to create a new chain");
1113                    self.client.track_chain(chain_id);
1114
1115                    let mut chain_client = self.client.create_chain_client(
1116                        chain_id,
1117                        None,
1118                        BlockHeight::ZERO,
1119                        None,
1120                        Some(owner),
1121                        self.timing_sender(),
1122                        false,
1123                    );
1124                    chain_client.set_preferred_owner(owner);
1125                    chain_client.process_inbox().await?;
1126                    benchmark_chains.push((chain_id, owner));
1127                    chain_clients.push(chain_client);
1128                }
1129            }
1130
1131            info!(
1132                "Created {} chains in {} ms",
1133                num_chains_to_create,
1134                start.elapsed().as_millis()
1135            );
1136        }
1137
1138        info!("Updating wallet from client");
1139        self.update_wallet_from_client(&default_chain_client)
1140            .await?;
1141        info!("Retrying pending outgoing messages");
1142        default_chain_client
1143            .retry_pending_outgoing_messages()
1144            .await
1145            .context("outgoing messages to create the new chains should be delivered")?;
1146        info!("Processing default chain inbox");
1147        default_chain_client.process_inbox().await?;
1148
1149        assert_eq!(
1150            benchmark_chains.len(),
1151            chain_clients.len(),
1152            "benchmark_chains and chain_clients must have the same size"
1153        );
1154
1155        Ok((benchmark_chains, chain_clients))
1156    }
1157
1158    async fn execute_open_chains_operations(
1159        num_new_chains: usize,
1160        chain_client: &ChainClient<Env>,
1161        balance: Amount,
1162        owner: AccountOwner,
1163    ) -> Result<ConfirmedBlockCertificate, Error> {
1164        let config = OpenChainConfig {
1165            ownership: ChainOwnership::single_super(owner),
1166            balance,
1167            application_permissions: Default::default(),
1168        };
1169        let operations = iter::repeat_n(
1170            Operation::system(SystemOperation::OpenChain(config)),
1171            num_new_chains,
1172        )
1173        .collect();
1174        info!("Executing {} OpenChain operations", num_new_chains);
1175        Ok(chain_client
1176            .execute_operations(operations, vec![])
1177            .await?
1178            .expect("should execute block with OpenChain operations"))
1179    }
1180
1181    /// Supplies fungible tokens to the chains.
1182    async fn supply_fungible_tokens(
1183        &mut self,
1184        key_pairs: &[(ChainId, AccountOwner)],
1185        application_id: ApplicationId,
1186    ) -> Result<(), Error> {
1187        let default_chain_id = self.default_chain();
1188        let default_key = self
1189            .wallet()
1190            .get(default_chain_id)
1191            .await
1192            .unwrap()
1193            .unwrap()
1194            .owner
1195            .unwrap();
1196        // This should be enough to run the benchmark at 1M TPS for an hour.
1197        let amount = Amount::from_nanos(4);
1198        let operations: Vec<Operation> = key_pairs
1199            .iter()
1200            .map(|(chain_id, owner)| {
1201                Benchmark::<Env>::fungible_transfer(
1202                    application_id,
1203                    *chain_id,
1204                    default_key,
1205                    *owner,
1206                    amount,
1207                )
1208            })
1209            .collect();
1210        let chain_client = self.make_chain_client(default_chain_id).await?;
1211        // Put at most 1000 fungible token operations in each block.
1212        for operation_chunk in operations.chunks(1000) {
1213            chain_client
1214                .execute_operations(operation_chunk.to_vec(), vec![])
1215                .await?
1216                .expect("should execute block with Transfer operations");
1217        }
1218        self.update_wallet_from_client(&chain_client).await?;
1219
1220        Ok(())
1221    }
1222}