1#[cfg(with_testing)]
5use std::num::NonZeroUsize;
6use std::sync::Arc;
7
8use futures::Future;
9use linera_base::{
10 crypto::{CryptoHash, ValidatorPublicKey},
11 data_types::{BlockHeight, Timestamp},
12 identifiers::{Account, AccountOwner, ChainId},
13 ownership::ChainOwnership,
14 time::{Duration, Instant},
15};
16use linera_chain::types::ConfirmedBlockCertificate;
17use linera_core::{
18 client::{ChainClient, Client},
19 data_types::{ChainInfoQuery, ClientOutcome},
20 join_set_ext::JoinSet,
21 node::ValidatorNode,
22 Environment, JoinSetExt as _,
23};
24use linera_persistent::{Persist, PersistExt as _};
25use linera_rpc::node_provider::{NodeOptions, NodeProvider};
26use linera_version::VersionInfo;
27use thiserror_context::Context;
28use tracing::{debug, info};
29#[cfg(feature = "benchmark")]
30use {
31 crate::benchmark::{Benchmark, BenchmarkError},
32 futures::{stream, StreamExt, TryStreamExt},
33 linera_base::{
34 crypto::AccountPublicKey,
35 data_types::{Amount, Epoch},
36 identifiers::ApplicationId,
37 },
38 linera_core::client::ChainClientError,
39 linera_execution::{
40 committee::Committee,
41 system::{OpenChainConfig, SystemOperation},
42 Operation,
43 },
44 std::{collections::HashMap, iter},
45 tokio::task,
46};
47#[cfg(feature = "fs")]
48use {
49 linera_base::{
50 data_types::{BlobContent, Bytecode},
51 identifiers::ModuleId,
52 vm::VmRuntime,
53 },
54 linera_core::client::create_bytecode_blobs,
55 std::{fs, path::PathBuf},
56};
57
58use crate::{
59 chain_listener::{self, ClientContext as _, ClientContextExt as _},
60 client_options::{ChainOwnershipConfig, ClientContextOptions},
61 error, util,
62 wallet::{UserChain, Wallet},
63 Error,
64};
65
66pub struct ClientContext<Env: Environment, W> {
67 pub wallet: W,
68 pub client: Arc<Client<Env>>,
69 pub send_timeout: Duration,
70 pub recv_timeout: Duration,
71 pub retry_delay: Duration,
72 pub max_retries: u32,
73 pub chain_listeners: JoinSet,
74}
75
76impl<Env: Environment, W> chain_listener::ClientContext for ClientContext<Env, W>
77where
78 W: Persist<Target = Wallet>,
79{
80 type Environment = Env;
81
82 fn wallet(&self) -> &Wallet {
83 &self.wallet
84 }
85
86 fn storage(&self) -> &Env::Storage {
87 self.client.storage_client()
88 }
89
90 fn client(&self) -> &Arc<Client<Env>> {
91 &self.client
92 }
93
94 async fn update_wallet_for_new_chain(
95 &mut self,
96 chain_id: ChainId,
97 owner: Option<AccountOwner>,
98 timestamp: Timestamp,
99 ) -> Result<(), Error> {
100 self.update_wallet_for_new_chain(chain_id, owner, timestamp)
101 .await
102 }
103
104 async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
105 self.update_wallet_from_client(client).await
106 }
107}
108
109impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si>, W>
110where
111 S: linera_core::environment::Storage,
112 Si: linera_core::environment::Signer,
113 W: Persist<Target = Wallet>,
114{
115 pub fn new(storage: S, options: ClientContextOptions, wallet: W, signer: Si) -> Self {
116 let node_provider = NodeProvider::new(NodeOptions {
117 send_timeout: options.send_timeout,
118 recv_timeout: options.recv_timeout,
119 retry_delay: options.retry_delay,
120 max_retries: options.max_retries,
121 });
122 let chain_ids = wallet.chain_ids();
123 let name = match chain_ids.len() {
124 0 => "Client node".to_string(),
125 1 => format!("Client node for {:.8}", chain_ids[0]),
126 n => format!("Client node for {:.8} and {} others", chain_ids[0], n - 1),
127 };
128 let client = Client::new(
129 linera_core::environment::Impl {
130 network: node_provider,
131 storage,
132 signer,
133 },
134 wallet.genesis_admin_chain(),
135 options.long_lived_services,
136 chain_ids,
137 name,
138 options.max_loaded_chains,
139 options.to_chain_client_options(),
140 );
141
142 ClientContext {
143 client: Arc::new(client),
144 wallet,
145 send_timeout: options.send_timeout,
146 recv_timeout: options.recv_timeout,
147 retry_delay: options.retry_delay,
148 max_retries: options.max_retries,
149 chain_listeners: JoinSet::default(),
150 }
151 }
152
/// Builds a context for tests with fixed settings: 4000 ms send/receive timeouts, a 1000 ms
/// retry delay, 10 retries, a limit of 20 passed to `Client::new`, and blocking
/// cross-chain message delivery on top of `ChainClientOptions::test_default()`.
#[cfg(with_testing)]
pub fn new_test_client_context(storage: S, wallet: W, signer: Si) -> Self {
    use linera_core::{client::ChainClientOptions, node::CrossChainMessageDelivery};

    let send_recv_timeout = Duration::from_millis(4000);
    let retry_delay = Duration::from_millis(1000);
    let max_retries = 10;

    let network = NodeProvider::new(NodeOptions {
        send_timeout: send_recv_timeout,
        recv_timeout: send_recv_timeout,
        retry_delay,
        max_retries,
    });

    let chain_ids = wallet.chain_ids();
    let chain_count = chain_ids.len();
    let name = if chain_count == 0 {
        "Client node".to_string()
    } else if chain_count == 1 {
        format!("Client node for {:.8}", chain_ids[0])
    } else {
        format!(
            "Client node for {:.8} and {} others",
            chain_ids[0],
            chain_count - 1
        )
    };

    let environment = linera_core::environment::Impl {
        storage,
        network,
        signer,
    };
    let chain_client_options = ChainClientOptions {
        cross_chain_message_delivery: CrossChainMessageDelivery::Blocking,
        ..ChainClientOptions::test_default()
    };
    let client = Client::new(
        environment,
        wallet.genesis_admin_chain(),
        false,
        chain_ids,
        name,
        NonZeroUsize::new(20).expect("Chain worker limit should not be zero"),
        chain_client_options,
    );

    ClientContext {
        client: Arc::new(client),
        wallet,
        send_timeout: send_recv_timeout,
        recv_timeout: send_recv_timeout,
        retry_delay,
        max_retries,
        chain_listeners: JoinSet::default(),
    }
}
200}
201
202impl<Env: Environment, W: Persist<Target = Wallet>> ClientContext<Env, W> {
203 pub fn wallet(&self) -> &Wallet {
205 &self.wallet
206 }
207
208 pub fn wallet_mut(&mut self) -> &mut W {
210 &mut self.wallet
211 }
212
213 pub async fn mutate_wallet<R: Send>(
214 &mut self,
215 mutation: impl FnOnce(&mut Wallet) -> R + Send,
216 ) -> Result<R, Error> {
217 self.wallet
218 .mutate(mutation)
219 .await
220 .map_err(|e| error::Inner::Persistence(Box::new(e)).into())
221 }
222
223 pub fn default_account(&self) -> Account {
226 Account::chain(self.default_chain())
227 }
228
229 pub fn default_chain(&self) -> ChainId {
231 self.wallet
232 .default_chain()
233 .expect("No chain specified in wallet with no default chain")
234 }
235
236 pub fn first_non_admin_chain(&self) -> ChainId {
237 self.wallet
238 .first_non_admin_chain()
239 .expect("No non-admin chain specified in wallet with no non-admin chain")
240 }
241
242 pub fn make_node_provider(&self) -> NodeProvider {
243 NodeProvider::new(self.make_node_options())
244 }
245
246 fn make_node_options(&self) -> NodeOptions {
247 NodeOptions {
248 send_timeout: self.send_timeout,
249 recv_timeout: self.recv_timeout,
250 retry_delay: self.retry_delay,
251 max_retries: self.max_retries,
252 }
253 }
254
255 pub async fn save_wallet(&mut self) -> Result<(), Error> {
256 self.wallet
257 .persist()
258 .await
259 .map_err(|e| error::Inner::Persistence(Box::new(e)).into())
260 }
261
262 pub async fn update_wallet_from_client<Env_: Environment>(
263 &mut self,
264 client: &ChainClient<Env_>,
265 ) -> Result<(), Error> {
266 let info = client.chain_info().await?;
267 let client_owner = client.preferred_owner();
268 let pending_proposal = client.pending_proposal().clone();
269 self.wallet
270 .as_mut()
271 .update_from_info(pending_proposal, client_owner, &info);
272 self.save_wallet().await
273 }
274
275 pub async fn update_wallet_for_new_chain(
277 &mut self,
278 chain_id: ChainId,
279 owner: Option<AccountOwner>,
280 timestamp: Timestamp,
281 ) -> Result<(), Error> {
282 if self.wallet.get(chain_id).is_none() {
283 self.mutate_wallet(|w| {
284 w.insert(UserChain {
285 chain_id,
286 owner,
287 block_hash: None,
288 timestamp,
289 next_block_height: BlockHeight::ZERO,
290 pending_proposal: None,
291 })
292 })
293 .await?;
294 }
295
296 Ok(())
297 }
298
299 pub async fn process_inbox(
300 &mut self,
301 chain_client: &ChainClient<Env>,
302 ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
303 let mut certificates = Vec::new();
304 let (new_certificates, maybe_timeout) = {
306 chain_client.synchronize_from_validators().await?;
307 let result = chain_client.process_inbox_without_prepare().await;
308 self.update_wallet_from_client(chain_client).await?;
309 if result.is_err() {
310 self.save_wallet().await?;
311 }
312 result?
313 };
314 certificates.extend(new_certificates);
315 if maybe_timeout.is_none() {
316 self.save_wallet().await?;
317 return Ok(certificates);
318 }
319
320 let (listener, _listen_handle, mut notification_stream) = chain_client.listen().await?;
322 self.chain_listeners.spawn_task(listener);
323
324 loop {
325 let (new_certificates, maybe_timeout) = {
326 let result = chain_client.process_inbox().await;
327 self.update_wallet_from_client(chain_client).await?;
328 if result.is_err() {
329 self.save_wallet().await?;
330 }
331 result?
332 };
333 certificates.extend(new_certificates);
334 if let Some(timestamp) = maybe_timeout {
335 util::wait_for_next_round(&mut notification_stream, timestamp).await
336 } else {
337 self.save_wallet().await?;
338 return Ok(certificates);
339 }
340 }
341 }
342
343 pub async fn assign_new_chain_to_key(
344 &mut self,
345 chain_id: ChainId,
346 owner: AccountOwner,
347 ) -> Result<(), Error> {
348 self.client.track_chain(chain_id);
349 let chain_description = self.chain_description(chain_id).await?;
350 let config = chain_description.config();
351
352 if !config.ownership.verify_owner(&owner) {
353 tracing::error!(
354 "The chain with the ID returned by the faucet is not owned by you. \
355 Please make sure you are connecting to a genuine faucet."
356 );
357 return Err(error::Inner::ChainOwnership.into());
358 }
359
360 self.wallet_mut()
361 .mutate(|w| w.assign_new_chain_to_owner(owner, chain_id, chain_description.timestamp()))
362 .await
363 .map_err(|e| error::Inner::Persistence(Box::new(e)))?
364 .context("assigning new chain")?;
365 Ok(())
366 }
367
368 pub async fn apply_client_command<E, F, Fut, T>(
373 &mut self,
374 client: &ChainClient<Env>,
375 mut f: F,
376 ) -> Result<T, Error>
377 where
378 F: FnMut(&ChainClient<Env>) -> Fut,
379 Fut: Future<Output = Result<ClientOutcome<T>, E>>,
380 Error: From<E>,
381 {
382 client.prepare_chain().await?;
383 let result = f(client).await;
385 self.update_wallet_from_client(client).await?;
386 if let ClientOutcome::Committed(t) = result? {
387 return Ok(t);
388 }
389
390 let (listener, _listen_handle, mut notification_stream) = client.listen().await?;
392 self.chain_listeners.spawn_task(listener);
393
394 loop {
395 client.prepare_chain().await?;
397 let result = f(client).await;
398 self.update_wallet_from_client(client).await?;
399 let timeout = match result? {
400 ClientOutcome::Committed(t) => return Ok(t),
401 ClientOutcome::WaitForTimeout(timeout) => timeout,
402 };
403 util::wait_for_next_round(&mut notification_stream, timeout).await;
405 }
406 }
407
408 pub async fn change_ownership(
409 &mut self,
410 chain_id: Option<ChainId>,
411 ownership_config: ChainOwnershipConfig,
412 ) -> Result<(), Error> {
413 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
414 let chain_client = self.make_chain_client(chain_id);
415 info!(
416 ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
417 "Changing ownership of a chain"
418 );
419 let time_start = Instant::now();
420 let ownership = ChainOwnership::try_from(ownership_config)?;
421
422 let certificate = self
423 .apply_client_command(&chain_client, |chain_client| {
424 let ownership = ownership.clone();
425 let chain_client = chain_client.clone();
426 async move {
427 chain_client
428 .change_ownership(ownership)
429 .await
430 .map_err(Error::from)
431 .context("Failed to change ownership")
432 }
433 })
434 .await?;
435 let time_total = time_start.elapsed();
436 info!("Operation confirmed after {} ms", time_total.as_millis());
437 debug!("{:?}", certificate);
438 Ok(())
439 }
440
441 pub async fn set_preferred_owner(
442 &mut self,
443 chain_id: Option<ChainId>,
444 preferred_owner: AccountOwner,
445 ) -> Result<(), Error> {
446 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
447 let mut chain_client = self.make_chain_client(chain_id);
448 let old_owner = chain_client.preferred_owner();
449 info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
450 chain_client.set_preferred_owner(preferred_owner);
451 self.update_wallet_from_client(&chain_client).await?;
452 info!("New preferred owner set");
453 Ok(())
454 }
455
456 pub async fn check_compatible_version_info(
457 &self,
458 address: &str,
459 node: &impl ValidatorNode,
460 ) -> Result<VersionInfo, Error> {
461 match node.get_version_info().await {
462 Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
463 info!(
464 "Version information for validator {address}: {}",
465 version_info
466 );
467 Ok(version_info)
468 }
469 Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
470 remote: Box::new(version_info),
471 local: Box::new(linera_version::VERSION_INFO.clone()),
472 }
473 .into()),
474 Err(error) => Err(error::Inner::UnavailableVersionInfo {
475 address: address.to_string(),
476 error: Box::new(error),
477 }
478 .into()),
479 }
480 }
481
482 pub async fn check_matching_network_description(
483 &self,
484 address: &str,
485 node: &impl ValidatorNode,
486 ) -> Result<CryptoHash, Error> {
487 let network_description = self.wallet().genesis_config().network_description();
488 match node.get_network_description().await {
489 Ok(description) => {
490 if description == network_description {
491 Ok(description.genesis_config_hash)
492 } else {
493 Err(error::Inner::UnexpectedNetworkDescription {
494 remote: Box::new(description),
495 local: Box::new(network_description),
496 }
497 .into())
498 }
499 }
500 Err(error) => Err(error::Inner::UnavailableNetworkDescription {
501 address: address.to_string(),
502 error: Box::new(error),
503 }
504 .into()),
505 }
506 }
507
508 pub async fn check_validator_chain_info_response(
509 &self,
510 public_key: Option<&ValidatorPublicKey>,
511 address: &str,
512 node: &impl ValidatorNode,
513 chain_id: ChainId,
514 ) -> Result<(), Error> {
515 let query = ChainInfoQuery::new(chain_id);
516 match node.handle_chain_info_query(query).await {
517 Ok(response) => {
518 info!(
519 "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
520 response.info.next_block_height, response.info.epoch,
521 );
522 if let Some(public_key) = public_key {
523 if response.check(public_key).is_ok() {
524 info!("Signature for public key {public_key} is OK.");
525 } else {
526 return Err(error::Inner::InvalidSignature {
527 public_key: *public_key,
528 }
529 .into());
530 }
531 }
532 Ok(())
533 }
534 Err(error) => Err(error::Inner::UnavailableChainInfo {
535 address: address.to_string(),
536 chain_id,
537 error: Box::new(error),
538 }
539 .into()),
540 }
541 }
542}
543
544#[cfg(feature = "fs")]
545impl<Env: Environment, W> ClientContext<Env, W>
546where
547 W: Persist<Target = Wallet>,
548{
549 pub async fn publish_module(
550 &mut self,
551 chain_client: &ChainClient<Env>,
552 contract: PathBuf,
553 service: PathBuf,
554 vm_runtime: VmRuntime,
555 ) -> Result<ModuleId, Error> {
556 info!("Loading bytecode files");
557 let contract_bytecode = Bytecode::load_from_file(&contract)
558 .await
559 .with_context(|| format!("failed to load contract bytecode from {:?}", &contract))?;
560 let service_bytecode = Bytecode::load_from_file(&service)
561 .await
562 .with_context(|| format!("failed to load service bytecode from {:?}", &service))?;
563
564 info!("Publishing module");
565 let (blobs, module_id) =
566 create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
567 let (module_id, _) = self
568 .apply_client_command(chain_client, |chain_client| {
569 let blobs = blobs.clone();
570 let chain_client = chain_client.clone();
571 async move {
572 chain_client
573 .publish_module_blobs(blobs, module_id)
574 .await
575 .context("Failed to publish module")
576 }
577 })
578 .await?;
579
580 info!("{}", "Module published successfully!");
581
582 info!("Synchronizing client and processing inbox");
583 self.process_inbox(chain_client).await?;
584 Ok(module_id)
585 }
586
587 pub async fn publish_data_blob(
588 &mut self,
589 chain_client: &ChainClient<Env>,
590 blob_path: PathBuf,
591 ) -> Result<CryptoHash, Error> {
592 info!("Loading data blob file");
593 let blob_bytes = fs::read(&blob_path).context(format!(
594 "failed to load data blob bytes from {:?}",
595 &blob_path
596 ))?;
597
598 info!("Publishing data blob");
599 self.apply_client_command(chain_client, |chain_client| {
600 let blob_bytes = blob_bytes.clone();
601 let chain_client = chain_client.clone();
602 async move {
603 chain_client
604 .publish_data_blob(blob_bytes)
605 .await
606 .context("Failed to publish data blob")
607 }
608 })
609 .await?;
610
611 info!("{}", "Data blob published successfully!");
612 Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
613 }
614
615 pub async fn read_data_blob(
617 &mut self,
618 chain_client: &ChainClient<Env>,
619 hash: CryptoHash,
620 ) -> Result<(), Error> {
621 info!("Verifying data blob");
622 self.apply_client_command(chain_client, |chain_client| {
623 let chain_client = chain_client.clone();
624 async move {
625 chain_client
626 .read_data_blob(hash)
627 .await
628 .context("Failed to verify data blob")
629 }
630 })
631 .await?;
632
633 info!("{}", "Data blob verified successfully!");
634 Ok(())
635 }
636}
637
638#[cfg(feature = "benchmark")]
639impl<Env: Environment, W> ClientContext<Env, W>
640where
641 W: Persist<Target = Wallet>,
642{
643 pub async fn prepare_for_benchmark(
644 &mut self,
645 num_chains: usize,
646 transactions_per_block: usize,
647 tokens_per_chain: Amount,
648 fungible_application_id: Option<ApplicationId>,
649 pub_keys: Vec<AccountPublicKey>,
650 ) -> Result<
651 (
652 HashMap<ChainId, ChainClient<Env>>,
653 Epoch,
654 Vec<(ChainId, Vec<Operation>, AccountOwner)>,
655 Committee,
656 ),
657 Error,
658 > {
659 let start = Instant::now();
660 self.process_inboxes_and_force_validator_updates().await;
664 info!(
665 "Processed inboxes and forced validator updates in {} ms",
666 start.elapsed().as_millis()
667 );
668
669 let start = Instant::now();
670 let (key_pairs, chain_clients) = self
671 .make_benchmark_chains(num_chains, tokens_per_chain, pub_keys)
672 .await?;
673 info!(
674 "Got {} chains in {} ms",
675 key_pairs.len(),
676 start.elapsed().as_millis()
677 );
678
679 if let Some(id) = fungible_application_id {
680 let start = Instant::now();
681 self.supply_fungible_tokens(&key_pairs, id).await?;
682 info!(
683 "Supplied fungible tokens in {} ms",
684 start.elapsed().as_millis()
685 );
686 }
687
688 let default_chain_id = self
689 .wallet
690 .default_chain()
691 .expect("should have default chain");
692 let default_chain_client = self.make_chain_client(default_chain_id);
693 let (epoch, committee) = default_chain_client.admin_committee().await?;
694 let blocks_infos = Benchmark::<Env>::make_benchmark_block_info(
695 key_pairs,
696 transactions_per_block,
697 fungible_application_id,
698 );
699
700 Ok((chain_clients, epoch, blocks_infos, committee))
701 }
702
703 pub async fn wrap_up_benchmark(
704 &mut self,
705 chain_clients: HashMap<ChainId, ChainClient<Env>>,
706 close_chains: bool,
707 wrap_up_max_in_flight: usize,
708 ) -> Result<(), Error> {
709 if close_chains {
710 info!("Closing chains...");
711 let stream = stream::iter(chain_clients.values().cloned())
712 .map(|chain_client| async move {
713 Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
714 info!("Closed chain {:?}", chain_client.chain_id());
715 Ok::<(), BenchmarkError>(())
716 })
717 .buffer_unordered(wrap_up_max_in_flight);
718 stream.try_collect::<Vec<_>>().await?;
719 } else {
720 info!("Processing inbox for all chains...");
721 let stream = stream::iter(chain_clients.values().cloned())
722 .map(|chain_client| async move {
723 chain_client.process_inbox().await?;
724 info!("Processed inbox for chain {:?}", chain_client.chain_id());
725 Ok::<(), ChainClientError>(())
726 })
727 .buffer_unordered(wrap_up_max_in_flight);
728 stream.try_collect::<Vec<_>>().await?;
729
730 info!("Updating wallet from chain clients...");
731 for chain_client in chain_clients.values() {
732 let info = chain_client.chain_info().await?;
733 let client_owner = chain_client.preferred_owner();
734 let pending_proposal = chain_client.pending_proposal().clone();
735 self.wallet
736 .as_mut()
737 .update_from_info(pending_proposal, client_owner, &info);
738 }
739 self.save_wallet().await?;
740 }
741
742 Ok(())
743 }
744
745 async fn process_inboxes_and_force_validator_updates(&mut self) {
746 let mut chain_clients = vec![];
747 for chain_id in &self.wallet.owned_chain_ids() {
748 chain_clients.push(self.make_chain_client(*chain_id));
749 }
750
751 let mut join_set = task::JoinSet::new();
752 for chain_client in chain_clients {
753 join_set.spawn(async move {
754 Self::process_inbox_without_updating_wallet(&chain_client)
755 .await
756 .expect("Processing inbox should not fail!");
757 chain_client
758 });
759 }
760
761 let chain_clients = join_set.join_all().await;
762 for chain_client in &chain_clients {
763 self.update_wallet_from_client(chain_client).await.unwrap();
764 }
765 }
766
767 async fn process_inbox_without_updating_wallet(
768 chain_client: &ChainClient<Env>,
769 ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
770 chain_client.synchronize_from_validators().await?;
772 let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
773 assert!(
774 maybe_timeout.is_none(),
775 "Should not timeout within benchmark!"
776 );
777
778 Ok(certificates)
779 }
780
781 async fn make_benchmark_chains(
784 &mut self,
785 num_chains: usize,
786 balance: Amount,
787 pub_keys: Vec<AccountPublicKey>,
788 ) -> Result<
789 (
790 HashMap<ChainId, AccountOwner>,
791 HashMap<ChainId, ChainClient<Env>>,
792 ),
793 Error,
794 > {
795 use linera_base::identifiers::BlobType;
796 let mut benchmark_chains = HashMap::new();
797 let mut chain_clients = HashMap::new();
798 let start = Instant::now();
799 for chain_id in self.wallet.owned_chain_ids() {
800 if benchmark_chains.len() == num_chains {
801 break;
802 }
803 let owner = self
806 .wallet
807 .get(chain_id)
808 .and_then(|chain| chain.owner)
809 .unwrap();
810 let chain_client = self.make_chain_client(chain_id);
811 let ownership = chain_client.chain_info().await?.manager.ownership;
812 if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
813 continue;
814 }
815 benchmark_chains.insert(chain_client.chain_id(), owner);
816 chain_client.process_inbox().await?;
817 chain_clients.insert(chain_id, chain_client);
818 }
819 info!(
820 "Got {} chains from the wallet in {} ms",
821 benchmark_chains.len(),
822 start.elapsed().as_millis()
823 );
824
825 let chains_from_wallet = benchmark_chains.len();
826 let num_chains_to_create = num_chains - chains_from_wallet;
827
828 let default_chain_id = self
829 .wallet
830 .default_chain()
831 .expect("should have default chain");
832 let operations_per_block = 900; let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
835 let default_chain_client = self.make_chain_client(default_chain_id);
836
837 for i in (0..num_chains_to_create).step_by(operations_per_block) {
838 let num_new_chains = operations_per_block.min(num_chains_to_create - i);
839 let pub_key = pub_keys_iter.next().unwrap();
840
841 let certificate = Self::execute_open_chains_operations(
842 num_new_chains,
843 &default_chain_client,
844 balance,
845 pub_key.into(),
846 )
847 .await?;
848 info!("Block executed successfully");
849
850 let block = certificate.block();
851 for i in 0..num_new_chains {
852 let chain_id = block.body.blobs[i]
853 .iter()
854 .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
855 .map(|blob| ChainId(blob.id().hash))
856 .expect("failed to create a new chain");
857 benchmark_chains.insert(chain_id, pub_key.into());
858 self.client.track_chain(chain_id);
859
860 let mut chain_client = self.client.create_chain_client(
861 chain_id,
862 None,
863 BlockHeight::ZERO,
864 None,
865 Some(pub_key.into()),
866 );
867 chain_client.set_preferred_owner(pub_key.into());
868 chain_client.process_inbox().await?;
869 chain_clients.insert(chain_id, chain_client);
870 }
871 }
872
873 if num_chains_to_create > 0 {
874 info!(
875 "Created {} chains in {} ms",
876 num_chains_to_create,
877 start.elapsed().as_millis()
878 );
879 }
880
881 info!("Updating wallet from client");
882 self.update_wallet_from_client(&default_chain_client)
883 .await?;
884 info!("Retrying pending outgoing messages");
885 default_chain_client
886 .retry_pending_outgoing_messages()
887 .await
888 .context("outgoing messages to create the new chains should be delivered")?;
889 info!("Processing default chain inbox");
890 default_chain_client.process_inbox().await?;
891
892 Ok((benchmark_chains, chain_clients))
893 }
894
895 async fn execute_open_chains_operations(
896 num_new_chains: usize,
897 chain_client: &ChainClient<Env>,
898 balance: Amount,
899 owner: AccountOwner,
900 ) -> Result<ConfirmedBlockCertificate, Error> {
901 let config = OpenChainConfig {
902 ownership: ChainOwnership::single_super(owner),
903 balance,
904 application_permissions: Default::default(),
905 };
906 let operations = iter::repeat_n(
907 Operation::system(SystemOperation::OpenChain(config)),
908 num_new_chains,
909 )
910 .collect();
911 info!("Executing {} OpenChain operations", num_new_chains);
912 Ok(chain_client
913 .execute_operations(operations, vec![])
914 .await?
915 .expect("should execute block with OpenChain operations"))
916 }
917
918 async fn supply_fungible_tokens(
920 &mut self,
921 key_pairs: &HashMap<ChainId, AccountOwner>,
922 application_id: ApplicationId,
923 ) -> Result<(), Error> {
924 let default_chain_id = self
925 .wallet
926 .default_chain()
927 .expect("should have default chain");
928 let default_key = self.wallet.get(default_chain_id).unwrap().owner.unwrap();
929 let amount = Amount::from(1_000_000);
930 let operations: Vec<_> = key_pairs
931 .iter()
932 .map(|(chain_id, owner)| {
933 Benchmark::<Env>::fungible_transfer(
934 application_id,
935 *chain_id,
936 default_key,
937 *owner,
938 amount,
939 )
940 })
941 .collect();
942 let chain_client = self.make_chain_client(default_chain_id);
943 for operation_chunk in operations.chunks(1000) {
945 chain_client
946 .execute_operations(operation_chunk.to_vec(), vec![])
947 .await?
948 .expect("should execute block with Transfer operations");
949 }
950 self.update_wallet_from_client(&chain_client).await?;
951
952 Ok(())
953 }
954}