1use std::sync::Arc;
5
6use futures::Future;
7use linera_base::{
8 crypto::{CryptoHash, ValidatorPublicKey},
9 data_types::{BlockHeight, Epoch, Timestamp},
10 identifiers::{Account, AccountOwner, ChainId},
11 ownership::ChainOwnership,
12 time::{Duration, Instant},
13};
14use linera_chain::types::ConfirmedBlockCertificate;
15use linera_core::{
16 client::{ChainClient, Client, ListeningMode},
17 data_types::{ChainInfoQuery, ClientOutcome},
18 join_set_ext::JoinSet,
19 node::ValidatorNode,
20 Environment, JoinSetExt as _,
21};
22use linera_persistent::{Persist, PersistExt as _};
23use linera_rpc::node_provider::{NodeOptions, NodeProvider};
24use linera_version::VersionInfo;
25use thiserror_context::Context;
26use tracing::{debug, info};
27#[cfg(not(web))]
28use {
29 crate::{
30 benchmark::{Benchmark, BenchmarkError},
31 client_metrics::ClientMetrics,
32 },
33 futures::{stream, StreamExt, TryStreamExt},
34 linera_base::{
35 crypto::AccountPublicKey,
36 data_types::Amount,
37 identifiers::{ApplicationId, BlobType},
38 },
39 linera_core::client::ChainClientError,
40 linera_execution::{
41 system::{OpenChainConfig, SystemOperation},
42 Operation,
43 },
44 std::{collections::HashSet, iter, path::Path},
45 tokio::{sync::mpsc, task},
46};
47#[cfg(feature = "fs")]
48use {
49 linera_base::{
50 data_types::{BlobContent, Bytecode},
51 identifiers::ModuleId,
52 vm::VmRuntime,
53 },
54 linera_core::client::create_bytecode_blobs,
55 std::{fs, path::PathBuf},
56};
57
58use crate::{
59 chain_listener::{self, ClientContext as _},
60 client_options::{ChainOwnershipConfig, ClientContextOptions},
61 error, util,
62 wallet::{UserChain, Wallet},
63 Error,
64};
65
/// Client-side state bundling a wallet, the shared core client, and the node connection
/// settings used to build `NodeOptions`.
pub struct ClientContext<Env: Environment, W> {
    /// The wallet; most methods require `W: Persist<Target = Wallet>`.
    pub wallet: W,
    /// The core client, shared behind an `Arc`.
    pub client: Arc<Client<Env>>,
    /// Send timeout copied into the `NodeOptions` built by this context.
    pub send_timeout: Duration,
    /// Receive timeout copied into the `NodeOptions` built by this context.
    pub recv_timeout: Duration,
    /// Retry delay copied into the `NodeOptions` built by this context.
    pub retry_delay: Duration,
    /// Maximum number of retries copied into the `NodeOptions` built by this context.
    pub max_retries: u32,
    /// Join set onto which chain listener tasks are spawned.
    pub chain_listeners: JoinSet,
    /// Optional client metrics; the field only exists on non-web builds.
    #[cfg(not(web))]
    pub client_metrics: Option<ClientMetrics>,
}
77
78impl<Env: Environment, W> chain_listener::ClientContext for ClientContext<Env, W>
79where
80 W: Persist<Target = Wallet>,
81{
82 type Environment = Env;
83
84 fn wallet(&self) -> &Wallet {
85 &self.wallet
86 }
87
88 fn storage(&self) -> &Env::Storage {
89 self.client.storage_client()
90 }
91
92 fn client(&self) -> &Arc<Client<Env>> {
93 &self.client
94 }
95
96 #[cfg(not(web))]
97 fn timing_sender(
98 &self,
99 ) -> Option<mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
100 self.client_metrics
101 .as_ref()
102 .map(|metrics| metrics.timing_sender.clone())
103 }
104
105 async fn update_wallet_for_new_chain(
106 &mut self,
107 chain_id: ChainId,
108 owner: Option<AccountOwner>,
109 timestamp: Timestamp,
110 epoch: Epoch,
111 ) -> Result<(), Error> {
112 self.update_wallet_for_new_chain(chain_id, owner, timestamp, epoch)
113 .await
114 }
115
116 async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
117 self.update_wallet_from_client(client).await
118 }
119}
120
121impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si>, W>
122where
123 S: linera_core::environment::Storage,
124 Si: linera_core::environment::Signer,
125 W: Persist<Target = Wallet>,
126{
127 pub fn new(storage: S, options: ClientContextOptions, wallet: W, signer: Si) -> Self {
128 #[cfg(not(web))]
129 let timing_config = options.to_timing_config();
130 let node_provider = NodeProvider::new(NodeOptions {
131 send_timeout: options.send_timeout,
132 recv_timeout: options.recv_timeout,
133 retry_delay: options.retry_delay,
134 max_retries: options.max_retries,
135 });
136 let chain_ids = wallet.chain_ids();
137 let name = match chain_ids.len() {
138 0 => "Client node".to_string(),
139 1 => format!("Client node for {:.8}", chain_ids[0]),
140 n => format!("Client node for {:.8} and {} others", chain_ids[0], n - 1),
141 };
142 let client = Client::new(
143 linera_core::environment::Impl {
144 network: node_provider,
145 storage,
146 signer,
147 },
148 wallet.genesis_admin_chain(),
149 options.long_lived_services,
150 chain_ids,
151 name,
152 options.chain_worker_ttl,
153 options.to_chain_client_options(),
154 );
155
156 #[cfg(not(web))]
157 let client_metrics = if timing_config.enabled {
158 Some(ClientMetrics::new(timing_config))
159 } else {
160 None
161 };
162
163 ClientContext {
164 client: Arc::new(client),
165 wallet,
166 send_timeout: options.send_timeout,
167 recv_timeout: options.recv_timeout,
168 retry_delay: options.retry_delay,
169 max_retries: options.max_retries,
170 chain_listeners: JoinSet::default(),
171 #[cfg(not(web))]
172 client_metrics,
173 }
174 }
175
/// Creates a context for tests with fixed settings: 4000 ms send/receive timeouts,
/// a 1000 ms retry delay, 10 retries, a 30 s chain worker TTL, no long-lived services,
/// and blocking cross-chain message delivery on top of `ChainClientOptions::test_default()`.
///
/// No client metrics are configured.
#[cfg(with_testing)]
pub fn new_test_client_context(storage: S, wallet: W, signer: Si) -> Self {
    use linera_core::{client::ChainClientOptions, node::CrossChainMessageDelivery};

    let send_recv_timeout = Duration::from_millis(4000);
    let retry_delay = Duration::from_millis(1000);
    let max_retries = 10;

    let node_options = NodeOptions {
        send_timeout: send_recv_timeout,
        recv_timeout: send_recv_timeout,
        retry_delay,
        max_retries,
    };
    // Human-readable client name mentioning the first tracked chain, if any.
    let chain_ids = wallet.chain_ids();
    let name = match chain_ids.len() {
        0 => "Client node".to_string(),
        1 => format!("Client node for {:.8}", chain_ids[0]),
        n => format!("Client node for {:.8} and {} others", chain_ids[0], n - 1),
    };
    let client = Client::new(
        linera_core::environment::Impl {
            storage,
            network: NodeProvider::new(node_options),
            signer,
        },
        wallet.genesis_admin_chain(),
        false,
        chain_ids,
        name,
        Duration::from_secs(30),
        ChainClientOptions {
            cross_chain_message_delivery: CrossChainMessageDelivery::Blocking,
            ..ChainClientOptions::test_default()
        },
    );

    ClientContext {
        client: Arc::new(client),
        wallet,
        send_timeout: send_recv_timeout,
        recv_timeout: send_recv_timeout,
        retry_delay,
        max_retries,
        chain_listeners: JoinSet::default(),
        // The field is only declared on non-web builds, so its initializer must be
        // gated the same way or web test builds fail to compile.
        #[cfg(not(web))]
        client_metrics: None,
    }
}
224}
225
226impl<Env: Environment, W: Persist<Target = Wallet>> ClientContext<Env, W> {
227 pub fn wallet(&self) -> &Wallet {
229 &self.wallet
230 }
231
232 pub fn wallet_mut(&mut self) -> &mut W {
234 &mut self.wallet
235 }
236
237 pub async fn mutate_wallet<R: Send>(
238 &mut self,
239 mutation: impl FnOnce(&mut Wallet) -> R + Send,
240 ) -> Result<R, Error> {
241 self.wallet
242 .mutate(mutation)
243 .await
244 .map_err(|e| error::Inner::Persistence(Box::new(e)).into())
245 }
246
247 pub fn default_account(&self) -> Account {
250 Account::chain(self.default_chain())
251 }
252
253 pub fn default_chain(&self) -> ChainId {
255 self.wallet
256 .default_chain()
257 .expect("No chain specified in wallet with no default chain")
258 }
259
260 pub fn first_non_admin_chain(&self) -> ChainId {
261 self.wallet
262 .first_non_admin_chain()
263 .expect("No non-admin chain specified in wallet with no non-admin chain")
264 }
265
266 pub fn make_node_provider(&self) -> NodeProvider {
267 NodeProvider::new(self.make_node_options())
268 }
269
270 fn make_node_options(&self) -> NodeOptions {
271 NodeOptions {
272 send_timeout: self.send_timeout,
273 recv_timeout: self.recv_timeout,
274 retry_delay: self.retry_delay,
275 max_retries: self.max_retries,
276 }
277 }
278
279 #[cfg(not(web))]
280 pub fn client_metrics(&self) -> Option<&ClientMetrics> {
281 self.client_metrics.as_ref()
282 }
283
284 pub async fn save_wallet(&mut self) -> Result<(), Error> {
285 self.wallet
286 .persist()
287 .await
288 .map_err(|e| error::Inner::Persistence(Box::new(e)).into())
289 }
290
291 pub async fn update_wallet_from_client<Env_: Environment>(
292 &mut self,
293 client: &ChainClient<Env_>,
294 ) -> Result<(), Error> {
295 let info = client.chain_info().await?;
296 let client_owner = client.preferred_owner();
297 let pending_proposal = client.pending_proposal().clone();
298 self.wallet
299 .as_mut()
300 .update_from_info(pending_proposal, client_owner, &info);
301 self.save_wallet().await
302 }
303
304 pub async fn update_wallet_for_new_chain(
306 &mut self,
307 chain_id: ChainId,
308 owner: Option<AccountOwner>,
309 timestamp: Timestamp,
310 epoch: Epoch,
311 ) -> Result<(), Error> {
312 if self.wallet.get(chain_id).is_none() {
313 self.mutate_wallet(|w| {
314 w.insert(UserChain {
315 chain_id,
316 owner,
317 block_hash: None,
318 timestamp,
319 next_block_height: BlockHeight::ZERO,
320 pending_proposal: None,
321 epoch: Some(epoch),
322 })
323 })
324 .await?;
325 }
326
327 Ok(())
328 }
329
330 pub async fn process_inbox(
331 &mut self,
332 chain_client: &ChainClient<Env>,
333 ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
334 let mut certificates = Vec::new();
335 let (new_certificates, maybe_timeout) = {
337 chain_client.synchronize_from_validators().await?;
338 let result = chain_client.process_inbox_without_prepare().await;
339 self.update_wallet_from_client(chain_client).await?;
340 if result.is_err() {
341 self.save_wallet().await?;
342 }
343 result?
344 };
345 certificates.extend(new_certificates);
346 if maybe_timeout.is_none() {
347 self.save_wallet().await?;
348 return Ok(certificates);
349 }
350
351 let (listener, _listen_handle, mut notification_stream) =
353 chain_client.listen(ListeningMode::FullChain).await?;
354 self.chain_listeners.spawn_task(listener);
355
356 loop {
357 let (new_certificates, maybe_timeout) = {
358 let result = chain_client.process_inbox().await;
359 self.update_wallet_from_client(chain_client).await?;
360 if result.is_err() {
361 self.save_wallet().await?;
362 }
363 result?
364 };
365 certificates.extend(new_certificates);
366 if let Some(timestamp) = maybe_timeout {
367 util::wait_for_next_round(&mut notification_stream, timestamp).await
368 } else {
369 self.save_wallet().await?;
370 return Ok(certificates);
371 }
372 }
373 }
374
375 pub async fn assign_new_chain_to_key(
376 &mut self,
377 chain_id: ChainId,
378 owner: AccountOwner,
379 ) -> Result<(), Error> {
380 self.client.track_chain(chain_id);
381 let client = self.make_chain_client(chain_id);
382 let chain_description = client.get_chain_description().await?;
383 let config = chain_description.config();
384
385 if !config.ownership.verify_owner(&owner) {
386 tracing::error!(
387 "The chain with the ID returned by the faucet is not owned by you. \
388 Please make sure you are connecting to a genuine faucet."
389 );
390 return Err(error::Inner::ChainOwnership.into());
391 }
392
393 self.wallet_mut()
394 .mutate(|w| {
395 w.assign_new_chain_to_owner(
396 owner,
397 chain_id,
398 chain_description.timestamp(),
399 chain_description.config().epoch,
400 )
401 })
402 .await
403 .map_err(|e| error::Inner::Persistence(Box::new(e)))?
404 .context("assigning new chain")?;
405 Ok(())
406 }
407
408 pub async fn apply_client_command<E, F, Fut, T>(
413 &mut self,
414 client: &ChainClient<Env>,
415 mut f: F,
416 ) -> Result<T, Error>
417 where
418 F: FnMut(&ChainClient<Env>) -> Fut,
419 Fut: Future<Output = Result<ClientOutcome<T>, E>>,
420 Error: From<E>,
421 {
422 client.prepare_chain().await?;
423 let result = f(client).await;
425 self.update_wallet_from_client(client).await?;
426 if let ClientOutcome::Committed(t) = result? {
427 return Ok(t);
428 }
429
430 let (listener, _listen_handle, mut notification_stream) =
432 client.listen(ListeningMode::FullChain).await?;
433 self.chain_listeners.spawn_task(listener);
434
435 loop {
436 let result = f(client).await;
438 self.update_wallet_from_client(client).await?;
439 let timeout = match result? {
440 ClientOutcome::Committed(t) => return Ok(t),
441 ClientOutcome::WaitForTimeout(timeout) => timeout,
442 };
443 util::wait_for_next_round(&mut notification_stream, timeout).await;
445 }
446 }
447
448 pub async fn change_ownership(
449 &mut self,
450 chain_id: Option<ChainId>,
451 ownership_config: ChainOwnershipConfig,
452 ) -> Result<(), Error> {
453 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
454 let chain_client = self.make_chain_client(chain_id);
455 info!(
456 ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
457 "Changing ownership of a chain"
458 );
459 let time_start = Instant::now();
460 let ownership = ChainOwnership::try_from(ownership_config)?;
461
462 let certificate = self
463 .apply_client_command(&chain_client, |chain_client| {
464 let ownership = ownership.clone();
465 let chain_client = chain_client.clone();
466 async move {
467 chain_client
468 .change_ownership(ownership)
469 .await
470 .map_err(Error::from)
471 .context("Failed to change ownership")
472 }
473 })
474 .await?;
475 let time_total = time_start.elapsed();
476 info!("Operation confirmed after {} ms", time_total.as_millis());
477 debug!("{:?}", certificate);
478 Ok(())
479 }
480
481 pub async fn set_preferred_owner(
482 &mut self,
483 chain_id: Option<ChainId>,
484 preferred_owner: AccountOwner,
485 ) -> Result<(), Error> {
486 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
487 let mut chain_client = self.make_chain_client(chain_id);
488 let old_owner = chain_client.preferred_owner();
489 info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
490 chain_client.set_preferred_owner(preferred_owner);
491 self.update_wallet_from_client(&chain_client).await?;
492 info!("New preferred owner set");
493 Ok(())
494 }
495
496 pub async fn check_compatible_version_info(
497 &self,
498 address: &str,
499 node: &impl ValidatorNode,
500 ) -> Result<VersionInfo, Error> {
501 match node.get_version_info().await {
502 Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
503 info!(
504 "Version information for validator {address}: {}",
505 version_info
506 );
507 Ok(version_info)
508 }
509 Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
510 remote: Box::new(version_info),
511 local: Box::new(linera_version::VERSION_INFO.clone()),
512 }
513 .into()),
514 Err(error) => Err(error::Inner::UnavailableVersionInfo {
515 address: address.to_string(),
516 error: Box::new(error),
517 }
518 .into()),
519 }
520 }
521
522 pub async fn check_matching_network_description(
523 &self,
524 address: &str,
525 node: &impl ValidatorNode,
526 ) -> Result<CryptoHash, Error> {
527 let network_description = self.wallet().genesis_config().network_description();
528 match node.get_network_description().await {
529 Ok(description) => {
530 if description == network_description {
531 Ok(description.genesis_config_hash)
532 } else {
533 Err(error::Inner::UnexpectedNetworkDescription {
534 remote: Box::new(description),
535 local: Box::new(network_description),
536 }
537 .into())
538 }
539 }
540 Err(error) => Err(error::Inner::UnavailableNetworkDescription {
541 address: address.to_string(),
542 error: Box::new(error),
543 }
544 .into()),
545 }
546 }
547
548 pub async fn check_validator_chain_info_response(
549 &self,
550 public_key: Option<&ValidatorPublicKey>,
551 address: &str,
552 node: &impl ValidatorNode,
553 chain_id: ChainId,
554 ) -> Result<(), Error> {
555 let query = ChainInfoQuery::new(chain_id);
556 match node.handle_chain_info_query(query).await {
557 Ok(response) => {
558 info!(
559 "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
560 response.info.next_block_height, response.info.epoch,
561 );
562 if let Some(public_key) = public_key {
563 if response.check(*public_key).is_ok() {
564 info!("Signature for public key {public_key} is OK.");
565 } else {
566 return Err(error::Inner::InvalidSignature {
567 public_key: *public_key,
568 }
569 .into());
570 }
571 }
572 Ok(())
573 }
574 Err(error) => Err(error::Inner::UnavailableChainInfo {
575 address: address.to_string(),
576 chain_id,
577 error: Box::new(error),
578 }
579 .into()),
580 }
581 }
582}
583
584#[cfg(feature = "fs")]
585impl<Env: Environment, W> ClientContext<Env, W>
586where
587 W: Persist<Target = Wallet>,
588{
589 pub async fn publish_module(
590 &mut self,
591 chain_client: &ChainClient<Env>,
592 contract: PathBuf,
593 service: PathBuf,
594 vm_runtime: VmRuntime,
595 ) -> Result<ModuleId, Error> {
596 info!("Loading bytecode files");
597 let contract_bytecode = Bytecode::load_from_file(&contract)
598 .await
599 .with_context(|| format!("failed to load contract bytecode from {:?}", &contract))?;
600 let service_bytecode = Bytecode::load_from_file(&service)
601 .await
602 .with_context(|| format!("failed to load service bytecode from {:?}", &service))?;
603
604 info!("Publishing module");
605 let (blobs, module_id) =
606 create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
607 let (module_id, _) = self
608 .apply_client_command(chain_client, |chain_client| {
609 let blobs = blobs.clone();
610 let chain_client = chain_client.clone();
611 async move {
612 chain_client
613 .publish_module_blobs(blobs, module_id)
614 .await
615 .context("Failed to publish module")
616 }
617 })
618 .await?;
619
620 info!("{}", "Module published successfully!");
621
622 info!("Synchronizing client and processing inbox");
623 self.process_inbox(chain_client).await?;
624 Ok(module_id)
625 }
626
627 pub async fn publish_data_blob(
628 &mut self,
629 chain_client: &ChainClient<Env>,
630 blob_path: PathBuf,
631 ) -> Result<CryptoHash, Error> {
632 info!("Loading data blob file");
633 let blob_bytes = fs::read(&blob_path).context(format!(
634 "failed to load data blob bytes from {:?}",
635 &blob_path
636 ))?;
637
638 info!("Publishing data blob");
639 self.apply_client_command(chain_client, |chain_client| {
640 let blob_bytes = blob_bytes.clone();
641 let chain_client = chain_client.clone();
642 async move {
643 chain_client
644 .publish_data_blob(blob_bytes)
645 .await
646 .context("Failed to publish data blob")
647 }
648 })
649 .await?;
650
651 info!("{}", "Data blob published successfully!");
652 Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
653 }
654
655 pub async fn read_data_blob(
657 &mut self,
658 chain_client: &ChainClient<Env>,
659 hash: CryptoHash,
660 ) -> Result<(), Error> {
661 info!("Verifying data blob");
662 self.apply_client_command(chain_client, |chain_client| {
663 let chain_client = chain_client.clone();
664 async move {
665 chain_client
666 .read_data_blob(hash)
667 .await
668 .context("Failed to verify data blob")
669 }
670 })
671 .await?;
672
673 info!("{}", "Data blob verified successfully!");
674 Ok(())
675 }
676}
677
678#[cfg(not(web))]
679impl<Env: Environment, W> ClientContext<Env, W>
680where
681 W: Persist<Target = Wallet>,
682{
683 pub async fn prepare_for_benchmark(
684 &mut self,
685 num_chains: usize,
686 tokens_per_chain: Amount,
687 fungible_application_id: Option<ApplicationId>,
688 pub_keys: Vec<AccountPublicKey>,
689 chains_config_path: Option<&Path>,
690 ) -> Result<(Vec<ChainClient<Env>>, Vec<ChainId>), Error> {
691 let start = Instant::now();
692 self.process_inboxes_and_force_validator_updates().await;
696 info!(
697 "Processed inboxes and forced validator updates in {} ms",
698 start.elapsed().as_millis()
699 );
700
701 let start = Instant::now();
702 let (benchmark_chains, chain_clients) = self
703 .make_benchmark_chains(
704 num_chains,
705 tokens_per_chain,
706 pub_keys,
707 chains_config_path.is_some(),
708 )
709 .await?;
710 info!(
711 "Got {} chains in {} ms",
712 num_chains,
713 start.elapsed().as_millis()
714 );
715
716 if let Some(id) = fungible_application_id {
717 let start = Instant::now();
718 self.supply_fungible_tokens(&benchmark_chains, id).await?;
719 info!(
720 "Supplied fungible tokens in {} ms",
721 start.elapsed().as_millis()
722 );
723 let start = Instant::now();
725 for chain_client in &chain_clients {
726 chain_client.process_inbox().await?;
727 }
728 info!(
729 "Processed inboxes after supplying fungible tokens in {} ms",
730 start.elapsed().as_millis()
731 );
732 }
733
734 let all_chains = Benchmark::<Env>::get_all_chains(chains_config_path, &benchmark_chains)?;
735 let known_chain_ids: HashSet<_> = benchmark_chains.iter().map(|(id, _)| *id).collect();
736 let unknown_chain_ids: Vec<_> = all_chains
737 .iter()
738 .filter(|id| !known_chain_ids.contains(id))
739 .copied()
740 .collect();
741 if !unknown_chain_ids.is_empty() {
742 for chain_id in &unknown_chain_ids {
746 self.client.get_chain_description(*chain_id).await?;
747 }
748 }
749
750 Ok((chain_clients, all_chains))
751 }
752
753 pub async fn wrap_up_benchmark(
754 &mut self,
755 chain_clients: Vec<ChainClient<Env>>,
756 close_chains: bool,
757 wrap_up_max_in_flight: usize,
758 ) -> Result<(), Error> {
759 if close_chains {
760 info!("Closing chains...");
761 let stream = stream::iter(chain_clients)
762 .map(|chain_client| async move {
763 Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
764 info!("Closed chain {:?}", chain_client.chain_id());
765 Ok::<(), BenchmarkError>(())
766 })
767 .buffer_unordered(wrap_up_max_in_flight);
768 stream.try_collect::<Vec<_>>().await?;
769 } else {
770 info!("Processing inbox for all chains...");
771 let stream = stream::iter(chain_clients.clone())
772 .map(|chain_client| async move {
773 chain_client.process_inbox().await?;
774 info!("Processed inbox for chain {:?}", chain_client.chain_id());
775 Ok::<(), ChainClientError>(())
776 })
777 .buffer_unordered(wrap_up_max_in_flight);
778 stream.try_collect::<Vec<_>>().await?;
779
780 info!("Updating wallet from chain clients...");
781 for chain_client in chain_clients {
782 let info = chain_client.chain_info().await?;
783 let client_owner = chain_client.preferred_owner();
784 let pending_proposal = chain_client.pending_proposal().clone();
785 self.wallet
786 .as_mut()
787 .update_from_info(pending_proposal, client_owner, &info);
788 }
789 self.save_wallet().await?;
790 }
791
792 Ok(())
793 }
794
795 async fn process_inboxes_and_force_validator_updates(&mut self) {
796 let mut chain_clients = vec![];
797 for chain_id in &self.wallet.owned_chain_ids() {
798 chain_clients.push(self.make_chain_client(*chain_id));
799 }
800
801 let mut join_set = task::JoinSet::new();
802 for chain_client in chain_clients {
803 join_set.spawn(async move {
804 Self::process_inbox_without_updating_wallet(&chain_client)
805 .await
806 .expect("Processing inbox should not fail!");
807 chain_client
808 });
809 }
810
811 let chain_clients = join_set.join_all().await;
812 for chain_client in &chain_clients {
813 self.update_wallet_from_client(chain_client).await.unwrap();
814 }
815 }
816
817 async fn process_inbox_without_updating_wallet(
818 chain_client: &ChainClient<Env>,
819 ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
820 chain_client.synchronize_from_validators().await?;
822 let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
823 assert!(
824 maybe_timeout.is_none(),
825 "Should not timeout within benchmark!"
826 );
827
828 Ok(certificates)
829 }
830
831 async fn make_benchmark_chains(
834 &mut self,
835 num_chains: usize,
836 balance: Amount,
837 pub_keys: Vec<AccountPublicKey>,
838 wallet_only: bool,
839 ) -> Result<(Vec<(ChainId, AccountOwner)>, Vec<ChainClient<Env>>), Error> {
840 let mut chains_found_in_wallet = 0;
841 let mut benchmark_chains = Vec::with_capacity(num_chains);
842 let mut chain_clients = Vec::with_capacity(num_chains);
843 let start = Instant::now();
844 for chain_id in self.wallet.owned_chain_ids() {
845 if chains_found_in_wallet == num_chains {
846 break;
847 }
848 let chain_client = self.make_chain_client(chain_id);
849 let ownership = chain_client.chain_info().await?.manager.ownership;
850 if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
851 continue;
852 }
853 chain_client.process_inbox().await?;
854 benchmark_chains.push((
855 chain_id,
856 *ownership
857 .super_owners
858 .first()
859 .expect("should have a super owner"),
860 ));
861 chain_clients.push(chain_client);
862 chains_found_in_wallet += 1;
863 }
864 info!(
865 "Got {} chains from the wallet in {} ms",
866 benchmark_chains.len(),
867 start.elapsed().as_millis()
868 );
869
870 let num_chains_to_create = num_chains - chains_found_in_wallet;
871
872 let default_chain_id = self
873 .wallet
874 .default_chain()
875 .expect("should have default chain");
876 let default_chain_client = self.make_chain_client(default_chain_id);
877
878 if num_chains_to_create > 0 {
879 if wallet_only {
880 return Err(
881 error::Inner::Benchmark(BenchmarkError::NotEnoughChainsInWallet(
882 num_chains,
883 chains_found_in_wallet,
884 ))
885 .into(),
886 );
887 }
888 let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
889 let operations_per_block = 900; for i in (0..num_chains_to_create).step_by(operations_per_block) {
891 let num_new_chains = operations_per_block.min(num_chains_to_create - i);
892 let pub_key = pub_keys_iter.next().unwrap();
893 let owner = pub_key.into();
894
895 let certificate = Self::execute_open_chains_operations(
896 num_new_chains,
897 &default_chain_client,
898 balance,
899 owner,
900 )
901 .await?;
902 info!("Block executed successfully");
903
904 let block = certificate.block();
905 for i in 0..num_new_chains {
906 let chain_id = block.body.blobs[i]
907 .iter()
908 .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
909 .map(|blob| ChainId(blob.id().hash))
910 .expect("failed to create a new chain");
911 self.client.track_chain(chain_id);
912
913 let mut chain_client = self.client.create_chain_client(
914 chain_id,
915 None,
916 BlockHeight::ZERO,
917 None,
918 Some(owner),
919 self.timing_sender(),
920 );
921 chain_client.set_preferred_owner(owner);
922 chain_client.process_inbox().await?;
923 benchmark_chains.push((chain_id, owner));
924 chain_clients.push(chain_client);
925 }
926 }
927
928 info!(
929 "Created {} chains in {} ms",
930 num_chains_to_create,
931 start.elapsed().as_millis()
932 );
933 }
934
935 info!("Updating wallet from client");
936 self.update_wallet_from_client(&default_chain_client)
937 .await?;
938 info!("Retrying pending outgoing messages");
939 default_chain_client
940 .retry_pending_outgoing_messages()
941 .await
942 .context("outgoing messages to create the new chains should be delivered")?;
943 info!("Processing default chain inbox");
944 default_chain_client.process_inbox().await?;
945
946 assert_eq!(
947 benchmark_chains.len(),
948 chain_clients.len(),
949 "benchmark_chains and chain_clients must have the same size"
950 );
951
952 Ok((benchmark_chains, chain_clients))
953 }
954
955 async fn execute_open_chains_operations(
956 num_new_chains: usize,
957 chain_client: &ChainClient<Env>,
958 balance: Amount,
959 owner: AccountOwner,
960 ) -> Result<ConfirmedBlockCertificate, Error> {
961 let config = OpenChainConfig {
962 ownership: ChainOwnership::single_super(owner),
963 balance,
964 application_permissions: Default::default(),
965 };
966 let operations = iter::repeat_n(
967 Operation::system(SystemOperation::OpenChain(config)),
968 num_new_chains,
969 )
970 .collect();
971 info!("Executing {} OpenChain operations", num_new_chains);
972 Ok(chain_client
973 .execute_operations(operations, vec![])
974 .await?
975 .expect("should execute block with OpenChain operations"))
976 }
977
978 async fn supply_fungible_tokens(
980 &mut self,
981 key_pairs: &[(ChainId, AccountOwner)],
982 application_id: ApplicationId,
983 ) -> Result<(), Error> {
984 let default_chain_id = self
985 .wallet
986 .default_chain()
987 .expect("should have default chain");
988 let default_key = self.wallet.get(default_chain_id).unwrap().owner.unwrap();
989 let amount = Amount::from_nanos(4);
991 let operations: Vec<Operation> = key_pairs
992 .iter()
993 .map(|(chain_id, owner)| {
994 Benchmark::<Env>::fungible_transfer(
995 application_id,
996 *chain_id,
997 default_key,
998 *owner,
999 amount,
1000 )
1001 })
1002 .collect();
1003 let chain_client = self.make_chain_client(default_chain_id);
1004 for operation_chunk in operations.chunks(1000) {
1006 chain_client
1007 .execute_operations(operation_chunk.to_vec(), vec![])
1008 .await?
1009 .expect("should execute block with Transfer operations");
1010 }
1011 self.update_wallet_from_client(&chain_client).await?;
1012
1013 Ok(())
1014 }
1015}