1use std::sync::Arc;
5
6use futures::Future;
7use linera_base::{
8 crypto::{CryptoHash, ValidatorPublicKey},
9 data_types::{BlockHeight, Timestamp},
10 identifiers::{Account, AccountOwner, ChainId},
11 ownership::ChainOwnership,
12 time::{Duration, Instant},
13};
14use linera_chain::types::ConfirmedBlockCertificate;
15use linera_core::{
16 client::{ChainClient, Client},
17 data_types::{ChainInfoQuery, ClientOutcome},
18 join_set_ext::JoinSet,
19 node::ValidatorNode,
20 Environment, JoinSetExt as _,
21};
22use linera_persistent::{Persist, PersistExt as _};
23use linera_rpc::node_provider::{NodeOptions, NodeProvider};
24use linera_version::VersionInfo;
25use thiserror_context::Context;
26use tracing::{debug, info};
27#[cfg(feature = "benchmark")]
28use {
29 crate::benchmark::{Benchmark, BenchmarkError},
30 futures::{stream, StreamExt, TryStreamExt},
31 linera_base::{
32 crypto::AccountPublicKey,
33 data_types::Amount,
34 identifiers::{ApplicationId, BlobType},
35 },
36 linera_core::client::ChainClientError,
37 linera_execution::{
38 committee::Committee,
39 system::{OpenChainConfig, SystemOperation},
40 Operation,
41 },
42 std::iter,
43 tokio::task,
44};
45#[cfg(feature = "fs")]
46use {
47 linera_base::{
48 data_types::{BlobContent, Bytecode},
49 identifiers::ModuleId,
50 vm::VmRuntime,
51 },
52 linera_core::client::create_bytecode_blobs,
53 std::{fs, path::PathBuf},
54};
55
56use crate::{
57 chain_listener::{self, ClientContext as _},
58 client_options::{ChainOwnershipConfig, ClientContextOptions},
59 error, util,
60 wallet::{UserChain, Wallet},
61 Error,
62};
63
/// Client-side context bundling a wallet, the shared [`Client`], and the
/// network settings used when building validator node providers.
pub struct ClientContext<Env: Environment, W> {
    /// The wallet. The `impl` blocks in this file require `W: Persist<Target = Wallet>`.
    pub wallet: W,
    /// The client, shared behind an [`Arc`].
    pub client: Arc<Client<Env>>,
    /// Send timeout copied into [`NodeOptions`] by `make_node_options`.
    pub send_timeout: Duration,
    /// Receive timeout copied into [`NodeOptions`] by `make_node_options`.
    pub recv_timeout: Duration,
    /// Retry delay copied into [`NodeOptions`] by `make_node_options`.
    pub retry_delay: Duration,
    /// Maximum number of retries copied into [`NodeOptions`] by `make_node_options`.
    pub max_retries: u32,
    /// Join set onto which chain listener tasks are spawned.
    pub chain_listeners: JoinSet,
}
73
/// Implements the chain listener's `ClientContext` trait by exposing the
/// wallet, storage and client held by this context and by delegating wallet
/// updates to methods of the same type.
impl<Env: Environment, W> chain_listener::ClientContext for ClientContext<Env, W>
where
    W: Persist<Target = Wallet>,
{
    type Environment = Env;

    /// Returns a shared reference to the wallet.
    fn wallet(&self) -> &Wallet {
        &self.wallet
    }

    /// Returns the storage handle obtained from the client.
    fn storage(&self) -> &Env::Storage {
        self.client.storage_client()
    }

    /// Returns the shared client.
    fn client(&self) -> &Arc<Client<Env>> {
        &self.client
    }

    /// Records a new chain in the wallet.
    async fn update_wallet_for_new_chain(
        &mut self,
        chain_id: ChainId,
        owner: Option<AccountOwner>,
        timestamp: Timestamp,
    ) -> Result<(), Error> {
        // NOTE(review): this call is expected to resolve to the inherent
        // `update_wallet_for_new_chain` method (inherent methods take
        // precedence over trait methods during method resolution) rather
        // than recursing into this trait method — confirm.
        self.update_wallet_for_new_chain(chain_id, owner, timestamp)
            .await
    }

    /// Updates the wallet from the given chain client by delegating to
    /// `update_wallet_from_client`.
    async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
        self.update_wallet_from_client(client).await
    }
}
106
107impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si>, W>
108where
109 S: linera_core::environment::Storage,
110 Si: linera_core::environment::Signer,
111 W: Persist<Target = Wallet>,
112{
113 pub fn new(storage: S, options: ClientContextOptions, wallet: W, signer: Si) -> Self {
114 let node_provider = NodeProvider::new(NodeOptions {
115 send_timeout: options.send_timeout,
116 recv_timeout: options.recv_timeout,
117 retry_delay: options.retry_delay,
118 max_retries: options.max_retries,
119 });
120 let chain_ids = wallet.chain_ids();
121 let name = match chain_ids.len() {
122 0 => "Client node".to_string(),
123 1 => format!("Client node for {:.8}", chain_ids[0]),
124 n => format!("Client node for {:.8} and {} others", chain_ids[0], n - 1),
125 };
126 let client = Client::new(
127 linera_core::environment::Impl {
128 network: node_provider,
129 storage,
130 signer,
131 },
132 wallet.genesis_admin_chain(),
133 options.long_lived_services,
134 chain_ids,
135 name,
136 Duration::from_secs(30),
137 options.to_chain_client_options(),
138 );
139
140 ClientContext {
141 client: Arc::new(client),
142 wallet,
143 send_timeout: options.send_timeout,
144 recv_timeout: options.recv_timeout,
145 retry_delay: options.retry_delay,
146 max_retries: options.max_retries,
147 chain_listeners: JoinSet::default(),
148 }
149 }
150
/// Builds a client context for tests with fixed network settings.
///
/// Uses a 4000 ms send/receive timeout, a 1000 ms retry delay and at most 10
/// retries. The chain client options are the test defaults with blocking
/// cross-chain message delivery.
#[cfg(with_testing)]
pub fn new_test_client_context(storage: S, wallet: W, signer: Si) -> Self {
    use linera_core::{client::ChainClientOptions, node::CrossChainMessageDelivery};

    let send_recv_timeout = Duration::from_millis(4000);
    let retry_delay = Duration::from_millis(1000);
    let max_retries = 10;

    let network = NodeProvider::new(NodeOptions {
        send_timeout: send_recv_timeout,
        recv_timeout: send_recv_timeout,
        retry_delay,
        max_retries,
    });

    let chain_ids = wallet.chain_ids();
    let chain_count = chain_ids.len();
    let name = if chain_count == 0 {
        "Client node".to_string()
    } else if chain_count == 1 {
        format!("Client node for {:.8}", chain_ids[0])
    } else {
        format!(
            "Client node for {:.8} and {} others",
            chain_ids[0],
            chain_count - 1
        )
    };

    let environment = linera_core::environment::Impl {
        storage,
        network,
        signer,
    };
    let chain_client_options = ChainClientOptions {
        cross_chain_message_delivery: CrossChainMessageDelivery::Blocking,
        ..ChainClientOptions::test_default()
    };
    let client = Client::new(
        environment,
        wallet.genesis_admin_chain(),
        false,
        chain_ids,
        name,
        Duration::from_secs(30),
        chain_client_options,
    );

    Self {
        wallet,
        client: Arc::new(client),
        send_timeout: send_recv_timeout,
        recv_timeout: send_recv_timeout,
        retry_delay,
        max_retries,
        chain_listeners: JoinSet::default(),
    }
}
198}
199
200impl<Env: Environment, W: Persist<Target = Wallet>> ClientContext<Env, W> {
201 pub fn wallet(&self) -> &Wallet {
203 &self.wallet
204 }
205
206 pub fn wallet_mut(&mut self) -> &mut W {
208 &mut self.wallet
209 }
210
211 pub async fn mutate_wallet<R: Send>(
212 &mut self,
213 mutation: impl FnOnce(&mut Wallet) -> R + Send,
214 ) -> Result<R, Error> {
215 self.wallet
216 .mutate(mutation)
217 .await
218 .map_err(|e| error::Inner::Persistence(Box::new(e)).into())
219 }
220
221 pub fn default_account(&self) -> Account {
224 Account::chain(self.default_chain())
225 }
226
227 pub fn default_chain(&self) -> ChainId {
229 self.wallet
230 .default_chain()
231 .expect("No chain specified in wallet with no default chain")
232 }
233
234 pub fn first_non_admin_chain(&self) -> ChainId {
235 self.wallet
236 .first_non_admin_chain()
237 .expect("No non-admin chain specified in wallet with no non-admin chain")
238 }
239
240 pub fn make_node_provider(&self) -> NodeProvider {
241 NodeProvider::new(self.make_node_options())
242 }
243
244 fn make_node_options(&self) -> NodeOptions {
245 NodeOptions {
246 send_timeout: self.send_timeout,
247 recv_timeout: self.recv_timeout,
248 retry_delay: self.retry_delay,
249 max_retries: self.max_retries,
250 }
251 }
252
253 pub async fn save_wallet(&mut self) -> Result<(), Error> {
254 self.wallet
255 .persist()
256 .await
257 .map_err(|e| error::Inner::Persistence(Box::new(e)).into())
258 }
259
260 pub async fn update_wallet_from_client<Env_: Environment>(
261 &mut self,
262 client: &ChainClient<Env_>,
263 ) -> Result<(), Error> {
264 let info = client.chain_info().await?;
265 let client_owner = client.preferred_owner();
266 let pending_proposal = client.pending_proposal().clone();
267 self.wallet
268 .as_mut()
269 .update_from_info(pending_proposal, client_owner, &info);
270 self.save_wallet().await
271 }
272
273 pub async fn update_wallet_for_new_chain(
275 &mut self,
276 chain_id: ChainId,
277 owner: Option<AccountOwner>,
278 timestamp: Timestamp,
279 ) -> Result<(), Error> {
280 if self.wallet.get(chain_id).is_none() {
281 self.mutate_wallet(|w| {
282 w.insert(UserChain {
283 chain_id,
284 owner,
285 block_hash: None,
286 timestamp,
287 next_block_height: BlockHeight::ZERO,
288 pending_proposal: None,
289 })
290 })
291 .await?;
292 }
293
294 Ok(())
295 }
296
297 pub async fn process_inbox(
298 &mut self,
299 chain_client: &ChainClient<Env>,
300 ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
301 let mut certificates = Vec::new();
302 let (new_certificates, maybe_timeout) = {
304 chain_client.synchronize_from_validators().await?;
305 let result = chain_client.process_inbox_without_prepare().await;
306 self.update_wallet_from_client(chain_client).await?;
307 if result.is_err() {
308 self.save_wallet().await?;
309 }
310 result?
311 };
312 certificates.extend(new_certificates);
313 if maybe_timeout.is_none() {
314 self.save_wallet().await?;
315 return Ok(certificates);
316 }
317
318 let (listener, _listen_handle, mut notification_stream) = chain_client.listen().await?;
320 self.chain_listeners.spawn_task(listener);
321
322 loop {
323 let (new_certificates, maybe_timeout) = {
324 let result = chain_client.process_inbox().await;
325 self.update_wallet_from_client(chain_client).await?;
326 if result.is_err() {
327 self.save_wallet().await?;
328 }
329 result?
330 };
331 certificates.extend(new_certificates);
332 if let Some(timestamp) = maybe_timeout {
333 util::wait_for_next_round(&mut notification_stream, timestamp).await
334 } else {
335 self.save_wallet().await?;
336 return Ok(certificates);
337 }
338 }
339 }
340
341 pub async fn assign_new_chain_to_key(
342 &mut self,
343 chain_id: ChainId,
344 owner: AccountOwner,
345 ) -> Result<(), Error> {
346 self.client.track_chain(chain_id);
347 let client = self.make_chain_client(chain_id);
348 let chain_description = client.get_chain_description().await?;
349 let config = chain_description.config();
350
351 if !config.ownership.verify_owner(&owner) {
352 tracing::error!(
353 "The chain with the ID returned by the faucet is not owned by you. \
354 Please make sure you are connecting to a genuine faucet."
355 );
356 return Err(error::Inner::ChainOwnership.into());
357 }
358
359 self.wallet_mut()
360 .mutate(|w| w.assign_new_chain_to_owner(owner, chain_id, chain_description.timestamp()))
361 .await
362 .map_err(|e| error::Inner::Persistence(Box::new(e)))?
363 .context("assigning new chain")?;
364 Ok(())
365 }
366
367 pub async fn apply_client_command<E, F, Fut, T>(
372 &mut self,
373 client: &ChainClient<Env>,
374 mut f: F,
375 ) -> Result<T, Error>
376 where
377 F: FnMut(&ChainClient<Env>) -> Fut,
378 Fut: Future<Output = Result<ClientOutcome<T>, E>>,
379 Error: From<E>,
380 {
381 client.prepare_chain().await?;
382 let result = f(client).await;
384 self.update_wallet_from_client(client).await?;
385 if let ClientOutcome::Committed(t) = result? {
386 return Ok(t);
387 }
388
389 let (listener, _listen_handle, mut notification_stream) = client.listen().await?;
391 self.chain_listeners.spawn_task(listener);
392
393 loop {
394 client.prepare_chain().await?;
396 let result = f(client).await;
397 self.update_wallet_from_client(client).await?;
398 let timeout = match result? {
399 ClientOutcome::Committed(t) => return Ok(t),
400 ClientOutcome::WaitForTimeout(timeout) => timeout,
401 };
402 util::wait_for_next_round(&mut notification_stream, timeout).await;
404 }
405 }
406
407 pub async fn change_ownership(
408 &mut self,
409 chain_id: Option<ChainId>,
410 ownership_config: ChainOwnershipConfig,
411 ) -> Result<(), Error> {
412 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
413 let chain_client = self.make_chain_client(chain_id);
414 info!(
415 ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
416 "Changing ownership of a chain"
417 );
418 let time_start = Instant::now();
419 let ownership = ChainOwnership::try_from(ownership_config)?;
420
421 let certificate = self
422 .apply_client_command(&chain_client, |chain_client| {
423 let ownership = ownership.clone();
424 let chain_client = chain_client.clone();
425 async move {
426 chain_client
427 .change_ownership(ownership)
428 .await
429 .map_err(Error::from)
430 .context("Failed to change ownership")
431 }
432 })
433 .await?;
434 let time_total = time_start.elapsed();
435 info!("Operation confirmed after {} ms", time_total.as_millis());
436 debug!("{:?}", certificate);
437 Ok(())
438 }
439
440 pub async fn set_preferred_owner(
441 &mut self,
442 chain_id: Option<ChainId>,
443 preferred_owner: AccountOwner,
444 ) -> Result<(), Error> {
445 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
446 let mut chain_client = self.make_chain_client(chain_id);
447 let old_owner = chain_client.preferred_owner();
448 info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
449 chain_client.set_preferred_owner(preferred_owner);
450 self.update_wallet_from_client(&chain_client).await?;
451 info!("New preferred owner set");
452 Ok(())
453 }
454
455 pub async fn check_compatible_version_info(
456 &self,
457 address: &str,
458 node: &impl ValidatorNode,
459 ) -> Result<VersionInfo, Error> {
460 match node.get_version_info().await {
461 Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
462 info!(
463 "Version information for validator {address}: {}",
464 version_info
465 );
466 Ok(version_info)
467 }
468 Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
469 remote: Box::new(version_info),
470 local: Box::new(linera_version::VERSION_INFO.clone()),
471 }
472 .into()),
473 Err(error) => Err(error::Inner::UnavailableVersionInfo {
474 address: address.to_string(),
475 error: Box::new(error),
476 }
477 .into()),
478 }
479 }
480
481 pub async fn check_matching_network_description(
482 &self,
483 address: &str,
484 node: &impl ValidatorNode,
485 ) -> Result<CryptoHash, Error> {
486 let network_description = self.wallet().genesis_config().network_description();
487 match node.get_network_description().await {
488 Ok(description) => {
489 if description == network_description {
490 Ok(description.genesis_config_hash)
491 } else {
492 Err(error::Inner::UnexpectedNetworkDescription {
493 remote: Box::new(description),
494 local: Box::new(network_description),
495 }
496 .into())
497 }
498 }
499 Err(error) => Err(error::Inner::UnavailableNetworkDescription {
500 address: address.to_string(),
501 error: Box::new(error),
502 }
503 .into()),
504 }
505 }
506
507 pub async fn check_validator_chain_info_response(
508 &self,
509 public_key: Option<&ValidatorPublicKey>,
510 address: &str,
511 node: &impl ValidatorNode,
512 chain_id: ChainId,
513 ) -> Result<(), Error> {
514 let query = ChainInfoQuery::new(chain_id);
515 match node.handle_chain_info_query(query).await {
516 Ok(response) => {
517 info!(
518 "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
519 response.info.next_block_height, response.info.epoch,
520 );
521 if let Some(public_key) = public_key {
522 if response.check(*public_key).is_ok() {
523 info!("Signature for public key {public_key} is OK.");
524 } else {
525 return Err(error::Inner::InvalidSignature {
526 public_key: *public_key,
527 }
528 .into());
529 }
530 }
531 Ok(())
532 }
533 Err(error) => Err(error::Inner::UnavailableChainInfo {
534 address: address.to_string(),
535 chain_id,
536 error: Box::new(error),
537 }
538 .into()),
539 }
540 }
541}
542
543#[cfg(feature = "fs")]
544impl<Env: Environment, W> ClientContext<Env, W>
545where
546 W: Persist<Target = Wallet>,
547{
548 pub async fn publish_module(
549 &mut self,
550 chain_client: &ChainClient<Env>,
551 contract: PathBuf,
552 service: PathBuf,
553 vm_runtime: VmRuntime,
554 ) -> Result<ModuleId, Error> {
555 info!("Loading bytecode files");
556 let contract_bytecode = Bytecode::load_from_file(&contract)
557 .await
558 .with_context(|| format!("failed to load contract bytecode from {:?}", &contract))?;
559 let service_bytecode = Bytecode::load_from_file(&service)
560 .await
561 .with_context(|| format!("failed to load service bytecode from {:?}", &service))?;
562
563 info!("Publishing module");
564 let (blobs, module_id) =
565 create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
566 let (module_id, _) = self
567 .apply_client_command(chain_client, |chain_client| {
568 let blobs = blobs.clone();
569 let chain_client = chain_client.clone();
570 async move {
571 chain_client
572 .publish_module_blobs(blobs, module_id)
573 .await
574 .context("Failed to publish module")
575 }
576 })
577 .await?;
578
579 info!("{}", "Module published successfully!");
580
581 info!("Synchronizing client and processing inbox");
582 self.process_inbox(chain_client).await?;
583 Ok(module_id)
584 }
585
586 pub async fn publish_data_blob(
587 &mut self,
588 chain_client: &ChainClient<Env>,
589 blob_path: PathBuf,
590 ) -> Result<CryptoHash, Error> {
591 info!("Loading data blob file");
592 let blob_bytes = fs::read(&blob_path).context(format!(
593 "failed to load data blob bytes from {:?}",
594 &blob_path
595 ))?;
596
597 info!("Publishing data blob");
598 self.apply_client_command(chain_client, |chain_client| {
599 let blob_bytes = blob_bytes.clone();
600 let chain_client = chain_client.clone();
601 async move {
602 chain_client
603 .publish_data_blob(blob_bytes)
604 .await
605 .context("Failed to publish data blob")
606 }
607 })
608 .await?;
609
610 info!("{}", "Data blob published successfully!");
611 Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
612 }
613
614 pub async fn read_data_blob(
616 &mut self,
617 chain_client: &ChainClient<Env>,
618 hash: CryptoHash,
619 ) -> Result<(), Error> {
620 info!("Verifying data blob");
621 self.apply_client_command(chain_client, |chain_client| {
622 let chain_client = chain_client.clone();
623 async move {
624 chain_client
625 .read_data_blob(hash)
626 .await
627 .context("Failed to verify data blob")
628 }
629 })
630 .await?;
631
632 info!("{}", "Data blob verified successfully!");
633 Ok(())
634 }
635}
636
637#[cfg(feature = "benchmark")]
638impl<Env: Environment, W> ClientContext<Env, W>
639where
640 W: Persist<Target = Wallet>,
641{
642 pub async fn prepare_for_benchmark(
643 &mut self,
644 num_chain_groups: usize,
645 num_chains_per_chain_group: usize,
646 transactions_per_block: usize,
647 tokens_per_chain: Amount,
648 fungible_application_id: Option<ApplicationId>,
649 pub_keys: Vec<AccountPublicKey>,
650 ) -> Result<
651 (
652 Vec<Vec<ChainClient<Env>>>,
653 Vec<Vec<(Vec<Operation>, AccountOwner)>>,
654 Committee,
655 ),
656 Error,
657 > {
658 let start = Instant::now();
659 self.process_inboxes_and_force_validator_updates().await;
663 info!(
664 "Processed inboxes and forced validator updates in {} ms",
665 start.elapsed().as_millis()
666 );
667
668 let start = Instant::now();
669 let (benchmark_chains, chain_clients) = self
670 .make_benchmark_chains(
671 num_chain_groups,
672 num_chains_per_chain_group,
673 tokens_per_chain,
674 pub_keys,
675 )
676 .await?;
677 info!(
678 "Got {} chains in {} ms",
679 num_chain_groups * num_chains_per_chain_group,
680 start.elapsed().as_millis()
681 );
682
683 if let Some(id) = fungible_application_id {
684 let start = Instant::now();
685 self.supply_fungible_tokens(&benchmark_chains, id).await?;
686 info!(
687 "Supplied fungible tokens in {} ms",
688 start.elapsed().as_millis()
689 );
690 let start = Instant::now();
692 for chain_client in chain_clients.iter().flatten() {
693 chain_client.process_inbox().await?;
694 }
695 info!(
696 "Processed inboxes after supplying fungible tokens in {} ms",
697 start.elapsed().as_millis()
698 );
699 }
700
701 let default_chain_id = self
702 .wallet
703 .default_chain()
704 .expect("should have default chain");
705 let default_chain_client = self.make_chain_client(default_chain_id);
706 let committee = default_chain_client.admin_committee().await?.1;
707 let blocks_infos = Benchmark::<Env>::make_benchmark_block_info(
708 benchmark_chains,
709 transactions_per_block,
710 fungible_application_id,
711 );
712
713 Ok((chain_clients, blocks_infos, committee))
714 }
715
716 pub async fn wrap_up_benchmark(
717 &mut self,
718 chain_clients: Vec<Vec<ChainClient<Env>>>,
719 close_chains: bool,
720 wrap_up_max_in_flight: usize,
721 ) -> Result<(), Error> {
722 if close_chains {
723 info!("Closing chains...");
724 let stream = stream::iter(chain_clients.into_iter().flatten())
725 .map(|chain_client| async move {
726 Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
727 info!("Closed chain {:?}", chain_client.chain_id());
728 Ok::<(), BenchmarkError>(())
729 })
730 .buffer_unordered(wrap_up_max_in_flight);
731 stream.try_collect::<Vec<_>>().await?;
732 } else {
733 info!("Processing inbox for all chains...");
734 let stream = stream::iter(chain_clients.iter().flatten().cloned())
735 .map(|chain_client| async move {
736 chain_client.process_inbox().await?;
737 info!("Processed inbox for chain {:?}", chain_client.chain_id());
738 Ok::<(), ChainClientError>(())
739 })
740 .buffer_unordered(wrap_up_max_in_flight);
741 stream.try_collect::<Vec<_>>().await?;
742
743 info!("Updating wallet from chain clients...");
744 for chain_client in chain_clients.iter().flatten() {
745 let info = chain_client.chain_info().await?;
746 let client_owner = chain_client.preferred_owner();
747 let pending_proposal = chain_client.pending_proposal().clone();
748 self.wallet
749 .as_mut()
750 .update_from_info(pending_proposal, client_owner, &info);
751 }
752 self.save_wallet().await?;
753 }
754
755 Ok(())
756 }
757
758 async fn process_inboxes_and_force_validator_updates(&mut self) {
759 let mut chain_clients = vec![];
760 for chain_id in &self.wallet.owned_chain_ids() {
761 chain_clients.push(self.make_chain_client(*chain_id));
762 }
763
764 let mut join_set = task::JoinSet::new();
765 for chain_client in chain_clients {
766 join_set.spawn(async move {
767 Self::process_inbox_without_updating_wallet(&chain_client)
768 .await
769 .expect("Processing inbox should not fail!");
770 chain_client
771 });
772 }
773
774 let chain_clients = join_set.join_all().await;
775 for chain_client in &chain_clients {
776 self.update_wallet_from_client(chain_client).await.unwrap();
777 }
778 }
779
780 async fn process_inbox_without_updating_wallet(
781 chain_client: &ChainClient<Env>,
782 ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
783 chain_client.synchronize_from_validators().await?;
785 let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
786 assert!(
787 maybe_timeout.is_none(),
788 "Should not timeout within benchmark!"
789 );
790
791 Ok(certificates)
792 }
793
794 #[expect(clippy::too_many_arguments)]
795 fn insert_chain(
796 benchmark_chains: &mut Vec<Vec<(ChainId, AccountOwner)>>,
797 chain_clients: &mut Vec<Vec<ChainClient<Env>>>,
798 chain_group: &mut Vec<(ChainId, AccountOwner)>,
799 chain_clients_group: &mut Vec<ChainClient<Env>>,
800 num_chains_per_chain_group: usize,
801 chain_id: ChainId,
802 owner: AccountOwner,
803 chain_client: ChainClient<Env>,
804 ) {
805 chain_group.push((chain_id, owner));
806 chain_clients_group.push(chain_client);
807
808 if chain_group.len() == num_chains_per_chain_group
809 && chain_clients_group.len() == num_chains_per_chain_group
810 {
811 benchmark_chains.push(chain_group.clone());
812 chain_clients.push(chain_clients_group.clone());
813 chain_group.clear();
814 chain_clients_group.clear();
815 }
816 }
817
818 async fn make_benchmark_chains(
821 &mut self,
822 num_chain_groups: usize,
823 num_chains_per_chain_group: usize,
824 balance: Amount,
825 pub_keys: Vec<AccountPublicKey>,
826 ) -> Result<
827 (
828 Vec<Vec<(ChainId, AccountOwner)>>,
829 Vec<Vec<ChainClient<Env>>>,
830 ),
831 Error,
832 > {
833 let total_chains_to_create = num_chain_groups * num_chains_per_chain_group;
834 let mut chains_found_in_wallet = 0;
835 let mut benchmark_chains = Vec::with_capacity(num_chain_groups);
836 let mut chain_clients = Vec::with_capacity(num_chain_groups);
837 let mut chain_group = Vec::with_capacity(num_chains_per_chain_group);
838 let mut chain_clients_group = Vec::with_capacity(num_chains_per_chain_group);
839 let start = Instant::now();
840 for chain_id in self.wallet.owned_chain_ids() {
841 if chains_found_in_wallet == total_chains_to_create {
842 break;
843 }
844 let owner = self
847 .wallet
848 .get(chain_id)
849 .and_then(|chain| chain.owner)
850 .unwrap();
851 let chain_client = self.make_chain_client(chain_id);
852 let ownership = chain_client.chain_info().await?.manager.ownership;
853 if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
854 continue;
855 }
856 chain_client.process_inbox().await?;
857 Self::insert_chain(
858 &mut benchmark_chains,
859 &mut chain_clients,
860 &mut chain_group,
861 &mut chain_clients_group,
862 num_chains_per_chain_group,
863 chain_id,
864 owner,
865 chain_client,
866 );
867 chains_found_in_wallet += 1;
868 }
869 info!(
870 "Got {} chains from the wallet in {} ms",
871 benchmark_chains.len(),
872 start.elapsed().as_millis()
873 );
874
875 let num_chains_to_create = total_chains_to_create - chains_found_in_wallet;
876
877 let default_chain_id = self
878 .wallet
879 .default_chain()
880 .expect("should have default chain");
881 let operations_per_block = 900; let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
884 let default_chain_client = self.make_chain_client(default_chain_id);
885
886 for i in (0..num_chains_to_create).step_by(operations_per_block) {
887 let num_new_chains = operations_per_block.min(num_chains_to_create - i);
888 let pub_key = pub_keys_iter.next().unwrap();
889 let owner = pub_key.into();
890
891 let certificate = Self::execute_open_chains_operations(
892 num_new_chains,
893 &default_chain_client,
894 balance,
895 owner,
896 )
897 .await?;
898 info!("Block executed successfully");
899
900 let block = certificate.block();
901 for i in 0..num_new_chains {
902 let chain_id = block.body.blobs[i]
903 .iter()
904 .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
905 .map(|blob| ChainId(blob.id().hash))
906 .expect("failed to create a new chain");
907 self.client.track_chain(chain_id);
908
909 let mut chain_client = self.client.create_chain_client(
910 chain_id,
911 None,
912 BlockHeight::ZERO,
913 None,
914 Some(owner),
915 );
916 chain_client.set_preferred_owner(owner);
917 chain_client.process_inbox().await?;
918 Self::insert_chain(
919 &mut benchmark_chains,
920 &mut chain_clients,
921 &mut chain_group,
922 &mut chain_clients_group,
923 num_chains_per_chain_group,
924 chain_id,
925 owner,
926 chain_client,
927 );
928 }
929 }
930
931 if num_chains_to_create > 0 {
932 info!(
933 "Created {} chains in {} ms",
934 num_chains_to_create,
935 start.elapsed().as_millis()
936 );
937 }
938
939 info!("Updating wallet from client");
940 self.update_wallet_from_client(&default_chain_client)
941 .await?;
942 info!("Retrying pending outgoing messages");
943 default_chain_client
944 .retry_pending_outgoing_messages()
945 .await
946 .context("outgoing messages to create the new chains should be delivered")?;
947 info!("Processing default chain inbox");
948 default_chain_client.process_inbox().await?;
949
950 Ok((benchmark_chains, chain_clients))
951 }
952
953 async fn execute_open_chains_operations(
954 num_new_chains: usize,
955 chain_client: &ChainClient<Env>,
956 balance: Amount,
957 owner: AccountOwner,
958 ) -> Result<ConfirmedBlockCertificate, Error> {
959 let config = OpenChainConfig {
960 ownership: ChainOwnership::single_super(owner),
961 balance,
962 application_permissions: Default::default(),
963 };
964 let operations = iter::repeat_n(
965 Operation::system(SystemOperation::OpenChain(config)),
966 num_new_chains,
967 )
968 .collect();
969 info!("Executing {} OpenChain operations", num_new_chains);
970 Ok(chain_client
971 .execute_operations(operations, vec![])
972 .await?
973 .expect("should execute block with OpenChain operations"))
974 }
975
976 async fn supply_fungible_tokens(
978 &mut self,
979 key_pairs: &[Vec<(ChainId, AccountOwner)>],
980 application_id: ApplicationId,
981 ) -> Result<(), Error> {
982 let default_chain_id = self
983 .wallet
984 .default_chain()
985 .expect("should have default chain");
986 let default_key = self.wallet.get(default_chain_id).unwrap().owner.unwrap();
987 let amount = Amount::from_nanos(4);
989 let operations: Vec<Operation> = key_pairs
990 .iter()
991 .flat_map(|chain_group| {
992 chain_group
993 .iter()
994 .map(|(chain_id, owner)| {
995 Benchmark::<Env>::fungible_transfer(
996 application_id,
997 *chain_id,
998 default_key,
999 *owner,
1000 amount,
1001 )
1002 })
1003 .collect::<Vec<_>>()
1004 })
1005 .collect();
1006 let chain_client = self.make_chain_client(default_chain_id);
1007 for operation_chunk in operations.chunks(1000) {
1009 chain_client
1010 .execute_operations(operation_chunk.to_vec(), vec![])
1011 .await?
1012 .expect("should execute block with Transfer operations");
1013 }
1014 self.update_wallet_from_client(&chain_client).await?;
1015
1016 Ok(())
1017 }
1018}