linera_client/
client_context.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use futures::Future;
7use linera_base::{
8    crypto::{CryptoHash, ValidatorPublicKey},
9    data_types::{BlockHeight, Timestamp},
10    identifiers::{Account, AccountOwner, ChainId},
11    ownership::ChainOwnership,
12    time::{Duration, Instant},
13};
14use linera_chain::types::ConfirmedBlockCertificate;
15use linera_core::{
16    client::{ChainClient, Client},
17    data_types::{ChainInfoQuery, ClientOutcome},
18    join_set_ext::JoinSet,
19    node::ValidatorNode,
20    Environment, JoinSetExt as _,
21};
22use linera_persistent::{Persist, PersistExt as _};
23use linera_rpc::node_provider::{NodeOptions, NodeProvider};
24use linera_version::VersionInfo;
25use thiserror_context::Context;
26use tracing::{debug, info};
27#[cfg(feature = "benchmark")]
28use {
29    crate::benchmark::{Benchmark, BenchmarkError},
30    futures::{stream, StreamExt, TryStreamExt},
31    linera_base::{
32        crypto::AccountPublicKey,
33        data_types::Amount,
34        identifiers::{ApplicationId, BlobType},
35    },
36    linera_core::client::ChainClientError,
37    linera_execution::{
38        committee::Committee,
39        system::{OpenChainConfig, SystemOperation},
40        Operation,
41    },
42    std::iter,
43    tokio::task,
44};
45#[cfg(feature = "fs")]
46use {
47    linera_base::{
48        data_types::{BlobContent, Bytecode},
49        identifiers::ModuleId,
50        vm::VmRuntime,
51    },
52    linera_core::client::create_bytecode_blobs,
53    std::{fs, path::PathBuf},
54};
55
56use crate::{
57    chain_listener::{self, ClientContext as _},
58    client_options::{ChainOwnershipConfig, ClientContextOptions},
59    error, util,
60    wallet::{UserChain, Wallet},
61    Error,
62};
63
64pub struct ClientContext<Env: Environment, W> {
65    pub wallet: W,
66    pub client: Arc<Client<Env>>,
67    pub send_timeout: Duration,
68    pub recv_timeout: Duration,
69    pub retry_delay: Duration,
70    pub max_retries: u32,
71    pub chain_listeners: JoinSet,
72}
73
74impl<Env: Environment, W> chain_listener::ClientContext for ClientContext<Env, W>
75where
76    W: Persist<Target = Wallet>,
77{
78    type Environment = Env;
79
80    fn wallet(&self) -> &Wallet {
81        &self.wallet
82    }
83
84    fn storage(&self) -> &Env::Storage {
85        self.client.storage_client()
86    }
87
88    fn client(&self) -> &Arc<Client<Env>> {
89        &self.client
90    }
91
92    async fn update_wallet_for_new_chain(
93        &mut self,
94        chain_id: ChainId,
95        owner: Option<AccountOwner>,
96        timestamp: Timestamp,
97    ) -> Result<(), Error> {
98        self.update_wallet_for_new_chain(chain_id, owner, timestamp)
99            .await
100    }
101
102    async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
103        self.update_wallet_from_client(client).await
104    }
105}
106
107impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si>, W>
108where
109    S: linera_core::environment::Storage,
110    Si: linera_core::environment::Signer,
111    W: Persist<Target = Wallet>,
112{
113    pub fn new(storage: S, options: ClientContextOptions, wallet: W, signer: Si) -> Self {
114        let node_provider = NodeProvider::new(NodeOptions {
115            send_timeout: options.send_timeout,
116            recv_timeout: options.recv_timeout,
117            retry_delay: options.retry_delay,
118            max_retries: options.max_retries,
119        });
120        let chain_ids = wallet.chain_ids();
121        let name = match chain_ids.len() {
122            0 => "Client node".to_string(),
123            1 => format!("Client node for {:.8}", chain_ids[0]),
124            n => format!("Client node for {:.8} and {} others", chain_ids[0], n - 1),
125        };
126        let client = Client::new(
127            linera_core::environment::Impl {
128                network: node_provider,
129                storage,
130                signer,
131            },
132            wallet.genesis_admin_chain(),
133            options.long_lived_services,
134            chain_ids,
135            name,
136            Duration::from_secs(30),
137            options.to_chain_client_options(),
138        );
139
140        ClientContext {
141            client: Arc::new(client),
142            wallet,
143            send_timeout: options.send_timeout,
144            recv_timeout: options.recv_timeout,
145            retry_delay: options.retry_delay,
146            max_retries: options.max_retries,
147            chain_listeners: JoinSet::default(),
148        }
149    }
150
151    #[cfg(with_testing)]
152    pub fn new_test_client_context(storage: S, wallet: W, signer: Si) -> Self {
153        use linera_core::{client::ChainClientOptions, node::CrossChainMessageDelivery};
154
155        let send_recv_timeout = Duration::from_millis(4000);
156        let retry_delay = Duration::from_millis(1000);
157        let max_retries = 10;
158
159        let node_options = NodeOptions {
160            send_timeout: send_recv_timeout,
161            recv_timeout: send_recv_timeout,
162            retry_delay,
163            max_retries,
164        };
165        let chain_ids = wallet.chain_ids();
166        let name = match chain_ids.len() {
167            0 => "Client node".to_string(),
168            1 => format!("Client node for {:.8}", chain_ids[0]),
169            n => format!("Client node for {:.8} and {} others", chain_ids[0], n - 1),
170        };
171        let client = Client::new(
172            linera_core::environment::Impl {
173                storage,
174                network: NodeProvider::new(node_options),
175                signer,
176            },
177            wallet.genesis_admin_chain(),
178            false,
179            chain_ids,
180            name,
181            Duration::from_secs(30),
182            ChainClientOptions {
183                cross_chain_message_delivery: CrossChainMessageDelivery::Blocking,
184                ..ChainClientOptions::test_default()
185            },
186        );
187
188        ClientContext {
189            client: Arc::new(client),
190            wallet,
191            send_timeout: send_recv_timeout,
192            recv_timeout: send_recv_timeout,
193            retry_delay,
194            max_retries,
195            chain_listeners: JoinSet::default(),
196        }
197    }
198}
199
200impl<Env: Environment, W: Persist<Target = Wallet>> ClientContext<Env, W> {
201    /// Returns a reference to the wallet.
202    pub fn wallet(&self) -> &Wallet {
203        &self.wallet
204    }
205
206    /// Returns the wallet as a mutable reference.
207    pub fn wallet_mut(&mut self) -> &mut W {
208        &mut self.wallet
209    }
210
211    pub async fn mutate_wallet<R: Send>(
212        &mut self,
213        mutation: impl FnOnce(&mut Wallet) -> R + Send,
214    ) -> Result<R, Error> {
215        self.wallet
216            .mutate(mutation)
217            .await
218            .map_err(|e| error::Inner::Persistence(Box::new(e)).into())
219    }
220
221    /// Retrieve the default account. Current this is the common account of the default
222    /// chain.
223    pub fn default_account(&self) -> Account {
224        Account::chain(self.default_chain())
225    }
226
227    /// Retrieve the default chain.
228    pub fn default_chain(&self) -> ChainId {
229        self.wallet
230            .default_chain()
231            .expect("No chain specified in wallet with no default chain")
232    }
233
234    pub fn first_non_admin_chain(&self) -> ChainId {
235        self.wallet
236            .first_non_admin_chain()
237            .expect("No non-admin chain specified in wallet with no non-admin chain")
238    }
239
240    pub fn make_node_provider(&self) -> NodeProvider {
241        NodeProvider::new(self.make_node_options())
242    }
243
244    fn make_node_options(&self) -> NodeOptions {
245        NodeOptions {
246            send_timeout: self.send_timeout,
247            recv_timeout: self.recv_timeout,
248            retry_delay: self.retry_delay,
249            max_retries: self.max_retries,
250        }
251    }
252
253    pub async fn save_wallet(&mut self) -> Result<(), Error> {
254        self.wallet
255            .persist()
256            .await
257            .map_err(|e| error::Inner::Persistence(Box::new(e)).into())
258    }
259
260    pub async fn update_wallet_from_client<Env_: Environment>(
261        &mut self,
262        client: &ChainClient<Env_>,
263    ) -> Result<(), Error> {
264        let info = client.chain_info().await?;
265        let client_owner = client.preferred_owner();
266        let pending_proposal = client.pending_proposal().clone();
267        self.wallet
268            .as_mut()
269            .update_from_info(pending_proposal, client_owner, &info);
270        self.save_wallet().await
271    }
272
273    /// Remembers the new chain and its owner (if any) in the wallet.
274    pub async fn update_wallet_for_new_chain(
275        &mut self,
276        chain_id: ChainId,
277        owner: Option<AccountOwner>,
278        timestamp: Timestamp,
279    ) -> Result<(), Error> {
280        if self.wallet.get(chain_id).is_none() {
281            self.mutate_wallet(|w| {
282                w.insert(UserChain {
283                    chain_id,
284                    owner,
285                    block_hash: None,
286                    timestamp,
287                    next_block_height: BlockHeight::ZERO,
288                    pending_proposal: None,
289                })
290            })
291            .await?;
292        }
293
294        Ok(())
295    }
296
297    pub async fn process_inbox(
298        &mut self,
299        chain_client: &ChainClient<Env>,
300    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
301        let mut certificates = Vec::new();
302        // Try processing the inbox optimistically without waiting for validator notifications.
303        let (new_certificates, maybe_timeout) = {
304            chain_client.synchronize_from_validators().await?;
305            let result = chain_client.process_inbox_without_prepare().await;
306            self.update_wallet_from_client(chain_client).await?;
307            if result.is_err() {
308                self.save_wallet().await?;
309            }
310            result?
311        };
312        certificates.extend(new_certificates);
313        if maybe_timeout.is_none() {
314            self.save_wallet().await?;
315            return Ok(certificates);
316        }
317
318        // Start listening for notifications, so we learn about new rounds and blocks.
319        let (listener, _listen_handle, mut notification_stream) = chain_client.listen().await?;
320        self.chain_listeners.spawn_task(listener);
321
322        loop {
323            let (new_certificates, maybe_timeout) = {
324                let result = chain_client.process_inbox().await;
325                self.update_wallet_from_client(chain_client).await?;
326                if result.is_err() {
327                    self.save_wallet().await?;
328                }
329                result?
330            };
331            certificates.extend(new_certificates);
332            if let Some(timestamp) = maybe_timeout {
333                util::wait_for_next_round(&mut notification_stream, timestamp).await
334            } else {
335                self.save_wallet().await?;
336                return Ok(certificates);
337            }
338        }
339    }
340
341    pub async fn assign_new_chain_to_key(
342        &mut self,
343        chain_id: ChainId,
344        owner: AccountOwner,
345    ) -> Result<(), Error> {
346        self.client.track_chain(chain_id);
347        let client = self.make_chain_client(chain_id);
348        let chain_description = client.get_chain_description().await?;
349        let config = chain_description.config();
350
351        if !config.ownership.verify_owner(&owner) {
352            tracing::error!(
353                "The chain with the ID returned by the faucet is not owned by you. \
354                Please make sure you are connecting to a genuine faucet."
355            );
356            return Err(error::Inner::ChainOwnership.into());
357        }
358
359        self.wallet_mut()
360            .mutate(|w| w.assign_new_chain_to_owner(owner, chain_id, chain_description.timestamp()))
361            .await
362            .map_err(|e| error::Inner::Persistence(Box::new(e)))?
363            .context("assigning new chain")?;
364        Ok(())
365    }
366
367    /// Applies the given function to the chain client.
368    ///
369    /// Updates the wallet regardless of the outcome. As long as the function returns a round
370    /// timeout, it will wait and retry.
371    pub async fn apply_client_command<E, F, Fut, T>(
372        &mut self,
373        client: &ChainClient<Env>,
374        mut f: F,
375    ) -> Result<T, Error>
376    where
377        F: FnMut(&ChainClient<Env>) -> Fut,
378        Fut: Future<Output = Result<ClientOutcome<T>, E>>,
379        Error: From<E>,
380    {
381        client.prepare_chain().await?;
382        // Try applying f optimistically without validator notifications. Return if committed.
383        let result = f(client).await;
384        self.update_wallet_from_client(client).await?;
385        if let ClientOutcome::Committed(t) = result? {
386            return Ok(t);
387        }
388
389        // Start listening for notifications, so we learn about new rounds and blocks.
390        let (listener, _listen_handle, mut notification_stream) = client.listen().await?;
391        self.chain_listeners.spawn_task(listener);
392
393        loop {
394            // Try applying f. Return if committed.
395            client.prepare_chain().await?;
396            let result = f(client).await;
397            self.update_wallet_from_client(client).await?;
398            let timeout = match result? {
399                ClientOutcome::Committed(t) => return Ok(t),
400                ClientOutcome::WaitForTimeout(timeout) => timeout,
401            };
402            // Otherwise wait and try again in the next round.
403            util::wait_for_next_round(&mut notification_stream, timeout).await;
404        }
405    }
406
407    pub async fn change_ownership(
408        &mut self,
409        chain_id: Option<ChainId>,
410        ownership_config: ChainOwnershipConfig,
411    ) -> Result<(), Error> {
412        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
413        let chain_client = self.make_chain_client(chain_id);
414        info!(
415            ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
416            "Changing ownership of a chain"
417        );
418        let time_start = Instant::now();
419        let ownership = ChainOwnership::try_from(ownership_config)?;
420
421        let certificate = self
422            .apply_client_command(&chain_client, |chain_client| {
423                let ownership = ownership.clone();
424                let chain_client = chain_client.clone();
425                async move {
426                    chain_client
427                        .change_ownership(ownership)
428                        .await
429                        .map_err(Error::from)
430                        .context("Failed to change ownership")
431                }
432            })
433            .await?;
434        let time_total = time_start.elapsed();
435        info!("Operation confirmed after {} ms", time_total.as_millis());
436        debug!("{:?}", certificate);
437        Ok(())
438    }
439
440    pub async fn set_preferred_owner(
441        &mut self,
442        chain_id: Option<ChainId>,
443        preferred_owner: AccountOwner,
444    ) -> Result<(), Error> {
445        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
446        let mut chain_client = self.make_chain_client(chain_id);
447        let old_owner = chain_client.preferred_owner();
448        info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
449        chain_client.set_preferred_owner(preferred_owner);
450        self.update_wallet_from_client(&chain_client).await?;
451        info!("New preferred owner set");
452        Ok(())
453    }
454
455    pub async fn check_compatible_version_info(
456        &self,
457        address: &str,
458        node: &impl ValidatorNode,
459    ) -> Result<VersionInfo, Error> {
460        match node.get_version_info().await {
461            Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
462                info!(
463                    "Version information for validator {address}: {}",
464                    version_info
465                );
466                Ok(version_info)
467            }
468            Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
469                remote: Box::new(version_info),
470                local: Box::new(linera_version::VERSION_INFO.clone()),
471            }
472            .into()),
473            Err(error) => Err(error::Inner::UnavailableVersionInfo {
474                address: address.to_string(),
475                error: Box::new(error),
476            }
477            .into()),
478        }
479    }
480
481    pub async fn check_matching_network_description(
482        &self,
483        address: &str,
484        node: &impl ValidatorNode,
485    ) -> Result<CryptoHash, Error> {
486        let network_description = self.wallet().genesis_config().network_description();
487        match node.get_network_description().await {
488            Ok(description) => {
489                if description == network_description {
490                    Ok(description.genesis_config_hash)
491                } else {
492                    Err(error::Inner::UnexpectedNetworkDescription {
493                        remote: Box::new(description),
494                        local: Box::new(network_description),
495                    }
496                    .into())
497                }
498            }
499            Err(error) => Err(error::Inner::UnavailableNetworkDescription {
500                address: address.to_string(),
501                error: Box::new(error),
502            }
503            .into()),
504        }
505    }
506
507    pub async fn check_validator_chain_info_response(
508        &self,
509        public_key: Option<&ValidatorPublicKey>,
510        address: &str,
511        node: &impl ValidatorNode,
512        chain_id: ChainId,
513    ) -> Result<(), Error> {
514        let query = ChainInfoQuery::new(chain_id);
515        match node.handle_chain_info_query(query).await {
516            Ok(response) => {
517                info!(
518                    "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
519                    response.info.next_block_height, response.info.epoch,
520                );
521                if let Some(public_key) = public_key {
522                    if response.check(*public_key).is_ok() {
523                        info!("Signature for public key {public_key} is OK.");
524                    } else {
525                        return Err(error::Inner::InvalidSignature {
526                            public_key: *public_key,
527                        }
528                        .into());
529                    }
530                }
531                Ok(())
532            }
533            Err(error) => Err(error::Inner::UnavailableChainInfo {
534                address: address.to_string(),
535                chain_id,
536                error: Box::new(error),
537            }
538            .into()),
539        }
540    }
541}
542
543#[cfg(feature = "fs")]
544impl<Env: Environment, W> ClientContext<Env, W>
545where
546    W: Persist<Target = Wallet>,
547{
548    pub async fn publish_module(
549        &mut self,
550        chain_client: &ChainClient<Env>,
551        contract: PathBuf,
552        service: PathBuf,
553        vm_runtime: VmRuntime,
554    ) -> Result<ModuleId, Error> {
555        info!("Loading bytecode files");
556        let contract_bytecode = Bytecode::load_from_file(&contract)
557            .await
558            .with_context(|| format!("failed to load contract bytecode from {:?}", &contract))?;
559        let service_bytecode = Bytecode::load_from_file(&service)
560            .await
561            .with_context(|| format!("failed to load service bytecode from {:?}", &service))?;
562
563        info!("Publishing module");
564        let (blobs, module_id) =
565            create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
566        let (module_id, _) = self
567            .apply_client_command(chain_client, |chain_client| {
568                let blobs = blobs.clone();
569                let chain_client = chain_client.clone();
570                async move {
571                    chain_client
572                        .publish_module_blobs(blobs, module_id)
573                        .await
574                        .context("Failed to publish module")
575                }
576            })
577            .await?;
578
579        info!("{}", "Module published successfully!");
580
581        info!("Synchronizing client and processing inbox");
582        self.process_inbox(chain_client).await?;
583        Ok(module_id)
584    }
585
586    pub async fn publish_data_blob(
587        &mut self,
588        chain_client: &ChainClient<Env>,
589        blob_path: PathBuf,
590    ) -> Result<CryptoHash, Error> {
591        info!("Loading data blob file");
592        let blob_bytes = fs::read(&blob_path).context(format!(
593            "failed to load data blob bytes from {:?}",
594            &blob_path
595        ))?;
596
597        info!("Publishing data blob");
598        self.apply_client_command(chain_client, |chain_client| {
599            let blob_bytes = blob_bytes.clone();
600            let chain_client = chain_client.clone();
601            async move {
602                chain_client
603                    .publish_data_blob(blob_bytes)
604                    .await
605                    .context("Failed to publish data blob")
606            }
607        })
608        .await?;
609
610        info!("{}", "Data blob published successfully!");
611        Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
612    }
613
614    // TODO(#2490): Consider removing or renaming this.
615    pub async fn read_data_blob(
616        &mut self,
617        chain_client: &ChainClient<Env>,
618        hash: CryptoHash,
619    ) -> Result<(), Error> {
620        info!("Verifying data blob");
621        self.apply_client_command(chain_client, |chain_client| {
622            let chain_client = chain_client.clone();
623            async move {
624                chain_client
625                    .read_data_blob(hash)
626                    .await
627                    .context("Failed to verify data blob")
628            }
629        })
630        .await?;
631
632        info!("{}", "Data blob verified successfully!");
633        Ok(())
634    }
635}
636
637#[cfg(feature = "benchmark")]
638impl<Env: Environment, W> ClientContext<Env, W>
639where
640    W: Persist<Target = Wallet>,
641{
642    pub async fn prepare_for_benchmark(
643        &mut self,
644        num_chain_groups: usize,
645        num_chains_per_chain_group: usize,
646        transactions_per_block: usize,
647        tokens_per_chain: Amount,
648        fungible_application_id: Option<ApplicationId>,
649        pub_keys: Vec<AccountPublicKey>,
650    ) -> Result<
651        (
652            Vec<Vec<ChainClient<Env>>>,
653            Vec<Vec<(Vec<Operation>, AccountOwner)>>,
654            Committee,
655        ),
656        Error,
657    > {
658        let start = Instant::now();
659        // Below all block proposals are supposed to succeed without retries, we
660        // must make sure that all incoming payments have been accepted on-chain
661        // and that no validator is missing user certificates.
662        self.process_inboxes_and_force_validator_updates().await;
663        info!(
664            "Processed inboxes and forced validator updates in {} ms",
665            start.elapsed().as_millis()
666        );
667
668        let start = Instant::now();
669        let (benchmark_chains, chain_clients) = self
670            .make_benchmark_chains(
671                num_chain_groups,
672                num_chains_per_chain_group,
673                tokens_per_chain,
674                pub_keys,
675            )
676            .await?;
677        info!(
678            "Got {} chains in {} ms",
679            num_chain_groups * num_chains_per_chain_group,
680            start.elapsed().as_millis()
681        );
682
683        if let Some(id) = fungible_application_id {
684            let start = Instant::now();
685            self.supply_fungible_tokens(&benchmark_chains, id).await?;
686            info!(
687                "Supplied fungible tokens in {} ms",
688                start.elapsed().as_millis()
689            );
690            // Need to process inboxes to make sure the chains receive the supplied tokens.
691            let start = Instant::now();
692            for chain_client in chain_clients.iter().flatten() {
693                chain_client.process_inbox().await?;
694            }
695            info!(
696                "Processed inboxes after supplying fungible tokens in {} ms",
697                start.elapsed().as_millis()
698            );
699        }
700
701        let default_chain_id = self
702            .wallet
703            .default_chain()
704            .expect("should have default chain");
705        let default_chain_client = self.make_chain_client(default_chain_id);
706        let committee = default_chain_client.admin_committee().await?.1;
707        let blocks_infos = Benchmark::<Env>::make_benchmark_block_info(
708            benchmark_chains,
709            transactions_per_block,
710            fungible_application_id,
711        );
712
713        Ok((chain_clients, blocks_infos, committee))
714    }
715
716    pub async fn wrap_up_benchmark(
717        &mut self,
718        chain_clients: Vec<Vec<ChainClient<Env>>>,
719        close_chains: bool,
720        wrap_up_max_in_flight: usize,
721    ) -> Result<(), Error> {
722        if close_chains {
723            info!("Closing chains...");
724            let stream = stream::iter(chain_clients.into_iter().flatten())
725                .map(|chain_client| async move {
726                    Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
727                    info!("Closed chain {:?}", chain_client.chain_id());
728                    Ok::<(), BenchmarkError>(())
729                })
730                .buffer_unordered(wrap_up_max_in_flight);
731            stream.try_collect::<Vec<_>>().await?;
732        } else {
733            info!("Processing inbox for all chains...");
734            let stream = stream::iter(chain_clients.iter().flatten().cloned())
735                .map(|chain_client| async move {
736                    chain_client.process_inbox().await?;
737                    info!("Processed inbox for chain {:?}", chain_client.chain_id());
738                    Ok::<(), ChainClientError>(())
739                })
740                .buffer_unordered(wrap_up_max_in_flight);
741            stream.try_collect::<Vec<_>>().await?;
742
743            info!("Updating wallet from chain clients...");
744            for chain_client in chain_clients.iter().flatten() {
745                let info = chain_client.chain_info().await?;
746                let client_owner = chain_client.preferred_owner();
747                let pending_proposal = chain_client.pending_proposal().clone();
748                self.wallet
749                    .as_mut()
750                    .update_from_info(pending_proposal, client_owner, &info);
751            }
752            self.save_wallet().await?;
753        }
754
755        Ok(())
756    }
757
758    async fn process_inboxes_and_force_validator_updates(&mut self) {
759        let mut chain_clients = vec![];
760        for chain_id in &self.wallet.owned_chain_ids() {
761            chain_clients.push(self.make_chain_client(*chain_id));
762        }
763
764        let mut join_set = task::JoinSet::new();
765        for chain_client in chain_clients {
766            join_set.spawn(async move {
767                Self::process_inbox_without_updating_wallet(&chain_client)
768                    .await
769                    .expect("Processing inbox should not fail!");
770                chain_client
771            });
772        }
773
774        let chain_clients = join_set.join_all().await;
775        for chain_client in &chain_clients {
776            self.update_wallet_from_client(chain_client).await.unwrap();
777        }
778    }
779
780    async fn process_inbox_without_updating_wallet(
781        chain_client: &ChainClient<Env>,
782    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
783        // Try processing the inbox optimistically without waiting for validator notifications.
784        chain_client.synchronize_from_validators().await?;
785        let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
786        assert!(
787            maybe_timeout.is_none(),
788            "Should not timeout within benchmark!"
789        );
790
791        Ok(certificates)
792    }
793
794    #[expect(clippy::too_many_arguments)]
795    fn insert_chain(
796        benchmark_chains: &mut Vec<Vec<(ChainId, AccountOwner)>>,
797        chain_clients: &mut Vec<Vec<ChainClient<Env>>>,
798        chain_group: &mut Vec<(ChainId, AccountOwner)>,
799        chain_clients_group: &mut Vec<ChainClient<Env>>,
800        num_chains_per_chain_group: usize,
801        chain_id: ChainId,
802        owner: AccountOwner,
803        chain_client: ChainClient<Env>,
804    ) {
805        chain_group.push((chain_id, owner));
806        chain_clients_group.push(chain_client);
807
808        if chain_group.len() == num_chains_per_chain_group
809            && chain_clients_group.len() == num_chains_per_chain_group
810        {
811            benchmark_chains.push(chain_group.clone());
812            chain_clients.push(chain_clients_group.clone());
813            chain_group.clear();
814            chain_clients_group.clear();
815        }
816    }
817
818    /// Creates chains if necessary, and returns a map of exactly `num_chains` chain IDs
819    /// with key pairs, as well as a map of the chain clients.
820    async fn make_benchmark_chains(
821        &mut self,
822        num_chain_groups: usize,
823        num_chains_per_chain_group: usize,
824        balance: Amount,
825        pub_keys: Vec<AccountPublicKey>,
826    ) -> Result<
827        (
828            Vec<Vec<(ChainId, AccountOwner)>>,
829            Vec<Vec<ChainClient<Env>>>,
830        ),
831        Error,
832    > {
833        let total_chains_to_create = num_chain_groups * num_chains_per_chain_group;
834        let mut chains_found_in_wallet = 0;
835        let mut benchmark_chains = Vec::with_capacity(num_chain_groups);
836        let mut chain_clients = Vec::with_capacity(num_chain_groups);
837        let mut chain_group = Vec::with_capacity(num_chains_per_chain_group);
838        let mut chain_clients_group = Vec::with_capacity(num_chains_per_chain_group);
839        let start = Instant::now();
840        for chain_id in self.wallet.owned_chain_ids() {
841            if chains_found_in_wallet == total_chains_to_create {
842                break;
843            }
844            // This should never panic, because `owned_chain_ids` only returns the owned chains that
845            // we have a key pair for.
846            let owner = self
847                .wallet
848                .get(chain_id)
849                .and_then(|chain| chain.owner)
850                .unwrap();
851            let chain_client = self.make_chain_client(chain_id);
852            let ownership = chain_client.chain_info().await?.manager.ownership;
853            if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
854                continue;
855            }
856            chain_client.process_inbox().await?;
857            Self::insert_chain(
858                &mut benchmark_chains,
859                &mut chain_clients,
860                &mut chain_group,
861                &mut chain_clients_group,
862                num_chains_per_chain_group,
863                chain_id,
864                owner,
865                chain_client,
866            );
867            chains_found_in_wallet += 1;
868        }
869        info!(
870            "Got {} chains from the wallet in {} ms",
871            benchmark_chains.len(),
872            start.elapsed().as_millis()
873        );
874
875        let num_chains_to_create = total_chains_to_create - chains_found_in_wallet;
876
877        let default_chain_id = self
878            .wallet
879            .default_chain()
880            .expect("should have default chain");
881        let operations_per_block = 900; // Over this we seem to hit the block size limits.
882
883        let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
884        let default_chain_client = self.make_chain_client(default_chain_id);
885
886        for i in (0..num_chains_to_create).step_by(operations_per_block) {
887            let num_new_chains = operations_per_block.min(num_chains_to_create - i);
888            let pub_key = pub_keys_iter.next().unwrap();
889            let owner = pub_key.into();
890
891            let certificate = Self::execute_open_chains_operations(
892                num_new_chains,
893                &default_chain_client,
894                balance,
895                owner,
896            )
897            .await?;
898            info!("Block executed successfully");
899
900            let block = certificate.block();
901            for i in 0..num_new_chains {
902                let chain_id = block.body.blobs[i]
903                    .iter()
904                    .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
905                    .map(|blob| ChainId(blob.id().hash))
906                    .expect("failed to create a new chain");
907                self.client.track_chain(chain_id);
908
909                let mut chain_client = self.client.create_chain_client(
910                    chain_id,
911                    None,
912                    BlockHeight::ZERO,
913                    None,
914                    Some(owner),
915                );
916                chain_client.set_preferred_owner(owner);
917                chain_client.process_inbox().await?;
918                Self::insert_chain(
919                    &mut benchmark_chains,
920                    &mut chain_clients,
921                    &mut chain_group,
922                    &mut chain_clients_group,
923                    num_chains_per_chain_group,
924                    chain_id,
925                    owner,
926                    chain_client,
927                );
928            }
929        }
930
931        if num_chains_to_create > 0 {
932            info!(
933                "Created {} chains in {} ms",
934                num_chains_to_create,
935                start.elapsed().as_millis()
936            );
937        }
938
939        info!("Updating wallet from client");
940        self.update_wallet_from_client(&default_chain_client)
941            .await?;
942        info!("Retrying pending outgoing messages");
943        default_chain_client
944            .retry_pending_outgoing_messages()
945            .await
946            .context("outgoing messages to create the new chains should be delivered")?;
947        info!("Processing default chain inbox");
948        default_chain_client.process_inbox().await?;
949
950        Ok((benchmark_chains, chain_clients))
951    }
952
953    async fn execute_open_chains_operations(
954        num_new_chains: usize,
955        chain_client: &ChainClient<Env>,
956        balance: Amount,
957        owner: AccountOwner,
958    ) -> Result<ConfirmedBlockCertificate, Error> {
959        let config = OpenChainConfig {
960            ownership: ChainOwnership::single_super(owner),
961            balance,
962            application_permissions: Default::default(),
963        };
964        let operations = iter::repeat_n(
965            Operation::system(SystemOperation::OpenChain(config)),
966            num_new_chains,
967        )
968        .collect();
969        info!("Executing {} OpenChain operations", num_new_chains);
970        Ok(chain_client
971            .execute_operations(operations, vec![])
972            .await?
973            .expect("should execute block with OpenChain operations"))
974    }
975
976    /// Supplies fungible tokens to the chains.
977    async fn supply_fungible_tokens(
978        &mut self,
979        key_pairs: &[Vec<(ChainId, AccountOwner)>],
980        application_id: ApplicationId,
981    ) -> Result<(), Error> {
982        let default_chain_id = self
983            .wallet
984            .default_chain()
985            .expect("should have default chain");
986        let default_key = self.wallet.get(default_chain_id).unwrap().owner.unwrap();
987        // This should be enough to run the benchmark at 1M TPS for an hour.
988        let amount = Amount::from_nanos(4);
989        let operations: Vec<Operation> = key_pairs
990            .iter()
991            .flat_map(|chain_group| {
992                chain_group
993                    .iter()
994                    .map(|(chain_id, owner)| {
995                        Benchmark::<Env>::fungible_transfer(
996                            application_id,
997                            *chain_id,
998                            default_key,
999                            *owner,
1000                            amount,
1001                        )
1002                    })
1003                    .collect::<Vec<_>>()
1004            })
1005            .collect();
1006        let chain_client = self.make_chain_client(default_chain_id);
1007        // Put at most 1000 fungible token operations in each block.
1008        for operation_chunk in operations.chunks(1000) {
1009            chain_client
1010                .execute_operations(operation_chunk.to_vec(), vec![])
1011                .await?
1012                .expect("should execute block with Transfer operations");
1013        }
1014        self.update_wallet_from_client(&chain_client).await?;
1015
1016        Ok(())
1017    }
1018}