linera_client/
client_context.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use futures::Future;
7use linera_base::{
8    crypto::{CryptoHash, ValidatorPublicKey},
9    data_types::{BlockHeight, Epoch, Timestamp},
10    identifiers::{Account, AccountOwner, ChainId},
11    ownership::ChainOwnership,
12    time::{Duration, Instant},
13};
14use linera_chain::types::ConfirmedBlockCertificate;
15use linera_core::{
16    client::{ChainClient, Client, ListeningMode},
17    data_types::{ChainInfoQuery, ClientOutcome},
18    join_set_ext::JoinSet,
19    node::ValidatorNode,
20    Environment, JoinSetExt as _,
21};
22use linera_persistent::{Persist, PersistExt as _};
23use linera_rpc::node_provider::{NodeOptions, NodeProvider};
24use linera_version::VersionInfo;
25use thiserror_context::Context;
26use tracing::{debug, info};
27#[cfg(not(web))]
28use {
29    crate::{
30        benchmark::{Benchmark, BenchmarkError},
31        client_metrics::ClientMetrics,
32    },
33    futures::{stream, StreamExt, TryStreamExt},
34    linera_base::{
35        crypto::AccountPublicKey,
36        data_types::Amount,
37        identifiers::{ApplicationId, BlobType},
38    },
39    linera_core::client::ChainClientError,
40    linera_execution::{
41        system::{OpenChainConfig, SystemOperation},
42        Operation,
43    },
44    std::{collections::HashSet, iter, path::Path},
45    tokio::{sync::mpsc, task},
46};
47#[cfg(feature = "fs")]
48use {
49    linera_base::{
50        data_types::{BlobContent, Bytecode},
51        identifiers::ModuleId,
52        vm::VmRuntime,
53    },
54    linera_core::client::create_bytecode_blobs,
55    std::{fs, path::PathBuf},
56};
57
58use crate::{
59    chain_listener::{self, ClientContext as _},
60    client_options::{ChainOwnershipConfig, ClientContextOptions},
61    error, util,
62    wallet::{UserChain, Wallet},
63    Error,
64};
65
/// Client-side state used by the operations in this module: the wallet, the core
/// client, the network settings used to build node providers, and the background
/// chain-listener tasks.
pub struct ClientContext<Env: Environment, W> {
    /// The wallet, held through the wrapper type `W`.
    pub wallet: W,
    /// The shared core client.
    pub client: Arc<Client<Env>>,
    /// Send timeout copied into `NodeOptions` when a node provider is created.
    pub send_timeout: Duration,
    /// Receive timeout copied into `NodeOptions` when a node provider is created.
    pub recv_timeout: Duration,
    /// Retry delay copied into `NodeOptions` when a node provider is created.
    pub retry_delay: Duration,
    /// Maximum number of retries copied into `NodeOptions` when a node provider is created.
    pub max_retries: u32,
    /// Listener tasks spawned while waiting for notifications from validators.
    pub chain_listeners: JoinSet,
    /// Client metrics, if any; this field only exists outside of `web` builds.
    #[cfg(not(web))]
    pub client_metrics: Option<ClientMetrics>,
}
77
78impl<Env: Environment, W> chain_listener::ClientContext for ClientContext<Env, W>
79where
80    W: Persist<Target = Wallet>,
81{
82    type Environment = Env;
83
84    fn wallet(&self) -> &Wallet {
85        &self.wallet
86    }
87
88    fn storage(&self) -> &Env::Storage {
89        self.client.storage_client()
90    }
91
92    fn client(&self) -> &Arc<Client<Env>> {
93        &self.client
94    }
95
96    #[cfg(not(web))]
97    fn timing_sender(
98        &self,
99    ) -> Option<mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
100        self.client_metrics
101            .as_ref()
102            .map(|metrics| metrics.timing_sender.clone())
103    }
104
105    async fn update_wallet_for_new_chain(
106        &mut self,
107        chain_id: ChainId,
108        owner: Option<AccountOwner>,
109        timestamp: Timestamp,
110        epoch: Epoch,
111    ) -> Result<(), Error> {
112        self.update_wallet_for_new_chain(chain_id, owner, timestamp, epoch)
113            .await
114    }
115
116    async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
117        self.update_wallet_from_client(client).await
118    }
119}
120
121impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si>, W>
122where
123    S: linera_core::environment::Storage,
124    Si: linera_core::environment::Signer,
125    W: Persist<Target = Wallet>,
126{
127    pub fn new(storage: S, options: ClientContextOptions, wallet: W, signer: Si) -> Self {
128        #[cfg(not(web))]
129        let timing_config = options.to_timing_config();
130        let node_provider = NodeProvider::new(NodeOptions {
131            send_timeout: options.send_timeout,
132            recv_timeout: options.recv_timeout,
133            retry_delay: options.retry_delay,
134            max_retries: options.max_retries,
135        });
136        let chain_ids = wallet.chain_ids();
137        let name = match chain_ids.len() {
138            0 => "Client node".to_string(),
139            1 => format!("Client node for {:.8}", chain_ids[0]),
140            n => format!("Client node for {:.8} and {} others", chain_ids[0], n - 1),
141        };
142        let client = Client::new(
143            linera_core::environment::Impl {
144                network: node_provider,
145                storage,
146                signer,
147            },
148            wallet.genesis_admin_chain(),
149            options.long_lived_services,
150            chain_ids,
151            name,
152            options.chain_worker_ttl,
153            options.to_chain_client_options(),
154        );
155
156        #[cfg(not(web))]
157        let client_metrics = if timing_config.enabled {
158            Some(ClientMetrics::new(timing_config))
159        } else {
160            None
161        };
162
163        ClientContext {
164            client: Arc::new(client),
165            wallet,
166            send_timeout: options.send_timeout,
167            recv_timeout: options.recv_timeout,
168            retry_delay: options.retry_delay,
169            max_retries: options.max_retries,
170            chain_listeners: JoinSet::default(),
171            #[cfg(not(web))]
172            client_metrics,
173        }
174    }
175
176    #[cfg(with_testing)]
177    pub fn new_test_client_context(storage: S, wallet: W, signer: Si) -> Self {
178        use linera_core::{client::ChainClientOptions, node::CrossChainMessageDelivery};
179
180        let send_recv_timeout = Duration::from_millis(4000);
181        let retry_delay = Duration::from_millis(1000);
182        let max_retries = 10;
183
184        let node_options = NodeOptions {
185            send_timeout: send_recv_timeout,
186            recv_timeout: send_recv_timeout,
187            retry_delay,
188            max_retries,
189        };
190        let chain_ids = wallet.chain_ids();
191        let name = match chain_ids.len() {
192            0 => "Client node".to_string(),
193            1 => format!("Client node for {:.8}", chain_ids[0]),
194            n => format!("Client node for {:.8} and {} others", chain_ids[0], n - 1),
195        };
196        let client = Client::new(
197            linera_core::environment::Impl {
198                storage,
199                network: NodeProvider::new(node_options),
200                signer,
201            },
202            wallet.genesis_admin_chain(),
203            false,
204            chain_ids,
205            name,
206            Duration::from_secs(30),
207            ChainClientOptions {
208                cross_chain_message_delivery: CrossChainMessageDelivery::Blocking,
209                ..ChainClientOptions::test_default()
210            },
211        );
212
213        ClientContext {
214            client: Arc::new(client),
215            wallet,
216            send_timeout: send_recv_timeout,
217            recv_timeout: send_recv_timeout,
218            retry_delay,
219            max_retries,
220            chain_listeners: JoinSet::default(),
221            client_metrics: None,
222        }
223    }
224}
225
226impl<Env: Environment, W: Persist<Target = Wallet>> ClientContext<Env, W> {
    /// Returns a shared reference to the wallet held by this context.
    pub fn wallet(&self) -> &Wallet {
        &self.wallet
    }
231
    /// Returns a mutable reference to the wallet wrapper `W` (not to the inner `Wallet`).
    pub fn wallet_mut(&mut self) -> &mut W {
        &mut self.wallet
    }
236
237    pub async fn mutate_wallet<R: Send>(
238        &mut self,
239        mutation: impl FnOnce(&mut Wallet) -> R + Send,
240    ) -> Result<R, Error> {
241        self.wallet
242            .mutate(mutation)
243            .await
244            .map_err(|e| error::Inner::Persistence(Box::new(e)).into())
245    }
246
    /// Retrieves the default account. Currently this is the chain account of the default
    /// chain.
    ///
    /// Panics under the same condition as [`Self::default_chain`].
    pub fn default_account(&self) -> Account {
        Account::chain(self.default_chain())
    }
252
    /// Retrieves the default chain of the wallet.
    ///
    /// # Panics
    ///
    /// Panics if the wallet does not provide a default chain.
    pub fn default_chain(&self) -> ChainId {
        self.wallet
            .default_chain()
            .expect("No chain specified in wallet with no default chain")
    }
259
    /// Retrieves the first non-admin chain reported by the wallet.
    ///
    /// # Panics
    ///
    /// Panics if the wallet does not provide a non-admin chain.
    pub fn first_non_admin_chain(&self) -> ChainId {
        self.wallet
            .first_non_admin_chain()
            .expect("No non-admin chain specified in wallet with no non-admin chain")
    }
265
    /// Creates a new node provider configured with this context's timeouts and retry settings.
    pub fn make_node_provider(&self) -> NodeProvider {
        NodeProvider::new(self.make_node_options())
    }
269
270    fn make_node_options(&self) -> NodeOptions {
271        NodeOptions {
272            send_timeout: self.send_timeout,
273            recv_timeout: self.recv_timeout,
274            retry_delay: self.retry_delay,
275            max_retries: self.max_retries,
276        }
277    }
278
    /// Returns the client metrics, or `None` if this context has none.
    #[cfg(not(web))]
    pub fn client_metrics(&self) -> Option<&ClientMetrics> {
        self.client_metrics.as_ref()
    }
283
284    pub async fn save_wallet(&mut self) -> Result<(), Error> {
285        self.wallet
286            .persist()
287            .await
288            .map_err(|e| error::Inner::Persistence(Box::new(e)).into())
289    }
290
    /// Updates the wallet from the given chain client and then saves the wallet.
    ///
    /// The chain info, the preferred owner and a clone of the pending proposal are read
    /// from `client` and handed to the wallet's `update_from_info`. The client may use a
    /// different environment (`Env_`) than this context.
    ///
    /// Returns an error if the chain info cannot be obtained or the wallet cannot be saved.
    pub async fn update_wallet_from_client<Env_: Environment>(
        &mut self,
        client: &ChainClient<Env_>,
    ) -> Result<(), Error> {
        let info = client.chain_info().await?;
        let client_owner = client.preferred_owner();
        let pending_proposal = client.pending_proposal().clone();
        self.wallet
            .as_mut()
            .update_from_info(pending_proposal, client_owner, &info);
        // Persist right away so the stored wallet matches the in-memory one.
        self.save_wallet().await
    }
303
304    /// Remembers the new chain and its owner (if any) in the wallet.
305    pub async fn update_wallet_for_new_chain(
306        &mut self,
307        chain_id: ChainId,
308        owner: Option<AccountOwner>,
309        timestamp: Timestamp,
310        epoch: Epoch,
311    ) -> Result<(), Error> {
312        if self.wallet.get(chain_id).is_none() {
313            self.mutate_wallet(|w| {
314                w.insert(UserChain {
315                    chain_id,
316                    owner,
317                    block_hash: None,
318                    timestamp,
319                    next_block_height: BlockHeight::ZERO,
320                    pending_proposal: None,
321                    epoch: Some(epoch),
322                })
323            })
324            .await?;
325        }
326
327        Ok(())
328    }
329
330    pub async fn process_inbox(
331        &mut self,
332        chain_client: &ChainClient<Env>,
333    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
334        let mut certificates = Vec::new();
335        // Try processing the inbox optimistically without waiting for validator notifications.
336        let (new_certificates, maybe_timeout) = {
337            chain_client.synchronize_from_validators().await?;
338            let result = chain_client.process_inbox_without_prepare().await;
339            self.update_wallet_from_client(chain_client).await?;
340            if result.is_err() {
341                self.save_wallet().await?;
342            }
343            result?
344        };
345        certificates.extend(new_certificates);
346        if maybe_timeout.is_none() {
347            self.save_wallet().await?;
348            return Ok(certificates);
349        }
350
351        // Start listening for notifications, so we learn about new rounds and blocks.
352        let (listener, _listen_handle, mut notification_stream) =
353            chain_client.listen(ListeningMode::FullChain).await?;
354        self.chain_listeners.spawn_task(listener);
355
356        loop {
357            let (new_certificates, maybe_timeout) = {
358                let result = chain_client.process_inbox().await;
359                self.update_wallet_from_client(chain_client).await?;
360                if result.is_err() {
361                    self.save_wallet().await?;
362                }
363                result?
364            };
365            certificates.extend(new_certificates);
366            if let Some(timestamp) = maybe_timeout {
367                util::wait_for_next_round(&mut notification_stream, timestamp).await
368            } else {
369                self.save_wallet().await?;
370                return Ok(certificates);
371            }
372        }
373    }
374
375    pub async fn assign_new_chain_to_key(
376        &mut self,
377        chain_id: ChainId,
378        owner: AccountOwner,
379    ) -> Result<(), Error> {
380        self.client.track_chain(chain_id);
381        let client = self.make_chain_client(chain_id);
382        let chain_description = client.get_chain_description().await?;
383        let config = chain_description.config();
384
385        if !config.ownership.verify_owner(&owner) {
386            tracing::error!(
387                "The chain with the ID returned by the faucet is not owned by you. \
388                Please make sure you are connecting to a genuine faucet."
389            );
390            return Err(error::Inner::ChainOwnership.into());
391        }
392
393        self.wallet_mut()
394            .mutate(|w| {
395                w.assign_new_chain_to_owner(
396                    owner,
397                    chain_id,
398                    chain_description.timestamp(),
399                    chain_description.config().epoch,
400                )
401            })
402            .await
403            .map_err(|e| error::Inner::Persistence(Box::new(e)))?
404            .context("assigning new chain")?;
405        Ok(())
406    }
407
408    /// Applies the given function to the chain client.
409    ///
410    /// Updates the wallet regardless of the outcome. As long as the function returns a round
411    /// timeout, it will wait and retry.
412    pub async fn apply_client_command<E, F, Fut, T>(
413        &mut self,
414        client: &ChainClient<Env>,
415        mut f: F,
416    ) -> Result<T, Error>
417    where
418        F: FnMut(&ChainClient<Env>) -> Fut,
419        Fut: Future<Output = Result<ClientOutcome<T>, E>>,
420        Error: From<E>,
421    {
422        client.prepare_chain().await?;
423        // Try applying f optimistically without validator notifications. Return if committed.
424        let result = f(client).await;
425        self.update_wallet_from_client(client).await?;
426        if let ClientOutcome::Committed(t) = result? {
427            return Ok(t);
428        }
429
430        // Start listening for notifications, so we learn about new rounds and blocks.
431        let (listener, _listen_handle, mut notification_stream) =
432            client.listen(ListeningMode::FullChain).await?;
433        self.chain_listeners.spawn_task(listener);
434
435        loop {
436            // Try applying f. Return if committed.
437            let result = f(client).await;
438            self.update_wallet_from_client(client).await?;
439            let timeout = match result? {
440                ClientOutcome::Committed(t) => return Ok(t),
441                ClientOutcome::WaitForTimeout(timeout) => timeout,
442            };
443            // Otherwise wait and try again in the next round.
444            util::wait_for_next_round(&mut notification_stream, timeout).await;
445        }
446    }
447
448    pub async fn change_ownership(
449        &mut self,
450        chain_id: Option<ChainId>,
451        ownership_config: ChainOwnershipConfig,
452    ) -> Result<(), Error> {
453        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
454        let chain_client = self.make_chain_client(chain_id);
455        info!(
456            ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
457            "Changing ownership of a chain"
458        );
459        let time_start = Instant::now();
460        let ownership = ChainOwnership::try_from(ownership_config)?;
461
462        let certificate = self
463            .apply_client_command(&chain_client, |chain_client| {
464                let ownership = ownership.clone();
465                let chain_client = chain_client.clone();
466                async move {
467                    chain_client
468                        .change_ownership(ownership)
469                        .await
470                        .map_err(Error::from)
471                        .context("Failed to change ownership")
472                }
473            })
474            .await?;
475        let time_total = time_start.elapsed();
476        info!("Operation confirmed after {} ms", time_total.as_millis());
477        debug!("{:?}", certificate);
478        Ok(())
479    }
480
    /// Sets the preferred owner of a chain and records the change in the wallet.
    ///
    /// Uses the wallet's default chain when `chain_id` is `None`. The previous and the new
    /// preferred owner are logged.
    pub async fn set_preferred_owner(
        &mut self,
        chain_id: Option<ChainId>,
        preferred_owner: AccountOwner,
    ) -> Result<(), Error> {
        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
        let mut chain_client = self.make_chain_client(chain_id);
        // Read the previous owner before overwriting it, for the log line only.
        let old_owner = chain_client.preferred_owner();
        info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
        chain_client.set_preferred_owner(preferred_owner);
        // Copies the client's state into the wallet and saves it.
        self.update_wallet_from_client(&chain_client).await?;
        info!("New preferred owner set");
        Ok(())
    }
495
496    pub async fn check_compatible_version_info(
497        &self,
498        address: &str,
499        node: &impl ValidatorNode,
500    ) -> Result<VersionInfo, Error> {
501        match node.get_version_info().await {
502            Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
503                info!(
504                    "Version information for validator {address}: {}",
505                    version_info
506                );
507                Ok(version_info)
508            }
509            Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
510                remote: Box::new(version_info),
511                local: Box::new(linera_version::VERSION_INFO.clone()),
512            }
513            .into()),
514            Err(error) => Err(error::Inner::UnavailableVersionInfo {
515                address: address.to_string(),
516                error: Box::new(error),
517            }
518            .into()),
519        }
520    }
521
522    pub async fn check_matching_network_description(
523        &self,
524        address: &str,
525        node: &impl ValidatorNode,
526    ) -> Result<CryptoHash, Error> {
527        let network_description = self.wallet().genesis_config().network_description();
528        match node.get_network_description().await {
529            Ok(description) => {
530                if description == network_description {
531                    Ok(description.genesis_config_hash)
532                } else {
533                    Err(error::Inner::UnexpectedNetworkDescription {
534                        remote: Box::new(description),
535                        local: Box::new(network_description),
536                    }
537                    .into())
538                }
539            }
540            Err(error) => Err(error::Inner::UnavailableNetworkDescription {
541                address: address.to_string(),
542                error: Box::new(error),
543            }
544            .into()),
545        }
546    }
547
548    pub async fn check_validator_chain_info_response(
549        &self,
550        public_key: Option<&ValidatorPublicKey>,
551        address: &str,
552        node: &impl ValidatorNode,
553        chain_id: ChainId,
554    ) -> Result<(), Error> {
555        let query = ChainInfoQuery::new(chain_id);
556        match node.handle_chain_info_query(query).await {
557            Ok(response) => {
558                info!(
559                    "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
560                    response.info.next_block_height, response.info.epoch,
561                );
562                if let Some(public_key) = public_key {
563                    if response.check(*public_key).is_ok() {
564                        info!("Signature for public key {public_key} is OK.");
565                    } else {
566                        return Err(error::Inner::InvalidSignature {
567                            public_key: *public_key,
568                        }
569                        .into());
570                    }
571                }
572                Ok(())
573            }
574            Err(error) => Err(error::Inner::UnavailableChainInfo {
575                address: address.to_string(),
576                chain_id,
577                error: Box::new(error),
578            }
579            .into()),
580        }
581    }
582}
583
584#[cfg(feature = "fs")]
585impl<Env: Environment, W> ClientContext<Env, W>
586where
587    W: Persist<Target = Wallet>,
588{
589    pub async fn publish_module(
590        &mut self,
591        chain_client: &ChainClient<Env>,
592        contract: PathBuf,
593        service: PathBuf,
594        vm_runtime: VmRuntime,
595    ) -> Result<ModuleId, Error> {
596        info!("Loading bytecode files");
597        let contract_bytecode = Bytecode::load_from_file(&contract)
598            .await
599            .with_context(|| format!("failed to load contract bytecode from {:?}", &contract))?;
600        let service_bytecode = Bytecode::load_from_file(&service)
601            .await
602            .with_context(|| format!("failed to load service bytecode from {:?}", &service))?;
603
604        info!("Publishing module");
605        let (blobs, module_id) =
606            create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
607        let (module_id, _) = self
608            .apply_client_command(chain_client, |chain_client| {
609                let blobs = blobs.clone();
610                let chain_client = chain_client.clone();
611                async move {
612                    chain_client
613                        .publish_module_blobs(blobs, module_id)
614                        .await
615                        .context("Failed to publish module")
616                }
617            })
618            .await?;
619
620        info!("{}", "Module published successfully!");
621
622        info!("Synchronizing client and processing inbox");
623        self.process_inbox(chain_client).await?;
624        Ok(module_id)
625    }
626
627    pub async fn publish_data_blob(
628        &mut self,
629        chain_client: &ChainClient<Env>,
630        blob_path: PathBuf,
631    ) -> Result<CryptoHash, Error> {
632        info!("Loading data blob file");
633        let blob_bytes = fs::read(&blob_path).context(format!(
634            "failed to load data blob bytes from {:?}",
635            &blob_path
636        ))?;
637
638        info!("Publishing data blob");
639        self.apply_client_command(chain_client, |chain_client| {
640            let blob_bytes = blob_bytes.clone();
641            let chain_client = chain_client.clone();
642            async move {
643                chain_client
644                    .publish_data_blob(blob_bytes)
645                    .await
646                    .context("Failed to publish data blob")
647            }
648        })
649        .await?;
650
651        info!("{}", "Data blob published successfully!");
652        Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
653    }
654
655    // TODO(#2490): Consider removing or renaming this.
656    pub async fn read_data_blob(
657        &mut self,
658        chain_client: &ChainClient<Env>,
659        hash: CryptoHash,
660    ) -> Result<(), Error> {
661        info!("Verifying data blob");
662        self.apply_client_command(chain_client, |chain_client| {
663            let chain_client = chain_client.clone();
664            async move {
665                chain_client
666                    .read_data_blob(hash)
667                    .await
668                    .context("Failed to verify data blob")
669            }
670        })
671        .await?;
672
673        info!("{}", "Data blob verified successfully!");
674        Ok(())
675    }
676}
677
678#[cfg(not(web))]
679impl<Env: Environment, W> ClientContext<Env, W>
680where
681    W: Persist<Target = Wallet>,
682{
683    pub async fn prepare_for_benchmark(
684        &mut self,
685        num_chains: usize,
686        tokens_per_chain: Amount,
687        fungible_application_id: Option<ApplicationId>,
688        pub_keys: Vec<AccountPublicKey>,
689        chains_config_path: Option<&Path>,
690    ) -> Result<(Vec<ChainClient<Env>>, Vec<ChainId>), Error> {
691        let start = Instant::now();
692        // Below all block proposals are supposed to succeed without retries, we
693        // must make sure that all incoming payments have been accepted on-chain
694        // and that no validator is missing user certificates.
695        self.process_inboxes_and_force_validator_updates().await;
696        info!(
697            "Processed inboxes and forced validator updates in {} ms",
698            start.elapsed().as_millis()
699        );
700
701        let start = Instant::now();
702        let (benchmark_chains, chain_clients) = self
703            .make_benchmark_chains(
704                num_chains,
705                tokens_per_chain,
706                pub_keys,
707                chains_config_path.is_some(),
708            )
709            .await?;
710        info!(
711            "Got {} chains in {} ms",
712            num_chains,
713            start.elapsed().as_millis()
714        );
715
716        if let Some(id) = fungible_application_id {
717            let start = Instant::now();
718            self.supply_fungible_tokens(&benchmark_chains, id).await?;
719            info!(
720                "Supplied fungible tokens in {} ms",
721                start.elapsed().as_millis()
722            );
723            // Need to process inboxes to make sure the chains receive the supplied tokens.
724            let start = Instant::now();
725            for chain_client in &chain_clients {
726                chain_client.process_inbox().await?;
727            }
728            info!(
729                "Processed inboxes after supplying fungible tokens in {} ms",
730                start.elapsed().as_millis()
731            );
732        }
733
734        let all_chains = Benchmark::<Env>::get_all_chains(chains_config_path, &benchmark_chains)?;
735        let known_chain_ids: HashSet<_> = benchmark_chains.iter().map(|(id, _)| *id).collect();
736        let unknown_chain_ids: Vec<_> = all_chains
737            .iter()
738            .filter(|id| !known_chain_ids.contains(id))
739            .copied()
740            .collect();
741        if !unknown_chain_ids.is_empty() {
742            // The current client won't have the blobs for the chains in the other wallets. Even
743            // though it will eventually get those blobs, we're getting a head start here and
744            // fetching those blobs in advance.
745            for chain_id in &unknown_chain_ids {
746                self.client.get_chain_description(*chain_id).await?;
747            }
748        }
749
750        Ok((chain_clients, all_chains))
751    }
752
753    pub async fn wrap_up_benchmark(
754        &mut self,
755        chain_clients: Vec<ChainClient<Env>>,
756        close_chains: bool,
757        wrap_up_max_in_flight: usize,
758    ) -> Result<(), Error> {
759        if close_chains {
760            info!("Closing chains...");
761            let stream = stream::iter(chain_clients)
762                .map(|chain_client| async move {
763                    Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
764                    info!("Closed chain {:?}", chain_client.chain_id());
765                    Ok::<(), BenchmarkError>(())
766                })
767                .buffer_unordered(wrap_up_max_in_flight);
768            stream.try_collect::<Vec<_>>().await?;
769        } else {
770            info!("Processing inbox for all chains...");
771            let stream = stream::iter(chain_clients.clone())
772                .map(|chain_client| async move {
773                    chain_client.process_inbox().await?;
774                    info!("Processed inbox for chain {:?}", chain_client.chain_id());
775                    Ok::<(), ChainClientError>(())
776                })
777                .buffer_unordered(wrap_up_max_in_flight);
778            stream.try_collect::<Vec<_>>().await?;
779
780            info!("Updating wallet from chain clients...");
781            for chain_client in chain_clients {
782                let info = chain_client.chain_info().await?;
783                let client_owner = chain_client.preferred_owner();
784                let pending_proposal = chain_client.pending_proposal().clone();
785                self.wallet
786                    .as_mut()
787                    .update_from_info(pending_proposal, client_owner, &info);
788            }
789            self.save_wallet().await?;
790        }
791
792        Ok(())
793    }
794
795    async fn process_inboxes_and_force_validator_updates(&mut self) {
796        let mut chain_clients = vec![];
797        for chain_id in &self.wallet.owned_chain_ids() {
798            chain_clients.push(self.make_chain_client(*chain_id));
799        }
800
801        let mut join_set = task::JoinSet::new();
802        for chain_client in chain_clients {
803            join_set.spawn(async move {
804                Self::process_inbox_without_updating_wallet(&chain_client)
805                    .await
806                    .expect("Processing inbox should not fail!");
807                chain_client
808            });
809        }
810
811        let chain_clients = join_set.join_all().await;
812        for chain_client in &chain_clients {
813            self.update_wallet_from_client(chain_client).await.unwrap();
814        }
815    }
816
817    async fn process_inbox_without_updating_wallet(
818        chain_client: &ChainClient<Env>,
819    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
820        // Try processing the inbox optimistically without waiting for validator notifications.
821        chain_client.synchronize_from_validators().await?;
822        let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
823        assert!(
824            maybe_timeout.is_none(),
825            "Should not timeout within benchmark!"
826        );
827
828        Ok(certificates)
829    }
830
831    /// Creates chains if necessary, and returns a map of exactly `num_chains` chain IDs
832    /// with key pairs, as well as a map of the chain clients.
833    async fn make_benchmark_chains(
834        &mut self,
835        num_chains: usize,
836        balance: Amount,
837        pub_keys: Vec<AccountPublicKey>,
838        wallet_only: bool,
839    ) -> Result<(Vec<(ChainId, AccountOwner)>, Vec<ChainClient<Env>>), Error> {
840        let mut chains_found_in_wallet = 0;
841        let mut benchmark_chains = Vec::with_capacity(num_chains);
842        let mut chain_clients = Vec::with_capacity(num_chains);
843        let start = Instant::now();
844        for chain_id in self.wallet.owned_chain_ids() {
845            if chains_found_in_wallet == num_chains {
846                break;
847            }
848            let chain_client = self.make_chain_client(chain_id);
849            let ownership = chain_client.chain_info().await?.manager.ownership;
850            if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
851                continue;
852            }
853            chain_client.process_inbox().await?;
854            benchmark_chains.push((
855                chain_id,
856                *ownership
857                    .super_owners
858                    .first()
859                    .expect("should have a super owner"),
860            ));
861            chain_clients.push(chain_client);
862            chains_found_in_wallet += 1;
863        }
864        info!(
865            "Got {} chains from the wallet in {} ms",
866            benchmark_chains.len(),
867            start.elapsed().as_millis()
868        );
869
870        let num_chains_to_create = num_chains - chains_found_in_wallet;
871
872        let default_chain_id = self
873            .wallet
874            .default_chain()
875            .expect("should have default chain");
876        let default_chain_client = self.make_chain_client(default_chain_id);
877
878        if num_chains_to_create > 0 {
879            if wallet_only {
880                return Err(
881                    error::Inner::Benchmark(BenchmarkError::NotEnoughChainsInWallet(
882                        num_chains,
883                        chains_found_in_wallet,
884                    ))
885                    .into(),
886                );
887            }
888            let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
889            let operations_per_block = 900; // Over this we seem to hit the block size limits.
890            for i in (0..num_chains_to_create).step_by(operations_per_block) {
891                let num_new_chains = operations_per_block.min(num_chains_to_create - i);
892                let pub_key = pub_keys_iter.next().unwrap();
893                let owner = pub_key.into();
894
895                let certificate = Self::execute_open_chains_operations(
896                    num_new_chains,
897                    &default_chain_client,
898                    balance,
899                    owner,
900                )
901                .await?;
902                info!("Block executed successfully");
903
904                let block = certificate.block();
905                for i in 0..num_new_chains {
906                    let chain_id = block.body.blobs[i]
907                        .iter()
908                        .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
909                        .map(|blob| ChainId(blob.id().hash))
910                        .expect("failed to create a new chain");
911                    self.client.track_chain(chain_id);
912
913                    let mut chain_client = self.client.create_chain_client(
914                        chain_id,
915                        None,
916                        BlockHeight::ZERO,
917                        None,
918                        Some(owner),
919                        self.timing_sender(),
920                    );
921                    chain_client.set_preferred_owner(owner);
922                    chain_client.process_inbox().await?;
923                    benchmark_chains.push((chain_id, owner));
924                    chain_clients.push(chain_client);
925                }
926            }
927
928            info!(
929                "Created {} chains in {} ms",
930                num_chains_to_create,
931                start.elapsed().as_millis()
932            );
933        }
934
935        info!("Updating wallet from client");
936        self.update_wallet_from_client(&default_chain_client)
937            .await?;
938        info!("Retrying pending outgoing messages");
939        default_chain_client
940            .retry_pending_outgoing_messages()
941            .await
942            .context("outgoing messages to create the new chains should be delivered")?;
943        info!("Processing default chain inbox");
944        default_chain_client.process_inbox().await?;
945
946        assert_eq!(
947            benchmark_chains.len(),
948            chain_clients.len(),
949            "benchmark_chains and chain_clients must have the same size"
950        );
951
952        Ok((benchmark_chains, chain_clients))
953    }
954
955    async fn execute_open_chains_operations(
956        num_new_chains: usize,
957        chain_client: &ChainClient<Env>,
958        balance: Amount,
959        owner: AccountOwner,
960    ) -> Result<ConfirmedBlockCertificate, Error> {
961        let config = OpenChainConfig {
962            ownership: ChainOwnership::single_super(owner),
963            balance,
964            application_permissions: Default::default(),
965        };
966        let operations = iter::repeat_n(
967            Operation::system(SystemOperation::OpenChain(config)),
968            num_new_chains,
969        )
970        .collect();
971        info!("Executing {} OpenChain operations", num_new_chains);
972        Ok(chain_client
973            .execute_operations(operations, vec![])
974            .await?
975            .expect("should execute block with OpenChain operations"))
976    }
977
978    /// Supplies fungible tokens to the chains.
979    async fn supply_fungible_tokens(
980        &mut self,
981        key_pairs: &[(ChainId, AccountOwner)],
982        application_id: ApplicationId,
983    ) -> Result<(), Error> {
984        let default_chain_id = self
985            .wallet
986            .default_chain()
987            .expect("should have default chain");
988        let default_key = self.wallet.get(default_chain_id).unwrap().owner.unwrap();
989        // This should be enough to run the benchmark at 1M TPS for an hour.
990        let amount = Amount::from_nanos(4);
991        let operations: Vec<Operation> = key_pairs
992            .iter()
993            .map(|(chain_id, owner)| {
994                Benchmark::<Env>::fungible_transfer(
995                    application_id,
996                    *chain_id,
997                    default_key,
998                    *owner,
999                    amount,
1000                )
1001            })
1002            .collect();
1003        let chain_client = self.make_chain_client(default_chain_id);
1004        // Put at most 1000 fungible token operations in each block.
1005        for operation_chunk in operations.chunks(1000) {
1006            chain_client
1007                .execute_operations(operation_chunk.to_vec(), vec![])
1008                .await?
1009                .expect("should execute block with Transfer operations");
1010        }
1011        self.update_wallet_from_client(&chain_client).await?;
1012
1013        Ok(())
1014    }
1015}