linera_client/
client_context.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4#[cfg(with_testing)]
5use std::num::NonZeroUsize;
6use std::sync::Arc;
7
8use futures::Future;
9use linera_base::{
10    crypto::{CryptoHash, ValidatorPublicKey},
11    data_types::{BlockHeight, Timestamp},
12    identifiers::{Account, AccountOwner, ChainId},
13    ownership::ChainOwnership,
14    time::{Duration, Instant},
15};
16use linera_chain::types::ConfirmedBlockCertificate;
17use linera_core::{
18    client::{ChainClient, Client},
19    data_types::{ChainInfoQuery, ClientOutcome},
20    join_set_ext::JoinSet,
21    node::ValidatorNode,
22    Environment, JoinSetExt as _,
23};
24use linera_persistent::{Persist, PersistExt as _};
25use linera_rpc::node_provider::{NodeOptions, NodeProvider};
26use linera_version::VersionInfo;
27use thiserror_context::Context;
28use tracing::{debug, info};
29#[cfg(feature = "benchmark")]
30use {
31    crate::benchmark::{Benchmark, BenchmarkError},
32    futures::{stream, StreamExt, TryStreamExt},
33    linera_base::{
34        crypto::AccountPublicKey,
35        data_types::{Amount, Epoch},
36        identifiers::ApplicationId,
37    },
38    linera_core::client::ChainClientError,
39    linera_execution::{
40        committee::Committee,
41        system::{OpenChainConfig, SystemOperation},
42        Operation,
43    },
44    std::{collections::HashMap, iter},
45    tokio::task,
46};
47#[cfg(feature = "fs")]
48use {
49    linera_base::{
50        data_types::{BlobContent, Bytecode},
51        identifiers::ModuleId,
52        vm::VmRuntime,
53    },
54    linera_core::client::create_bytecode_blobs,
55    std::{fs, path::PathBuf},
56};
57
58use crate::{
59    chain_listener::{self, ClientContext as _, ClientContextExt as _},
60    client_options::{ChainOwnershipConfig, ClientContextOptions},
61    error, util,
62    wallet::{UserChain, Wallet},
63    Error,
64};
65
/// Client-side context bundling a wallet, a shared [`Client`] and the network settings
/// used to build node providers.
pub struct ClientContext<Env: Environment, W> {
    /// The wallet. Most methods on this type require `W: Persist<Target = Wallet>`.
    pub wallet: W,
    /// The client, shared behind an `Arc`.
    pub client: Arc<Client<Env>>,
    /// Send timeout copied into the `NodeOptions` built by `make_node_options`.
    pub send_timeout: Duration,
    /// Receive timeout copied into the `NodeOptions` built by `make_node_options`.
    pub recv_timeout: Duration,
    /// Retry delay copied into the `NodeOptions` built by `make_node_options`.
    pub retry_delay: Duration,
    /// Maximum number of retries copied into the `NodeOptions` built by `make_node_options`.
    pub max_retries: u32,
    /// Listener tasks spawned by `process_inbox` and `apply_client_command`.
    pub chain_listeners: JoinSet,
}
75
/// Implements the chain listener's context trait for `ClientContext`, exposing the
/// wallet, the storage and the client, and forwarding wallet updates.
impl<Env: Environment, W> chain_listener::ClientContext for ClientContext<Env, W>
where
    W: Persist<Target = Wallet>,
{
    type Environment = Env;

    /// Returns the wallet.
    fn wallet(&self) -> &Wallet {
        &self.wallet
    }

    /// Returns the storage obtained from the client's `storage_client()`.
    fn storage(&self) -> &Env::Storage {
        self.client.storage_client()
    }

    /// Returns the shared client.
    fn client(&self) -> &Arc<Client<Env>> {
        &self.client
    }

    /// Records a new chain in the wallet.
    ///
    /// The call below resolves to the inherent `ClientContext::update_wallet_for_new_chain`
    /// (inherent methods take precedence over trait methods), so this does not recurse.
    async fn update_wallet_for_new_chain(
        &mut self,
        chain_id: ChainId,
        owner: Option<AccountOwner>,
        timestamp: Timestamp,
    ) -> Result<(), Error> {
        self.update_wallet_for_new_chain(chain_id, owner, timestamp)
            .await
    }

    /// Updates the wallet from the given chain client via `update_wallet_from_client`.
    async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
        self.update_wallet_from_client(client).await
    }
}
108
109impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si>, W>
110where
111    S: linera_core::environment::Storage,
112    Si: linera_core::environment::Signer,
113    W: Persist<Target = Wallet>,
114{
115    pub fn new(storage: S, options: ClientContextOptions, wallet: W, signer: Si) -> Self {
116        let node_provider = NodeProvider::new(NodeOptions {
117            send_timeout: options.send_timeout,
118            recv_timeout: options.recv_timeout,
119            retry_delay: options.retry_delay,
120            max_retries: options.max_retries,
121        });
122        let chain_ids = wallet.chain_ids();
123        let name = match chain_ids.len() {
124            0 => "Client node".to_string(),
125            1 => format!("Client node for {:.8}", chain_ids[0]),
126            n => format!("Client node for {:.8} and {} others", chain_ids[0], n - 1),
127        };
128        let client = Client::new(
129            linera_core::environment::Impl {
130                network: node_provider,
131                storage,
132                signer,
133            },
134            wallet.genesis_admin_chain(),
135            options.long_lived_services,
136            chain_ids,
137            name,
138            options.max_loaded_chains,
139            options.to_chain_client_options(),
140        );
141
142        ClientContext {
143            client: Arc::new(client),
144            wallet,
145            send_timeout: options.send_timeout,
146            recv_timeout: options.recv_timeout,
147            retry_delay: options.retry_delay,
148            max_retries: options.max_retries,
149            chain_listeners: JoinSet::default(),
150        }
151    }
152
153    #[cfg(with_testing)]
154    pub fn new_test_client_context(storage: S, wallet: W, signer: Si) -> Self {
155        use linera_core::{client::ChainClientOptions, node::CrossChainMessageDelivery};
156
157        let send_recv_timeout = Duration::from_millis(4000);
158        let retry_delay = Duration::from_millis(1000);
159        let max_retries = 10;
160
161        let node_options = NodeOptions {
162            send_timeout: send_recv_timeout,
163            recv_timeout: send_recv_timeout,
164            retry_delay,
165            max_retries,
166        };
167        let chain_ids = wallet.chain_ids();
168        let name = match chain_ids.len() {
169            0 => "Client node".to_string(),
170            1 => format!("Client node for {:.8}", chain_ids[0]),
171            n => format!("Client node for {:.8} and {} others", chain_ids[0], n - 1),
172        };
173        let client = Client::new(
174            linera_core::environment::Impl {
175                storage,
176                network: NodeProvider::new(node_options),
177                signer,
178            },
179            wallet.genesis_admin_chain(),
180            false,
181            chain_ids,
182            name,
183            NonZeroUsize::new(20).expect("Chain worker limit should not be zero"),
184            ChainClientOptions {
185                cross_chain_message_delivery: CrossChainMessageDelivery::Blocking,
186                ..ChainClientOptions::test_default()
187            },
188        );
189
190        ClientContext {
191            client: Arc::new(client),
192            wallet,
193            send_timeout: send_recv_timeout,
194            recv_timeout: send_recv_timeout,
195            retry_delay,
196            max_retries,
197            chain_listeners: JoinSet::default(),
198        }
199    }
200}
201
202impl<Env: Environment, W: Persist<Target = Wallet>> ClientContext<Env, W> {
203    /// Returns a reference to the wallet.
204    pub fn wallet(&self) -> &Wallet {
205        &self.wallet
206    }
207
208    /// Returns the wallet as a mutable reference.
209    pub fn wallet_mut(&mut self) -> &mut W {
210        &mut self.wallet
211    }
212
213    pub async fn mutate_wallet<R: Send>(
214        &mut self,
215        mutation: impl FnOnce(&mut Wallet) -> R + Send,
216    ) -> Result<R, Error> {
217        self.wallet
218            .mutate(mutation)
219            .await
220            .map_err(|e| error::Inner::Persistence(Box::new(e)).into())
221    }
222
223    /// Retrieve the default account. Current this is the common account of the default
224    /// chain.
225    pub fn default_account(&self) -> Account {
226        Account::chain(self.default_chain())
227    }
228
229    /// Retrieve the default chain.
230    pub fn default_chain(&self) -> ChainId {
231        self.wallet
232            .default_chain()
233            .expect("No chain specified in wallet with no default chain")
234    }
235
236    pub fn first_non_admin_chain(&self) -> ChainId {
237        self.wallet
238            .first_non_admin_chain()
239            .expect("No non-admin chain specified in wallet with no non-admin chain")
240    }
241
242    pub fn make_node_provider(&self) -> NodeProvider {
243        NodeProvider::new(self.make_node_options())
244    }
245
246    fn make_node_options(&self) -> NodeOptions {
247        NodeOptions {
248            send_timeout: self.send_timeout,
249            recv_timeout: self.recv_timeout,
250            retry_delay: self.retry_delay,
251            max_retries: self.max_retries,
252        }
253    }
254
255    pub async fn save_wallet(&mut self) -> Result<(), Error> {
256        self.wallet
257            .persist()
258            .await
259            .map_err(|e| error::Inner::Persistence(Box::new(e)).into())
260    }
261
262    pub async fn update_wallet_from_client<Env_: Environment>(
263        &mut self,
264        client: &ChainClient<Env_>,
265    ) -> Result<(), Error> {
266        let info = client.chain_info().await?;
267        let client_owner = client.preferred_owner();
268        let pending_proposal = client.pending_proposal().clone();
269        self.wallet
270            .as_mut()
271            .update_from_info(pending_proposal, client_owner, &info);
272        self.save_wallet().await
273    }
274
275    /// Remembers the new chain and its owner (if any) in the wallet.
276    pub async fn update_wallet_for_new_chain(
277        &mut self,
278        chain_id: ChainId,
279        owner: Option<AccountOwner>,
280        timestamp: Timestamp,
281    ) -> Result<(), Error> {
282        if self.wallet.get(chain_id).is_none() {
283            self.mutate_wallet(|w| {
284                w.insert(UserChain {
285                    chain_id,
286                    owner,
287                    block_hash: None,
288                    timestamp,
289                    next_block_height: BlockHeight::ZERO,
290                    pending_proposal: None,
291                })
292            })
293            .await?;
294        }
295
296        Ok(())
297    }
298
299    pub async fn process_inbox(
300        &mut self,
301        chain_client: &ChainClient<Env>,
302    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
303        let mut certificates = Vec::new();
304        // Try processing the inbox optimistically without waiting for validator notifications.
305        let (new_certificates, maybe_timeout) = {
306            chain_client.synchronize_from_validators().await?;
307            let result = chain_client.process_inbox_without_prepare().await;
308            self.update_wallet_from_client(chain_client).await?;
309            if result.is_err() {
310                self.save_wallet().await?;
311            }
312            result?
313        };
314        certificates.extend(new_certificates);
315        if maybe_timeout.is_none() {
316            self.save_wallet().await?;
317            return Ok(certificates);
318        }
319
320        // Start listening for notifications, so we learn about new rounds and blocks.
321        let (listener, _listen_handle, mut notification_stream) = chain_client.listen().await?;
322        self.chain_listeners.spawn_task(listener);
323
324        loop {
325            let (new_certificates, maybe_timeout) = {
326                let result = chain_client.process_inbox().await;
327                self.update_wallet_from_client(chain_client).await?;
328                if result.is_err() {
329                    self.save_wallet().await?;
330                }
331                result?
332            };
333            certificates.extend(new_certificates);
334            if let Some(timestamp) = maybe_timeout {
335                util::wait_for_next_round(&mut notification_stream, timestamp).await
336            } else {
337                self.save_wallet().await?;
338                return Ok(certificates);
339            }
340        }
341    }
342
343    pub async fn assign_new_chain_to_key(
344        &mut self,
345        chain_id: ChainId,
346        owner: AccountOwner,
347    ) -> Result<(), Error> {
348        self.client.track_chain(chain_id);
349        let chain_description = self.chain_description(chain_id).await?;
350        let config = chain_description.config();
351
352        if !config.ownership.verify_owner(&owner) {
353            tracing::error!(
354                "The chain with the ID returned by the faucet is not owned by you. \
355                Please make sure you are connecting to a genuine faucet."
356            );
357            return Err(error::Inner::ChainOwnership.into());
358        }
359
360        self.wallet_mut()
361            .mutate(|w| w.assign_new_chain_to_owner(owner, chain_id, chain_description.timestamp()))
362            .await
363            .map_err(|e| error::Inner::Persistence(Box::new(e)))?
364            .context("assigning new chain")?;
365        Ok(())
366    }
367
368    /// Applies the given function to the chain client.
369    ///
370    /// Updates the wallet regardless of the outcome. As long as the function returns a round
371    /// timeout, it will wait and retry.
372    pub async fn apply_client_command<E, F, Fut, T>(
373        &mut self,
374        client: &ChainClient<Env>,
375        mut f: F,
376    ) -> Result<T, Error>
377    where
378        F: FnMut(&ChainClient<Env>) -> Fut,
379        Fut: Future<Output = Result<ClientOutcome<T>, E>>,
380        Error: From<E>,
381    {
382        client.prepare_chain().await?;
383        // Try applying f optimistically without validator notifications. Return if committed.
384        let result = f(client).await;
385        self.update_wallet_from_client(client).await?;
386        if let ClientOutcome::Committed(t) = result? {
387            return Ok(t);
388        }
389
390        // Start listening for notifications, so we learn about new rounds and blocks.
391        let (listener, _listen_handle, mut notification_stream) = client.listen().await?;
392        self.chain_listeners.spawn_task(listener);
393
394        loop {
395            // Try applying f. Return if committed.
396            client.prepare_chain().await?;
397            let result = f(client).await;
398            self.update_wallet_from_client(client).await?;
399            let timeout = match result? {
400                ClientOutcome::Committed(t) => return Ok(t),
401                ClientOutcome::WaitForTimeout(timeout) => timeout,
402            };
403            // Otherwise wait and try again in the next round.
404            util::wait_for_next_round(&mut notification_stream, timeout).await;
405        }
406    }
407
408    pub async fn change_ownership(
409        &mut self,
410        chain_id: Option<ChainId>,
411        ownership_config: ChainOwnershipConfig,
412    ) -> Result<(), Error> {
413        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
414        let chain_client = self.make_chain_client(chain_id);
415        info!(
416            ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
417            "Changing ownership of a chain"
418        );
419        let time_start = Instant::now();
420        let ownership = ChainOwnership::try_from(ownership_config)?;
421
422        let certificate = self
423            .apply_client_command(&chain_client, |chain_client| {
424                let ownership = ownership.clone();
425                let chain_client = chain_client.clone();
426                async move {
427                    chain_client
428                        .change_ownership(ownership)
429                        .await
430                        .map_err(Error::from)
431                        .context("Failed to change ownership")
432                }
433            })
434            .await?;
435        let time_total = time_start.elapsed();
436        info!("Operation confirmed after {} ms", time_total.as_millis());
437        debug!("{:?}", certificate);
438        Ok(())
439    }
440
441    pub async fn set_preferred_owner(
442        &mut self,
443        chain_id: Option<ChainId>,
444        preferred_owner: AccountOwner,
445    ) -> Result<(), Error> {
446        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
447        let mut chain_client = self.make_chain_client(chain_id);
448        let old_owner = chain_client.preferred_owner();
449        info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
450        chain_client.set_preferred_owner(preferred_owner);
451        self.update_wallet_from_client(&chain_client).await?;
452        info!("New preferred owner set");
453        Ok(())
454    }
455
456    pub async fn check_compatible_version_info(
457        &self,
458        address: &str,
459        node: &impl ValidatorNode,
460    ) -> Result<VersionInfo, Error> {
461        match node.get_version_info().await {
462            Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
463                info!(
464                    "Version information for validator {address}: {}",
465                    version_info
466                );
467                Ok(version_info)
468            }
469            Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
470                remote: Box::new(version_info),
471                local: Box::new(linera_version::VERSION_INFO.clone()),
472            }
473            .into()),
474            Err(error) => Err(error::Inner::UnavailableVersionInfo {
475                address: address.to_string(),
476                error: Box::new(error),
477            }
478            .into()),
479        }
480    }
481
482    pub async fn check_matching_network_description(
483        &self,
484        address: &str,
485        node: &impl ValidatorNode,
486    ) -> Result<CryptoHash, Error> {
487        let network_description = self.wallet().genesis_config().network_description();
488        match node.get_network_description().await {
489            Ok(description) => {
490                if description == network_description {
491                    Ok(description.genesis_config_hash)
492                } else {
493                    Err(error::Inner::UnexpectedNetworkDescription {
494                        remote: Box::new(description),
495                        local: Box::new(network_description),
496                    }
497                    .into())
498                }
499            }
500            Err(error) => Err(error::Inner::UnavailableNetworkDescription {
501                address: address.to_string(),
502                error: Box::new(error),
503            }
504            .into()),
505        }
506    }
507
508    pub async fn check_validator_chain_info_response(
509        &self,
510        public_key: Option<&ValidatorPublicKey>,
511        address: &str,
512        node: &impl ValidatorNode,
513        chain_id: ChainId,
514    ) -> Result<(), Error> {
515        let query = ChainInfoQuery::new(chain_id);
516        match node.handle_chain_info_query(query).await {
517            Ok(response) => {
518                info!(
519                    "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
520                    response.info.next_block_height, response.info.epoch,
521                );
522                if let Some(public_key) = public_key {
523                    if response.check(public_key).is_ok() {
524                        info!("Signature for public key {public_key} is OK.");
525                    } else {
526                        return Err(error::Inner::InvalidSignature {
527                            public_key: *public_key,
528                        }
529                        .into());
530                    }
531                }
532                Ok(())
533            }
534            Err(error) => Err(error::Inner::UnavailableChainInfo {
535                address: address.to_string(),
536                chain_id,
537                error: Box::new(error),
538            }
539            .into()),
540        }
541    }
542}
543
544#[cfg(feature = "fs")]
545impl<Env: Environment, W> ClientContext<Env, W>
546where
547    W: Persist<Target = Wallet>,
548{
549    pub async fn publish_module(
550        &mut self,
551        chain_client: &ChainClient<Env>,
552        contract: PathBuf,
553        service: PathBuf,
554        vm_runtime: VmRuntime,
555    ) -> Result<ModuleId, Error> {
556        info!("Loading bytecode files");
557        let contract_bytecode = Bytecode::load_from_file(&contract)
558            .await
559            .with_context(|| format!("failed to load contract bytecode from {:?}", &contract))?;
560        let service_bytecode = Bytecode::load_from_file(&service)
561            .await
562            .with_context(|| format!("failed to load service bytecode from {:?}", &service))?;
563
564        info!("Publishing module");
565        let (blobs, module_id) =
566            create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
567        let (module_id, _) = self
568            .apply_client_command(chain_client, |chain_client| {
569                let blobs = blobs.clone();
570                let chain_client = chain_client.clone();
571                async move {
572                    chain_client
573                        .publish_module_blobs(blobs, module_id)
574                        .await
575                        .context("Failed to publish module")
576                }
577            })
578            .await?;
579
580        info!("{}", "Module published successfully!");
581
582        info!("Synchronizing client and processing inbox");
583        self.process_inbox(chain_client).await?;
584        Ok(module_id)
585    }
586
587    pub async fn publish_data_blob(
588        &mut self,
589        chain_client: &ChainClient<Env>,
590        blob_path: PathBuf,
591    ) -> Result<CryptoHash, Error> {
592        info!("Loading data blob file");
593        let blob_bytes = fs::read(&blob_path).context(format!(
594            "failed to load data blob bytes from {:?}",
595            &blob_path
596        ))?;
597
598        info!("Publishing data blob");
599        self.apply_client_command(chain_client, |chain_client| {
600            let blob_bytes = blob_bytes.clone();
601            let chain_client = chain_client.clone();
602            async move {
603                chain_client
604                    .publish_data_blob(blob_bytes)
605                    .await
606                    .context("Failed to publish data blob")
607            }
608        })
609        .await?;
610
611        info!("{}", "Data blob published successfully!");
612        Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
613    }
614
615    // TODO(#2490): Consider removing or renaming this.
616    pub async fn read_data_blob(
617        &mut self,
618        chain_client: &ChainClient<Env>,
619        hash: CryptoHash,
620    ) -> Result<(), Error> {
621        info!("Verifying data blob");
622        self.apply_client_command(chain_client, |chain_client| {
623            let chain_client = chain_client.clone();
624            async move {
625                chain_client
626                    .read_data_blob(hash)
627                    .await
628                    .context("Failed to verify data blob")
629            }
630        })
631        .await?;
632
633        info!("{}", "Data blob verified successfully!");
634        Ok(())
635    }
636}
637
638#[cfg(feature = "benchmark")]
639impl<Env: Environment, W> ClientContext<Env, W>
640where
641    W: Persist<Target = Wallet>,
642{
643    pub async fn prepare_for_benchmark(
644        &mut self,
645        num_chains: usize,
646        transactions_per_block: usize,
647        tokens_per_chain: Amount,
648        fungible_application_id: Option<ApplicationId>,
649        pub_keys: Vec<AccountPublicKey>,
650    ) -> Result<
651        (
652            HashMap<ChainId, ChainClient<Env>>,
653            Epoch,
654            Vec<(ChainId, Vec<Operation>, AccountOwner)>,
655            Committee,
656        ),
657        Error,
658    > {
659        let start = Instant::now();
660        // Below all block proposals are supposed to succeed without retries, we
661        // must make sure that all incoming payments have been accepted on-chain
662        // and that no validator is missing user certificates.
663        self.process_inboxes_and_force_validator_updates().await;
664        info!(
665            "Processed inboxes and forced validator updates in {} ms",
666            start.elapsed().as_millis()
667        );
668
669        let start = Instant::now();
670        let (key_pairs, chain_clients) = self
671            .make_benchmark_chains(num_chains, tokens_per_chain, pub_keys)
672            .await?;
673        info!(
674            "Got {} chains in {} ms",
675            key_pairs.len(),
676            start.elapsed().as_millis()
677        );
678
679        if let Some(id) = fungible_application_id {
680            let start = Instant::now();
681            self.supply_fungible_tokens(&key_pairs, id).await?;
682            info!(
683                "Supplied fungible tokens in {} ms",
684                start.elapsed().as_millis()
685            );
686        }
687
688        let default_chain_id = self
689            .wallet
690            .default_chain()
691            .expect("should have default chain");
692        let default_chain_client = self.make_chain_client(default_chain_id);
693        let (epoch, committee) = default_chain_client.admin_committee().await?;
694        let blocks_infos = Benchmark::<Env>::make_benchmark_block_info(
695            key_pairs,
696            transactions_per_block,
697            fungible_application_id,
698        );
699
700        Ok((chain_clients, epoch, blocks_infos, committee))
701    }
702
703    pub async fn wrap_up_benchmark(
704        &mut self,
705        chain_clients: HashMap<ChainId, ChainClient<Env>>,
706        close_chains: bool,
707        wrap_up_max_in_flight: usize,
708    ) -> Result<(), Error> {
709        if close_chains {
710            info!("Closing chains...");
711            let stream = stream::iter(chain_clients.values().cloned())
712                .map(|chain_client| async move {
713                    Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
714                    info!("Closed chain {:?}", chain_client.chain_id());
715                    Ok::<(), BenchmarkError>(())
716                })
717                .buffer_unordered(wrap_up_max_in_flight);
718            stream.try_collect::<Vec<_>>().await?;
719        } else {
720            info!("Processing inbox for all chains...");
721            let stream = stream::iter(chain_clients.values().cloned())
722                .map(|chain_client| async move {
723                    chain_client.process_inbox().await?;
724                    info!("Processed inbox for chain {:?}", chain_client.chain_id());
725                    Ok::<(), ChainClientError>(())
726                })
727                .buffer_unordered(wrap_up_max_in_flight);
728            stream.try_collect::<Vec<_>>().await?;
729
730            info!("Updating wallet from chain clients...");
731            for chain_client in chain_clients.values() {
732                let info = chain_client.chain_info().await?;
733                let client_owner = chain_client.preferred_owner();
734                let pending_proposal = chain_client.pending_proposal().clone();
735                self.wallet
736                    .as_mut()
737                    .update_from_info(pending_proposal, client_owner, &info);
738            }
739            self.save_wallet().await?;
740        }
741
742        Ok(())
743    }
744
745    async fn process_inboxes_and_force_validator_updates(&mut self) {
746        let mut chain_clients = vec![];
747        for chain_id in &self.wallet.owned_chain_ids() {
748            chain_clients.push(self.make_chain_client(*chain_id));
749        }
750
751        let mut join_set = task::JoinSet::new();
752        for chain_client in chain_clients {
753            join_set.spawn(async move {
754                Self::process_inbox_without_updating_wallet(&chain_client)
755                    .await
756                    .expect("Processing inbox should not fail!");
757                chain_client
758            });
759        }
760
761        let chain_clients = join_set.join_all().await;
762        for chain_client in &chain_clients {
763            self.update_wallet_from_client(chain_client).await.unwrap();
764        }
765    }
766
767    async fn process_inbox_without_updating_wallet(
768        chain_client: &ChainClient<Env>,
769    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
770        // Try processing the inbox optimistically without waiting for validator notifications.
771        chain_client.synchronize_from_validators().await?;
772        let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
773        assert!(
774            maybe_timeout.is_none(),
775            "Should not timeout within benchmark!"
776        );
777
778        Ok(certificates)
779    }
780
    /// Creates chains if necessary, and returns a map from `num_chains` chain IDs to
    /// their owners, as well as a map of the corresponding chain clients.
    ///
    /// Owned chains from the wallet are used first; the remainder is opened from the
    /// default chain in batches. Every chain in a batch is opened with the same public
    /// key, and one key from `pub_keys` is consumed per batch.
    ///
    /// # Panics
    ///
    /// Panics if the wallet has no default chain, if `pub_keys` holds fewer keys than
    /// there are batches to create, or if no chain description blob is found for a newly
    /// opened chain.
    async fn make_benchmark_chains(
        &mut self,
        num_chains: usize,
        balance: Amount,
        pub_keys: Vec<AccountPublicKey>,
    ) -> Result<
        (
            HashMap<ChainId, AccountOwner>,
            HashMap<ChainId, ChainClient<Env>>,
        ),
        Error,
    > {
        use linera_base::identifiers::BlobType;
        let mut benchmark_chains = HashMap::new();
        let mut chain_clients = HashMap::new();
        let start = Instant::now();
        // First pass: reuse suitable chains that are already in the wallet.
        for chain_id in self.wallet.owned_chain_ids() {
            if benchmark_chains.len() == num_chains {
                break;
            }
            // This should never panic, because `owned_chain_ids` only returns the owned chains that
            // we have a key pair for.
            let owner = self
                .wallet
                .get(chain_id)
                .and_then(|chain| chain.owner)
                .unwrap();
            let chain_client = self.make_chain_client(chain_id);
            let ownership = chain_client.chain_info().await?.manager.ownership;
            // Only use chains with no regular owners and exactly one super owner.
            if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
                continue;
            }
            benchmark_chains.insert(chain_client.chain_id(), owner);
            chain_client.process_inbox().await?;
            chain_clients.insert(chain_id, chain_client);
        }
        info!(
            "Got {} chains from the wallet in {} ms",
            benchmark_chains.len(),
            start.elapsed().as_millis()
        );

        // Cannot underflow: the loop above stops once `num_chains` chains were collected.
        let chains_from_wallet = benchmark_chains.len();
        let num_chains_to_create = num_chains - chains_from_wallet;

        let default_chain_id = self
            .wallet
            .default_chain()
            .expect("should have default chain");
        let operations_per_block = 900; // Over this we seem to hit the block size limits.

        let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
        let default_chain_client = self.make_chain_client(default_chain_id);

        // Second pass: open the missing chains from the default chain, one batch of at
        // most `operations_per_block` chains at a time.
        for i in (0..num_chains_to_create).step_by(operations_per_block) {
            let num_new_chains = operations_per_block.min(num_chains_to_create - i);
            // One public key per batch; panics if the caller supplied too few keys.
            let pub_key = pub_keys_iter.next().unwrap();

            let certificate = Self::execute_open_chains_operations(
                num_new_chains,
                &default_chain_client,
                balance,
                pub_key.into(),
            )
            .await?;
            info!("Block executed successfully");

            let block = certificate.block();
            // NOTE(review): this inner `i` shadows the batch offset above. It indexes
            // `block.body.blobs`, presumably one entry per open-chain operation — confirm.
            for i in 0..num_new_chains {
                // The new chain's ID is the hash of its chain description blob.
                let chain_id = block.body.blobs[i]
                    .iter()
                    .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
                    .map(|blob| ChainId(blob.id().hash))
                    .expect("failed to create a new chain");
                benchmark_chains.insert(chain_id, pub_key.into());
                self.client.track_chain(chain_id);

                let mut chain_client = self.client.create_chain_client(
                    chain_id,
                    None,
                    BlockHeight::ZERO,
                    None,
                    Some(pub_key.into()),
                );
                chain_client.set_preferred_owner(pub_key.into());
                chain_client.process_inbox().await?;
                chain_clients.insert(chain_id, chain_client);
            }
        }

        if num_chains_to_create > 0 {
            // `start` was taken before the wallet pass, so this duration covers both passes.
            info!(
                "Created {} chains in {} ms",
                num_chains_to_create,
                start.elapsed().as_millis()
            );
        }

        info!("Updating wallet from client");
        self.update_wallet_from_client(&default_chain_client)
            .await?;
        info!("Retrying pending outgoing messages");
        default_chain_client
            .retry_pending_outgoing_messages()
            .await
            .context("outgoing messages to create the new chains should be delivered")?;
        info!("Processing default chain inbox");
        default_chain_client.process_inbox().await?;

        Ok((benchmark_chains, chain_clients))
    }
894
895    async fn execute_open_chains_operations(
896        num_new_chains: usize,
897        chain_client: &ChainClient<Env>,
898        balance: Amount,
899        owner: AccountOwner,
900    ) -> Result<ConfirmedBlockCertificate, Error> {
901        let config = OpenChainConfig {
902            ownership: ChainOwnership::single_super(owner),
903            balance,
904            application_permissions: Default::default(),
905        };
906        let operations = iter::repeat_n(
907            Operation::system(SystemOperation::OpenChain(config)),
908            num_new_chains,
909        )
910        .collect();
911        info!("Executing {} OpenChain operations", num_new_chains);
912        Ok(chain_client
913            .execute_operations(operations, vec![])
914            .await?
915            .expect("should execute block with OpenChain operations"))
916    }
917
918    /// Supplies fungible tokens to the chains.
919    async fn supply_fungible_tokens(
920        &mut self,
921        key_pairs: &HashMap<ChainId, AccountOwner>,
922        application_id: ApplicationId,
923    ) -> Result<(), Error> {
924        let default_chain_id = self
925            .wallet
926            .default_chain()
927            .expect("should have default chain");
928        let default_key = self.wallet.get(default_chain_id).unwrap().owner.unwrap();
929        let amount = Amount::from(1_000_000);
930        let operations: Vec<_> = key_pairs
931            .iter()
932            .map(|(chain_id, owner)| {
933                Benchmark::<Env>::fungible_transfer(
934                    application_id,
935                    *chain_id,
936                    default_key,
937                    *owner,
938                    amount,
939                )
940            })
941            .collect();
942        let chain_client = self.make_chain_client(default_chain_id);
943        // Put at most 1000 fungible token operations in each block.
944        for operation_chunk in operations.chunks(1000) {
945            chain_client
946                .execute_operations(operation_chunk.to_vec(), vec![])
947                .await?
948                .expect("should execute block with Transfer operations");
949        }
950        self.update_wallet_from_client(&chain_client).await?;
951
952        Ok(())
953    }
954}