linera_client/
client_context.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6#[cfg(not(web))]
7use futures::StreamExt as _;
8use futures::{Future, TryStreamExt as _};
9use linera_base::{
10    crypto::{CryptoHash, ValidatorPublicKey},
11    data_types::{ChainDescription, Epoch, Timestamp},
12    identifiers::{Account, AccountOwner, ChainId},
13    ownership::ChainOwnership,
14    time::{Duration, Instant},
15    util::future::FutureSyncExt as _,
16};
17use linera_chain::{manager::LockingBlock, types::ConfirmedBlockCertificate};
18use linera_core::{
19    client::{chain_client, ChainClient, Client, ListeningMode},
20    data_types::{ChainInfo, ChainInfoQuery, ClientOutcome},
21    join_set_ext::JoinSet,
22    node::ValidatorNode,
23    wallet, Environment, JoinSetExt as _, Wallet as _,
24};
25use linera_rpc::node_provider::{NodeOptions, NodeProvider};
26use linera_storage::Storage as _;
27use linera_version::VersionInfo;
28use thiserror_context::Context;
29use tracing::{debug, info, warn};
30#[cfg(not(web))]
31use {
32    crate::{
33        benchmark::{fungible_transfer, Benchmark, BenchmarkError},
34        client_metrics::ClientMetrics,
35    },
36    futures::stream,
37    linera_base::{
38        crypto::AccountPublicKey,
39        data_types::{Amount, BlockHeight},
40        identifiers::{ApplicationId, BlobType},
41    },
42    linera_execution::{
43        system::{OpenChainConfig, SystemOperation},
44        Operation,
45    },
46    std::{collections::HashSet, path::Path},
47    tokio::{sync::mpsc, task},
48};
49#[cfg(feature = "fs")]
50use {
51    linera_base::{
52        data_types::{BlobContent, Bytecode},
53        identifiers::ModuleId,
54        vm::VmRuntime,
55    },
56    linera_core::client::create_bytecode_blobs,
57    std::{fs, path::PathBuf},
58};
59
60use crate::{
61    chain_listener::{self, ClientContext as _},
62    client_options::{ChainOwnershipConfig, Options},
63    config::GenesisConfig,
64    error, util, Error,
65};
66
/// Results from querying a validator about version, network description, and chain info.
///
/// Each field stores the outcome of one query as its own `Result`, so a failed query
/// does not hide the results of the others.
pub struct ValidatorQueryResults {
    /// The validator's version information.
    pub version_info: Result<VersionInfo, Error>,
    /// The validator's genesis config hash.
    pub genesis_config_hash: Result<CryptoHash, Error>,
    /// The validator's chain info (if valid and signature check passed).
    pub chain_info: Result<ChainInfo, Error>,
}
76
77impl ValidatorQueryResults {
78    /// Returns a vector of references to all errors in the query results.
79    pub fn errors(&self) -> Vec<&Error> {
80        let mut errors = Vec::new();
81        if let Err(e) = &self.version_info {
82            errors.push(e);
83        }
84        if let Err(e) = &self.genesis_config_hash {
85            errors.push(e);
86        }
87        if let Err(e) = &self.chain_info {
88            errors.push(e);
89        }
90        errors
91    }
92
93    /// Prints validator information to stdout.
94    ///
95    /// Prints public key, address, and optionally weight, version info, and chain info.
96    /// If `reference` is provided, only prints fields that differ from the reference.
97    pub fn print(
98        &self,
99        public_key: Option<&ValidatorPublicKey>,
100        address: Option<&str>,
101        weight: Option<u64>,
102        reference: Option<&ValidatorQueryResults>,
103    ) {
104        if let Some(key) = public_key {
105            println!("Public key: {key}");
106        }
107        if let Some(address) = address {
108            println!("Address: {address}");
109        }
110        if let Some(w) = weight {
111            println!("Weight: {w}");
112        }
113
114        let ref_version = reference.and_then(|ref_results| ref_results.version_info.as_ref().ok());
115        match &self.version_info {
116            Ok(version_info) => {
117                if ref_version.is_none_or(|ref_v| ref_v.crate_version != version_info.crate_version)
118                {
119                    println!("Linera protocol: v{}", version_info.crate_version);
120                }
121                if ref_version.is_none_or(|ref_v| ref_v.rpc_hash != version_info.rpc_hash) {
122                    println!("RPC API hash: {}", version_info.rpc_hash);
123                }
124                if ref_version.is_none_or(|ref_v| ref_v.graphql_hash != version_info.graphql_hash) {
125                    println!("GraphQL API hash: {}", version_info.graphql_hash);
126                }
127                if ref_version.is_none_or(|ref_v| ref_v.wit_hash != version_info.wit_hash) {
128                    println!("WIT API hash: {}", version_info.wit_hash);
129                }
130                if ref_version.is_none_or(|ref_v| {
131                    (&ref_v.git_commit, ref_v.git_dirty)
132                        != (&version_info.git_commit, version_info.git_dirty)
133                }) {
134                    println!(
135                        "Source code: {}/tree/{}{}",
136                        env!("CARGO_PKG_REPOSITORY"),
137                        version_info.git_commit,
138                        if version_info.git_dirty {
139                            " (dirty)"
140                        } else {
141                            ""
142                        }
143                    );
144                }
145            }
146            Err(err) => println!("Error getting version info: {err}"),
147        }
148
149        let ref_genesis_hash =
150            reference.and_then(|ref_results| ref_results.genesis_config_hash.as_ref().ok());
151        match &self.genesis_config_hash {
152            Ok(hash) if ref_genesis_hash.is_some_and(|ref_hash| ref_hash == hash) => {}
153            Ok(hash) => println!("Genesis config hash: {hash}"),
154            Err(err) => println!("Error getting genesis config: {err}"),
155        }
156
157        let ref_info = reference.and_then(|ref_results| ref_results.chain_info.as_ref().ok());
158        match &self.chain_info {
159            Ok(info) => {
160                if ref_info.is_none_or(|ref_info| info.block_hash != ref_info.block_hash) {
161                    if let Some(hash) = info.block_hash {
162                        println!("Block hash: {hash}");
163                    } else {
164                        println!("Block hash: None");
165                    }
166                }
167                if ref_info
168                    .is_none_or(|ref_info| info.next_block_height != ref_info.next_block_height)
169                {
170                    println!("Next height: {}", info.next_block_height);
171                }
172                if ref_info.is_none_or(|ref_info| info.timestamp != ref_info.timestamp) {
173                    println!("Timestamp: {}", info.timestamp);
174                }
175                if ref_info.is_none_or(|ref_info| info.epoch != ref_info.epoch) {
176                    println!("Epoch: {}", info.epoch);
177                }
178                if ref_info.is_none_or(|ref_info| {
179                    info.manager.current_round != ref_info.manager.current_round
180                }) {
181                    println!("Round: {}", info.manager.current_round);
182                }
183                if let Some(locking) = &info.manager.requested_locking {
184                    match &**locking {
185                        LockingBlock::Fast(proposal) => {
186                            println!(
187                                "Locking fast block from {}",
188                                proposal.content.block.timestamp
189                            );
190                        }
191                        LockingBlock::Regular(validated) => {
192                            println!(
193                                "Locking block {} in {} from {}",
194                                validated.hash(),
195                                validated.round,
196                                validated.block().header.timestamp
197                            );
198                        }
199                    }
200                }
201            }
202            Err(err) => println!("Error getting chain info: {err}"),
203        }
204        println!();
205    }
206}
207
/// Bundles the shared [`Client`] with the network settings and bookkeeping used by the
/// methods of this type.
pub struct ClientContext<Env: Environment> {
    /// The underlying client, shared behind an `Arc`.
    pub client: Arc<Client<Env>>,
    // TODO(#5083): this doesn't really need to be stored
    /// The genesis configuration; used to obtain the local network description.
    pub genesis_config: crate::config::GenesisConfig,
    /// Send timeout placed into the `NodeOptions` built by `make_node_options`.
    pub send_timeout: Duration,
    /// Receive timeout placed into the `NodeOptions` built by `make_node_options`.
    pub recv_timeout: Duration,
    /// Retry delay placed into the `NodeOptions` built by `make_node_options`.
    pub retry_delay: Duration,
    /// Maximum number of retries placed into the `NodeOptions` built by
    /// `make_node_options`.
    pub max_retries: u32,
    /// Maximum backoff placed into the `NodeOptions` built by `make_node_options`.
    pub max_backoff: Duration,
    /// The listener tasks spawned by `process_inbox` and `apply_client_command`.
    pub chain_listeners: JoinSet,
    // TODO(#5082): move this into the upstream UI layers (maybe just the CLI)
    /// The chain used when a command does not name one explicitly, if any is set.
    pub default_chain: Option<ChainId>,
    /// Client metrics; `new` only creates them when timing is enabled in the options.
    #[cfg(not(web))]
    pub client_metrics: Option<ClientMetrics>,
}
223
/// Makes the context usable by the chain listener by delegating to the underlying
/// client and to the inherent wallet-update methods of `ClientContext`.
impl<Env: Environment> chain_listener::ClientContext for ClientContext<Env> {
    type Environment = Env;

    /// Returns the wallet held by the underlying client.
    fn wallet(&self) -> &Env::Wallet {
        self.client.wallet()
    }

    /// Returns the storage held by the underlying client.
    fn storage(&self) -> &Env::Storage {
        self.client.storage_client()
    }

    /// Returns the shared client.
    fn client(&self) -> &Arc<Client<Env>> {
        &self.client
    }

    /// Returns a clone of the metrics' timing sender, or `None` if this context has no
    /// client metrics.
    #[cfg(not(web))]
    fn timing_sender(
        &self,
    ) -> Option<mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
        self.client_metrics
            .as_ref()
            .map(|metrics| metrics.timing_sender.clone())
    }

    /// Records a new chain and its owner (if any) in the wallet.
    async fn update_wallet_for_new_chain(
        &mut self,
        chain_id: ChainId,
        owner: Option<AccountOwner>,
        timestamp: Timestamp,
        epoch: Epoch,
    ) -> Result<(), Error> {
        // Inherent methods take precedence over trait methods in method resolution, so
        // this calls the inherent `ClientContext::update_wallet_for_new_chain` and does
        // not recurse.
        // NOTE(review): `make_sync` presumably wraps the future so that it is `Sync` —
        // confirm against `FutureSyncExt`.
        self.update_wallet_for_new_chain(chain_id, owner, timestamp, epoch)
            .make_sync()
            .await
    }

    /// Updates the wallet entry of the chain tracked by `client`, by delegating to the
    /// inherent `update_wallet_from_client`.
    async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
        self.update_wallet_from_client(client).make_sync().await
    }
}
264
265impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si, W>>
266where
267    S: linera_core::environment::Storage,
268    Si: linera_core::environment::Signer,
269    W: linera_core::environment::Wallet,
270{
271    // not worth refactoring this because
272    // https://github.com/linera-io/linera-protocol/issues/5082
273    // https://github.com/linera-io/linera-protocol/issues/5083
274    #[expect(clippy::too_many_arguments)]
275    pub async fn new(
276        storage: S,
277        wallet: W,
278        signer: Si,
279        options: &Options,
280        default_chain: Option<ChainId>,
281        genesis_config: GenesisConfig,
282        block_cache_size: usize,
283        execution_state_cache_size: usize,
284    ) -> Result<Self, Error> {
285        #[cfg(not(web))]
286        let timing_config = options.to_timing_config();
287        let node_provider = NodeProvider::new(NodeOptions {
288            send_timeout: options.send_timeout,
289            recv_timeout: options.recv_timeout,
290            retry_delay: options.retry_delay,
291            max_retries: options.max_retries,
292            max_backoff: options.max_backoff,
293        });
294        let chain_modes: Vec<_> = wallet
295            .items()
296            .map_ok(|(id, chain)| {
297                let mode = if chain.is_follow_only() {
298                    ListeningMode::FollowChain
299                } else {
300                    ListeningMode::FullChain
301                };
302                (id, mode)
303            })
304            .try_collect()
305            .await
306            .map_err(error::Inner::wallet)?;
307        let name = match chain_modes.len() {
308            0 => "Client node".to_string(),
309            1 => format!("Client node for {:.8}", chain_modes[0].0),
310            n => format!(
311                "Client node for {:.8} and {} others",
312                chain_modes[0].0,
313                n - 1
314            ),
315        };
316
317        let client = Client::new(
318            linera_core::environment::Impl {
319                network: node_provider,
320                storage,
321                signer,
322                wallet,
323            },
324            genesis_config.admin_chain_id(),
325            options.long_lived_services,
326            chain_modes,
327            name,
328            util::non_zero_duration(options.chain_worker_ttl),
329            util::non_zero_duration(options.sender_chain_worker_ttl),
330            options.cross_chain_batch_size_limit,
331            options.to_chain_client_options(),
332            block_cache_size,
333            execution_state_cache_size,
334            &options.to_requests_scheduler_config(),
335        );
336
337        #[cfg(not(web))]
338        let client_metrics = if timing_config.enabled {
339            Some(ClientMetrics::new(timing_config))
340        } else {
341            None
342        };
343
344        Ok(ClientContext {
345            client: Arc::new(client),
346            default_chain,
347            genesis_config,
348            send_timeout: options.send_timeout,
349            recv_timeout: options.recv_timeout,
350            retry_delay: options.retry_delay,
351            max_retries: options.max_retries,
352            max_backoff: options.max_backoff,
353            chain_listeners: JoinSet::default(),
354            #[cfg(not(web))]
355            client_metrics,
356        })
357    }
358}
359
360impl<Env: Environment> ClientContext<Env> {
    // TODO(#5084) this (and other injected dependencies) should not be re-exposed by the
    // client interface
    /// Returns a reference to the wallet held by the underlying client.
    pub fn wallet(&self) -> &Env::Wallet {
        self.client.wallet()
    }
367
    /// Returns the ID of the admin chain, as reported by the underlying client.
    pub fn admin_chain_id(&self) -> ChainId {
        self.client.admin_chain_id()
    }
372
    /// Retrieves the default account. Currently this is the common account of the
    /// default chain.
    ///
    /// # Panics
    ///
    /// Panics if no default chain is set, because it calls `default_chain`.
    pub fn default_account(&self) -> Account {
        Account::chain(self.default_chain())
    }
378
    /// Retrieves the default chain.
    ///
    /// # Panics
    ///
    /// Panics if no default chain is set.
    pub fn default_chain(&self) -> ChainId {
        self.default_chain
            .expect("default chain requested but none set")
    }
384
385    pub async fn first_non_admin_chain(&self) -> Result<ChainId, Error> {
386        let admin_chain_id = self.admin_chain_id();
387        let chain_ids = self
388            .wallet()
389            .chain_ids()
390            .try_filter(|chain_id| futures::future::ready(*chain_id != admin_chain_id))
391            .try_collect::<Vec<ChainId>>()
392            .await
393            .map_err(Error::wallet)?;
394        Ok(chain_ids
395            .into_iter()
396            .min()
397            .expect("No non-admin chain specified in wallet with no non-admin chain"))
398    }
399
    // TODO(#5084) this should match the `NodeProvider` from the `Environment`
    /// Builds a new `NodeProvider` from the network settings stored in this context.
    pub fn make_node_provider(&self) -> NodeProvider {
        NodeProvider::new(self.make_node_options())
    }
404
    /// Assembles `NodeOptions` from the timeout, retry and backoff settings stored in
    /// this context.
    fn make_node_options(&self) -> NodeOptions {
        NodeOptions {
            send_timeout: self.send_timeout,
            recv_timeout: self.recv_timeout,
            retry_delay: self.retry_delay,
            max_retries: self.max_retries,
            max_backoff: self.max_backoff,
        }
    }
414
    /// Returns the client metrics, or `None` if this context has none.
    #[cfg(not(web))]
    pub fn client_metrics(&self) -> Option<&ClientMetrics> {
        self.client_metrics.as_ref()
    }
419
420    pub async fn update_wallet_from_client<Env_: Environment>(
421        &self,
422        client: &ChainClient<Env_>,
423    ) -> Result<(), Error> {
424        let info = client.chain_info().await?;
425        let chain_id = info.chain_id;
426        let existing_owner = self
427            .wallet()
428            .get(chain_id)
429            .await
430            .map_err(error::Inner::wallet)?
431            .and_then(|chain| chain.owner);
432
433        // Only persist proposals that were made in the fast round: they need to be
434        // remembered across sessions to make sure there are no conflicting fast proposals.
435        let pending_fast_proposal = client
436            .pending_proposal()
437            .await
438            .filter(|p| p.round.is_some_and(|r| r.is_fast()));
439        let new_chain = wallet::Chain {
440            pending_fast_proposal,
441            owner: existing_owner,
442            ..info.as_ref().into()
443        };
444
445        self.wallet()
446            .insert(chain_id, new_chain)
447            .await
448            .map_err(error::Inner::wallet)?;
449
450        Ok(())
451    }
452
453    /// Remembers the new chain and its owner (if any) in the wallet.
454    pub async fn update_wallet_for_new_chain(
455        &mut self,
456        chain_id: ChainId,
457        owner: Option<AccountOwner>,
458        timestamp: Timestamp,
459        epoch: Epoch,
460    ) -> Result<(), Error> {
461        self.wallet()
462            .try_insert(
463                chain_id,
464                linera_core::wallet::Chain::new(owner, epoch, timestamp),
465            )
466            .await
467            .map_err(error::Inner::wallet)?;
468        Ok(())
469    }
470
471    /// Registers a chain from its description: initializes local storage, adds to
472    /// wallet, and starts tracking it for cross-chain message delivery.
473    pub async fn extend_with_chain(
474        &mut self,
475        description: ChainDescription,
476        owner: Option<AccountOwner>,
477    ) -> Result<(), Error> {
478        let chain_id = description.id();
479        self.client
480            .storage_client()
481            .create_chain(description.clone())
482            .await?;
483        self.wallet()
484            .try_insert(
485                chain_id,
486                linera_core::wallet::Chain::new(
487                    owner,
488                    description.config().epoch,
489                    description.timestamp(),
490                ),
491            )
492            .await
493            .map_err(error::Inner::wallet)?;
494        self.client
495            .extend_chain_mode(chain_id, ListeningMode::FullChain);
496        Ok(())
497    }
498
499    pub async fn process_inbox(
500        &mut self,
501        chain_client: &ChainClient<Env>,
502    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
503        let mut certificates = Vec::new();
504        // Try processing the inbox optimistically without waiting for validator notifications.
505        let (new_certificates, maybe_timeout) = {
506            chain_client.synchronize_from_validators().await?;
507            let result = chain_client.process_inbox_without_prepare().await;
508            self.update_wallet_from_client(chain_client).await?;
509            result?
510        };
511        certificates.extend(new_certificates);
512        if maybe_timeout.is_none() {
513            return Ok(certificates);
514        }
515
516        // Start listening for notifications, so we learn about new rounds and blocks.
517        let (listener, _listen_handle, mut notification_stream) = chain_client.listen().await?;
518        self.chain_listeners.spawn_task(listener);
519
520        loop {
521            let (new_certificates, maybe_timeout) = {
522                let result = chain_client.process_inbox().await;
523                self.update_wallet_from_client(chain_client).await?;
524                result?
525            };
526            certificates.extend(new_certificates);
527            if let Some(timestamp) = maybe_timeout {
528                util::wait_for_next_round(&mut notification_stream, timestamp).await
529            } else {
530                return Ok(certificates);
531            }
532        }
533    }
534
535    pub async fn assign_new_chain_to_key(
536        &mut self,
537        chain_id: ChainId,
538        owner: AccountOwner,
539    ) -> Result<(), Error> {
540        self.client
541            .extend_chain_mode(chain_id, ListeningMode::FullChain);
542        let client = self.make_chain_client(chain_id).await?;
543        let info = client.prepare_for_owner(owner).await.map_err(|error| {
544            tracing::error!(%chain_id, %owner, %error, "Chain is not owned");
545            error::Inner::ChainOwnership
546        })?;
547
548        // Try to modify existing chain entry, setting the owner.
549        let modified = self
550            .wallet()
551            .modify(chain_id, |chain| chain.owner = Some(owner))
552            .await
553            .map_err(error::Inner::wallet)?;
554        // If the chain didn't exist, insert a new entry.
555        if modified.is_none() {
556            self.wallet()
557                .insert(
558                    chain_id,
559                    wallet::Chain {
560                        owner: Some(owner),
561                        timestamp: info.timestamp,
562                        epoch: Some(info.epoch),
563                        ..Default::default()
564                    },
565                )
566                .await
567                .map_err(error::Inner::wallet)
568                .context("assigning new chain")?;
569        }
570        Ok(())
571    }
572
    /// Applies the given function to the chain client.
    ///
    /// Updates the wallet regardless of the outcome. As long as the function returns a round
    /// timeout, it will wait and retry.
    ///
    /// `f` may be called several times: once optimistically and then once per round
    /// until it reports `Committed` or `Conflict`.
    ///
    /// # Errors
    ///
    /// Fails if preparing the chain, updating the wallet or starting the listener
    /// fails, if `f` returns an error, or with `chain_client::Error::Conflict` if `f`
    /// reports a conflicting certificate.
    pub async fn apply_client_command<E, F, Fut, T>(
        &mut self,
        client: &ChainClient<Env>,
        mut f: F,
    ) -> Result<T, Error>
    where
        F: FnMut(&ChainClient<Env>) -> Fut,
        Fut: Future<Output = Result<ClientOutcome<T>, E>>,
        Error: From<E>,
    {
        client.prepare_chain().await?;
        // Try applying f optimistically without validator notifications. Return if committed.
        let result = f(client).await;
        // The wallet is updated before `result` is inspected, so it is refreshed even
        // if `f` failed.
        self.update_wallet_from_client(client).await?;
        match result? {
            ClientOutcome::Committed(t) => return Ok(t),
            ClientOutcome::Conflict(certificate) => {
                return Err(chain_client::Error::Conflict(certificate.hash()).into());
            }
            ClientOutcome::WaitForTimeout(_) => {}
        }

        // Start listening for notifications, so we learn about new rounds and blocks.
        // NOTE(review): `_listen_handle` is presumably kept bound so that listening is
        // not cancelled before this function returns — confirm against `listen`.
        let (listener, _listen_handle, mut notification_stream) = client.listen().await?;
        self.chain_listeners.spawn_task(listener);

        loop {
            // Try applying f. Return if committed.
            let result = f(client).await;
            self.update_wallet_from_client(client).await?;
            let timeout = match result? {
                ClientOutcome::Committed(t) => return Ok(t),
                ClientOutcome::Conflict(certificate) => {
                    return Err(chain_client::Error::Conflict(certificate.hash()).into());
                }
                ClientOutcome::WaitForTimeout(timeout) => timeout,
            };
            // Otherwise wait and try again in the next round.
            util::wait_for_next_round(&mut notification_stream, timeout).await;
        }
    }
618
619    pub async fn ownership(&mut self, chain_id: Option<ChainId>) -> Result<ChainOwnership, Error> {
620        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
621        let client = self.make_chain_client(chain_id).await?;
622        let info = client.chain_info().await?;
623        Ok(info.manager.ownership)
624    }
625
    /// Changes the ownership of the given chain (or of the default chain if `chain_id`
    /// is `None`) by applying `ownership_config` to the chain's current ownership.
    ///
    /// # Errors
    ///
    /// Returns a chain ownership error if the updated ownership has neither owners nor
    /// super owners. Also fails if the current ownership cannot be queried, if
    /// `ownership_config` cannot be applied, or if the ownership change itself fails.
    ///
    /// # Panics
    ///
    /// Panics if `chain_id` is `None` and no default chain is set.
    pub async fn change_ownership(
        &mut self,
        chain_id: Option<ChainId>,
        ownership_config: ChainOwnershipConfig,
    ) -> Result<(), Error> {
        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
        let chain_client = self.make_chain_client(chain_id).await?;
        info!(
            ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
            "Changing ownership of a chain"
        );
        let time_start = Instant::now();
        let mut ownership = chain_client.query_chain_ownership().await?;
        ownership_config.update(&mut ownership)?;

        // Refuse to leave the chain without any owner.
        if ownership.super_owners.is_empty() && ownership.owners.is_empty() {
            tracing::error!("At least one owner or super owner of the chain has to be set.");
            return Err(error::Inner::ChainOwnership.into());
        }

        let certificate = self
            .apply_client_command(&chain_client, |chain_client| {
                // The closure may be called more than once, so every call works on its
                // own clones of the ownership and of the chain client.
                let ownership = ownership.clone();
                let chain_client = chain_client.clone();
                async move {
                    chain_client
                        .change_ownership(ownership)
                        .await
                        .map_err(Error::from)
                        .context("Failed to change ownership")
                }
            })
            .await?;
        let time_total = time_start.elapsed();
        info!("Operation confirmed after {} ms", time_total.as_millis());
        debug!("{:?}", certificate);
        Ok(())
    }
664
    /// Sets the preferred owner of the given chain (or of the default chain if
    /// `chain_id` is `None`) on a chain client and then updates the wallet from that
    /// client.
    ///
    /// # Panics
    ///
    /// Panics if `chain_id` is `None` and no default chain is set.
    pub async fn set_preferred_owner(
        &mut self,
        chain_id: Option<ChainId>,
        preferred_owner: AccountOwner,
    ) -> Result<(), Error> {
        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
        let mut chain_client = self.make_chain_client(chain_id).await?;
        // Remember the previous value only for the log message below.
        let old_owner = chain_client.preferred_owner();
        info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
        chain_client.set_preferred_owner(preferred_owner);
        self.update_wallet_from_client(&chain_client).await?;
        info!("New preferred owner set");
        Ok(())
    }
679
680    pub async fn check_compatible_version_info(
681        &self,
682        address: &str,
683        node: &impl ValidatorNode,
684    ) -> Result<VersionInfo, Error> {
685        match node.get_version_info().await {
686            Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
687                debug!(
688                    "Version information for validator {address}: {}",
689                    version_info
690                );
691                Ok(version_info)
692            }
693            Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
694                remote: Box::new(version_info),
695                local: Box::new(linera_version::VERSION_INFO.clone()),
696            }
697            .into()),
698            Err(error) => Err(error::Inner::UnavailableVersionInfo {
699                address: address.to_string(),
700                error: Box::new(error),
701            }
702            .into()),
703        }
704    }
705
706    pub async fn check_matching_network_description(
707        &self,
708        address: &str,
709        node: &impl ValidatorNode,
710    ) -> Result<CryptoHash, Error> {
711        let network_description = self.genesis_config.network_description();
712        match node.get_network_description().await {
713            Ok(description) => {
714                if description == network_description {
715                    Ok(description.genesis_config_hash)
716                } else {
717                    Err(error::Inner::UnexpectedNetworkDescription {
718                        remote: Box::new(description),
719                        local: Box::new(network_description),
720                    }
721                    .into())
722                }
723            }
724            Err(error) => Err(error::Inner::UnavailableNetworkDescription {
725                address: address.to_string(),
726                error: Box::new(error),
727            }
728            .into()),
729        }
730    }
731
732    pub async fn check_validator_chain_info_response(
733        &self,
734        public_key: Option<&ValidatorPublicKey>,
735        address: &str,
736        node: &impl ValidatorNode,
737        chain_id: ChainId,
738    ) -> Result<ChainInfo, Error> {
739        let query = ChainInfoQuery::new(chain_id).with_manager_values();
740        match node.handle_chain_info_query(query).await {
741            Ok(response) => {
742                debug!(
743                    "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
744                    response.info.next_block_height, response.info.epoch,
745                );
746                if let Some(public_key) = public_key {
747                    if response.check(*public_key).is_ok() {
748                        debug!("Signature for public key {public_key} is OK.");
749                    } else {
750                        return Err(error::Inner::InvalidSignature {
751                            public_key: *public_key,
752                        }
753                        .into());
754                    }
755                } else {
756                    warn!("Not checking signature as public key was not given");
757                }
758                Ok(*response.info)
759            }
760            Err(error) => Err(error::Inner::UnavailableChainInfo {
761                address: address.to_string(),
762                chain_id,
763                error: Box::new(error),
764            }
765            .into()),
766        }
767    }
768
769    /// Query a validator for version info, network description, and chain info.
770    ///
771    /// Returns a `ValidatorQueryResults` struct with the results of all three queries.
772    pub async fn query_validator(
773        &self,
774        address: &str,
775        node: &impl ValidatorNode,
776        chain_id: ChainId,
777        public_key: Option<&ValidatorPublicKey>,
778    ) -> ValidatorQueryResults {
779        let version_info = self.check_compatible_version_info(address, node).await;
780        let genesis_config_hash = self.check_matching_network_description(address, node).await;
781        let chain_info = self
782            .check_validator_chain_info_response(public_key, address, node, chain_id)
783            .await;
784
785        ValidatorQueryResults {
786            version_info,
787            genesis_config_hash,
788            chain_info,
789        }
790    }
791
792    /// Query the local node for version info, network description, and chain info.
793    ///
794    /// Returns a `ValidatorQueryResults` struct with the local node's information.
795    pub async fn query_local_node(
796        &self,
797        chain_id: ChainId,
798    ) -> Result<ValidatorQueryResults, Error> {
799        let version_info = Ok(linera_version::VERSION_INFO.clone());
800        let genesis_config_hash = Ok(self
801            .genesis_config
802            .network_description()
803            .genesis_config_hash);
804        let chain_info = self
805            .make_chain_client(chain_id)
806            .await?
807            .chain_info_with_manager_values()
808            .await
809            .map(|info| *info)
810            .map_err(|e| e.into());
811
812        Ok(ValidatorQueryResults {
813            version_info,
814            genesis_config_hash,
815            chain_info,
816        })
817    }
818}
819
820#[cfg(feature = "fs")]
821impl<Env: Environment> ClientContext<Env> {
822    pub async fn publish_module(
823        &mut self,
824        chain_client: &ChainClient<Env>,
825        contract: PathBuf,
826        service: PathBuf,
827        vm_runtime: VmRuntime,
828    ) -> Result<ModuleId, Error> {
829        info!("Loading bytecode files");
830        let contract_bytecode = Bytecode::load_from_file(&contract).await.map_err(|e| {
831            std::io::Error::new(
832                e.kind(),
833                format!("failed to load contract bytecode from {contract:?}: {e}"),
834            )
835        })?;
836        let service_bytecode = Bytecode::load_from_file(&service).await.map_err(|e| {
837            std::io::Error::new(
838                e.kind(),
839                format!("failed to load service bytecode from {service:?}: {e}"),
840            )
841        })?;
842
843        info!("Publishing module");
844        let (blobs, module_id) =
845            create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
846        let (module_id, _) = self
847            .apply_client_command(chain_client, |chain_client| {
848                let blobs = blobs.clone();
849                let chain_client = chain_client.clone();
850                async move {
851                    chain_client
852                        .publish_module_blobs(blobs, module_id)
853                        .await
854                        .context("Failed to publish module")
855                }
856            })
857            .await?;
858
859        info!("{}", "Module published successfully!");
860
861        info!("Synchronizing client and processing inbox");
862        self.process_inbox(chain_client).await?;
863        Ok(module_id)
864    }
865
866    pub async fn publish_data_blob(
867        &mut self,
868        chain_client: &ChainClient<Env>,
869        blob_path: PathBuf,
870    ) -> Result<CryptoHash, Error> {
871        info!("Loading data blob file");
872        let blob_bytes = fs::read(&blob_path).map_err(|e| {
873            std::io::Error::new(
874                e.kind(),
875                format!("failed to load data blob bytes from {blob_path:?}: {e}"),
876            )
877        })?;
878
879        info!("Publishing data blob");
880        self.apply_client_command(chain_client, |chain_client| {
881            let blob_bytes = blob_bytes.clone();
882            let chain_client = chain_client.clone();
883            async move {
884                chain_client
885                    .publish_data_blob(blob_bytes)
886                    .await
887                    .context("Failed to publish data blob")
888            }
889        })
890        .await?;
891
892        info!("{}", "Data blob published successfully!");
893        Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
894    }
895
896    // TODO(#2490): Consider removing or renaming this.
897    pub async fn read_data_blob(
898        &mut self,
899        chain_client: &ChainClient<Env>,
900        hash: CryptoHash,
901    ) -> Result<(), Error> {
902        info!("Verifying data blob");
903        self.apply_client_command(chain_client, |chain_client| {
904            let chain_client = chain_client.clone();
905            async move {
906                chain_client
907                    .read_data_blob(hash)
908                    .await
909                    .context("Failed to verify data blob")
910            }
911        })
912        .await?;
913
914        info!("{}", "Data blob verified successfully!");
915        Ok(())
916    }
917}
918
919#[cfg(not(web))]
920impl<Env: Environment> ClientContext<Env> {
921    pub async fn prepare_for_benchmark(
922        &mut self,
923        num_chains: usize,
924        tokens_per_chain: Amount,
925        fungible_application_id: Option<ApplicationId>,
926        pub_keys: Vec<AccountPublicKey>,
927        chains_config_path: Option<&Path>,
928        close_chains: bool,
929    ) -> Result<Vec<ChainClient<Env>>, Error> {
930        let start = Instant::now();
931        // Below all block proposals are supposed to succeed without retries, we
932        // must make sure that all incoming payments have been accepted on-chain
933        // and that no validator is missing user certificates.
934        self.process_inboxes_and_force_validator_updates().await;
935        info!(
936            "Processed inboxes and forced validator updates in {} ms",
937            start.elapsed().as_millis()
938        );
939
940        let start = Instant::now();
941        let (benchmark_chains, chain_clients) = self
942            .make_benchmark_chains(
943                num_chains,
944                tokens_per_chain,
945                pub_keys,
946                chains_config_path.is_some(),
947                close_chains,
948            )
949            .await?;
950        info!(
951            "Got {} chains in {} ms",
952            num_chains,
953            start.elapsed().as_millis()
954        );
955
956        if let Some(id) = fungible_application_id {
957            let start = Instant::now();
958            self.supply_fungible_tokens(&benchmark_chains, id).await?;
959            info!(
960                "Supplied fungible tokens in {} ms",
961                start.elapsed().as_millis()
962            );
963            // Need to process inboxes to make sure the chains receive the supplied tokens.
964            let start = Instant::now();
965            for chain_client in &chain_clients {
966                chain_client.process_inbox().await?;
967            }
968            info!(
969                "Processed inboxes after supplying fungible tokens in {} ms",
970                start.elapsed().as_millis()
971            );
972        }
973
974        let all_chains = Benchmark::<Env>::get_all_chains(chains_config_path, &benchmark_chains)?;
975        let known_chain_ids: HashSet<_> = benchmark_chains.iter().map(|(id, _)| *id).collect();
976        let unknown_chain_ids: Vec<_> = all_chains
977            .iter()
978            .filter(|id| !known_chain_ids.contains(id))
979            .copied()
980            .collect();
981        if !unknown_chain_ids.is_empty() {
982            // The current client won't have the blobs for the chains in the other wallets. Even
983            // though it will eventually get those blobs, we're getting a head start here and
984            // fetching those blobs in advance.
985            for chain_id in &unknown_chain_ids {
986                self.client.get_chain_description(*chain_id).await?;
987            }
988        }
989
990        Ok(chain_clients)
991    }
992
993    pub async fn wrap_up_benchmark(
994        &mut self,
995        chain_clients: Vec<ChainClient<Env>>,
996        close_chains: bool,
997        wrap_up_max_in_flight: usize,
998    ) -> Result<(), Error> {
999        if close_chains {
1000            info!("Closing chains...");
1001            let stream = stream::iter(chain_clients)
1002                .map(|chain_client| async move {
1003                    Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
1004                    info!("Closed chain {:?}", chain_client.chain_id());
1005                    Ok::<(), BenchmarkError>(())
1006                })
1007                .buffer_unordered(wrap_up_max_in_flight);
1008            stream.try_collect::<Vec<_>>().await?;
1009        } else {
1010            info!("Processing inbox for all chains...");
1011            let stream = stream::iter(chain_clients.clone())
1012                .map(|chain_client| async move {
1013                    chain_client.process_inbox().await?;
1014                    info!("Processed inbox for chain {:?}", chain_client.chain_id());
1015                    Ok::<(), chain_client::Error>(())
1016                })
1017                .buffer_unordered(wrap_up_max_in_flight);
1018            stream.try_collect::<Vec<_>>().await?;
1019
1020            info!("Updating wallet from chain clients...");
1021            for chain_client in chain_clients {
1022                let info = chain_client.chain_info().await?;
1023                let client_owner = chain_client.preferred_owner();
1024                let pending_fast_proposal = chain_client
1025                    .pending_proposal()
1026                    .await
1027                    .filter(|p| p.round.is_some_and(|r| r.is_fast()));
1028                self.wallet()
1029                    .insert(
1030                        info.chain_id,
1031                        wallet::Chain {
1032                            pending_fast_proposal,
1033                            owner: client_owner,
1034                            ..info.as_ref().into()
1035                        },
1036                    )
1037                    .await
1038                    .map_err(error::Inner::wallet)?;
1039            }
1040        }
1041
1042        Ok(())
1043    }
1044
1045    async fn process_inboxes_and_force_validator_updates(&mut self) {
1046        let mut join_set = task::JoinSet::new();
1047
1048        let chain_clients: Vec<_> = self
1049            .wallet()
1050            .owned_chain_ids()
1051            .map_err(|e| error::Inner::wallet(e).into())
1052            .and_then(|id| self.make_chain_client(id))
1053            .try_collect()
1054            .await
1055            .unwrap();
1056
1057        for chain_client in chain_clients {
1058            join_set.spawn(async move {
1059                Self::process_inbox_without_updating_wallet(&chain_client)
1060                    .await
1061                    .expect("Processing inbox should not fail!");
1062                chain_client
1063            });
1064        }
1065
1066        for chain_client in join_set.join_all().await {
1067            self.update_wallet_from_client(&chain_client).await.unwrap();
1068        }
1069    }
1070
1071    async fn process_inbox_without_updating_wallet(
1072        chain_client: &ChainClient<Env>,
1073    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
1074        // Try processing the inbox optimistically without waiting for validator notifications.
1075        chain_client.synchronize_from_validators().await?;
1076        let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
1077        assert!(
1078            maybe_timeout.is_none(),
1079            "Should not timeout within benchmark!"
1080        );
1081
1082        Ok(certificates)
1083    }
1084
1085    /// Creates chains if necessary, and returns a map of exactly `num_chains` chain IDs
1086    /// with key pairs, as well as a map of the chain clients.
1087    ///
1088    /// If `close_chains` is true, chains are not looked up from or stored in the wallet,
1089    /// since they will be closed after the benchmark and shouldn't be reused.
1090    async fn make_benchmark_chains(
1091        &mut self,
1092        num_chains: usize,
1093        balance: Amount,
1094        pub_keys: Vec<AccountPublicKey>,
1095        wallet_only: bool,
1096        close_chains: bool,
1097    ) -> Result<(Vec<(ChainId, AccountOwner)>, Vec<ChainClient<Env>>), Error> {
1098        let mut chains_found_in_wallet = 0;
1099        let mut benchmark_chains = Vec::with_capacity(num_chains);
1100        let mut chain_clients = Vec::with_capacity(num_chains);
1101        let start = Instant::now();
1102
1103        // When close_chains is true and we're creating our own chains (not wallet_only),
1104        // skip wallet lookup to avoid picking up existing chains that would then be closed.
1105        // When wallet_only is true, chains were pre-created by the parent process and must
1106        // be read from the wallet.
1107        if !close_chains || wallet_only {
1108            let mut owned_chain_ids = std::pin::pin!(self.wallet().owned_chain_ids());
1109            while let Some(chain_id) = owned_chain_ids.next().await {
1110                let chain_id = chain_id.map_err(error::Inner::wallet)?;
1111                if chains_found_in_wallet == num_chains {
1112                    break;
1113                }
1114                let chain_client = self.make_chain_client(chain_id).await?;
1115                let ownership = chain_client.chain_info().await?.manager.ownership;
1116                if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
1117                    continue;
1118                }
1119                let owner = *ownership.super_owners.first().unwrap();
1120                chain_client.process_inbox().await?;
1121                benchmark_chains.push((chain_id, owner));
1122                chain_clients.push(chain_client);
1123                chains_found_in_wallet += 1;
1124            }
1125            info!(
1126                "Got {} chains from the wallet in {} ms",
1127                benchmark_chains.len(),
1128                start.elapsed().as_millis()
1129            );
1130        }
1131
1132        let num_chains_to_create = num_chains - chains_found_in_wallet;
1133
1134        let default_chain_client = self.make_chain_client(self.default_chain()).await?;
1135
1136        if num_chains_to_create > 0 {
1137            if wallet_only {
1138                return Err(
1139                    error::Inner::Benchmark(BenchmarkError::NotEnoughChainsInWallet(
1140                        num_chains,
1141                        chains_found_in_wallet,
1142                    ))
1143                    .into(),
1144                );
1145            }
1146            let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
1147            let operations_per_block = 900; // Over this we seem to hit the block size limits.
1148            for i in (0..num_chains_to_create).step_by(operations_per_block) {
1149                let num_new_chains = operations_per_block.min(num_chains_to_create - i);
1150                // Each chain gets its own unique owner (previously all chains in a batch
1151                // shared one owner, which could cause conflicts during benchmarking).
1152                let owners: Vec<AccountOwner> = (&mut pub_keys_iter)
1153                    .take(num_new_chains)
1154                    .map(|pk| pk.into())
1155                    .collect();
1156
1157                let certificate = Self::execute_open_chains_operations(
1158                    &default_chain_client,
1159                    balance,
1160                    owners.clone(),
1161                )
1162                .await?;
1163                info!("Block executed successfully");
1164
1165                let block = certificate.block();
1166                for (i, owner) in owners.into_iter().enumerate() {
1167                    let chain_id = block.body.blobs[i]
1168                        .iter()
1169                        .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
1170                        .map(|blob| ChainId(blob.id().hash))
1171                        .expect("failed to create a new chain");
1172                    self.client
1173                        .extend_chain_mode(chain_id, ListeningMode::FullChain);
1174
1175                    let mut chain_client = self.client.create_chain_client(
1176                        chain_id,
1177                        None,
1178                        BlockHeight::ZERO,
1179                        &None,
1180                        Some(owner),
1181                        self.timing_sender(),
1182                        false,
1183                    );
1184                    chain_client.set_preferred_owner(owner);
1185                    chain_client.process_inbox().await?;
1186                    benchmark_chains.push((chain_id, owner));
1187                    chain_clients.push(chain_client);
1188                }
1189            }
1190
1191            info!(
1192                "Created {} chains in {} ms",
1193                num_chains_to_create,
1194                start.elapsed().as_millis()
1195            );
1196        }
1197
1198        // Only update wallet if chains will be reused (not closed after benchmark)
1199        if !close_chains {
1200            info!("Updating wallet from client");
1201            self.update_wallet_from_client(&default_chain_client)
1202                .await?;
1203        }
1204        info!("Retrying pending outgoing messages");
1205        default_chain_client
1206            .retry_pending_outgoing_messages()
1207            .await
1208            .context("outgoing messages to create the new chains should be delivered")?;
1209        info!("Processing default chain inbox");
1210        default_chain_client.process_inbox().await?;
1211
1212        assert_eq!(
1213            benchmark_chains.len(),
1214            chain_clients.len(),
1215            "benchmark_chains and chain_clients must have the same size"
1216        );
1217
1218        Ok((benchmark_chains, chain_clients))
1219    }
1220
1221    async fn execute_open_chains_operations(
1222        chain_client: &ChainClient<Env>,
1223        balance: Amount,
1224        owners: Vec<AccountOwner>,
1225    ) -> Result<ConfirmedBlockCertificate, Error> {
1226        let operations: Vec<_> = owners
1227            .iter()
1228            .map(|owner| {
1229                let config = OpenChainConfig {
1230                    ownership: ChainOwnership::single_super(*owner),
1231                    balance,
1232                    application_permissions: Default::default(),
1233                };
1234                Operation::system(SystemOperation::OpenChain(config))
1235            })
1236            .collect();
1237        info!("Executing {} OpenChain operations", operations.len());
1238        Ok(chain_client
1239            .execute_operations(operations, vec![])
1240            .await?
1241            .expect("should execute block with OpenChain operations"))
1242    }
1243
1244    /// Supplies fungible tokens to the chains.
1245    async fn supply_fungible_tokens(
1246        &mut self,
1247        key_pairs: &[(ChainId, AccountOwner)],
1248        application_id: ApplicationId,
1249    ) -> Result<(), Error> {
1250        let default_chain_id = self.default_chain();
1251        let default_key = self
1252            .wallet()
1253            .get(default_chain_id)
1254            .await
1255            .unwrap()
1256            .unwrap()
1257            .owner
1258            .unwrap();
1259        // This should be enough to run the benchmark at 1M TPS for an hour.
1260        let amount = Amount::from_nanos(4);
1261        let operations: Vec<Operation> = key_pairs
1262            .iter()
1263            .map(|(chain_id, owner)| {
1264                fungible_transfer(application_id, *chain_id, default_key, *owner, amount)
1265            })
1266            .collect();
1267        let chain_client = self.make_chain_client(default_chain_id).await?;
1268        // Put at most 1000 fungible token operations in each block.
1269        for operation_chunk in operations.chunks(1000) {
1270            chain_client
1271                .execute_operations(operation_chunk.to_vec(), vec![])
1272                .await?
1273                .expect("should execute block with Transfer operations");
1274        }
1275        self.update_wallet_from_client(&chain_client).await?;
1276
1277        Ok(())
1278    }
1279}