1use std::sync::Arc;
5
6use futures::{Future, StreamExt as _, TryStreamExt as _};
7use linera_base::{
8 crypto::{CryptoHash, ValidatorPublicKey},
9 data_types::{Epoch, Timestamp},
10 identifiers::{Account, AccountOwner, ChainId},
11 ownership::ChainOwnership,
12 time::{Duration, Instant},
13 util::future::FutureSyncExt as _,
14};
15use linera_chain::{manager::LockingBlock, types::ConfirmedBlockCertificate};
16use linera_core::{
17 client::{ChainClient, Client, ListeningMode},
18 data_types::{ChainInfo, ChainInfoQuery, ClientOutcome},
19 join_set_ext::JoinSet,
20 node::ValidatorNode,
21 wallet, Environment, JoinSetExt as _, Wallet as _,
22};
23use linera_rpc::node_provider::{NodeOptions, NodeProvider};
24use linera_version::VersionInfo;
25use thiserror_context::Context;
26use tracing::{debug, info, warn};
27#[cfg(not(web))]
28use {
29 crate::{
30 benchmark::{Benchmark, BenchmarkError},
31 client_metrics::ClientMetrics,
32 },
33 futures::stream,
34 linera_base::{
35 crypto::AccountPublicKey,
36 data_types::{Amount, BlockHeight},
37 identifiers::{ApplicationId, BlobType},
38 },
39 linera_core::client::chain_client,
40 linera_execution::{
41 system::{OpenChainConfig, SystemOperation},
42 Operation,
43 },
44 std::{collections::HashSet, iter, path::Path},
45 tokio::{sync::mpsc, task},
46};
47#[cfg(feature = "fs")]
48use {
49 linera_base::{
50 data_types::{BlobContent, Bytecode},
51 identifiers::ModuleId,
52 vm::VmRuntime,
53 },
54 linera_core::client::create_bytecode_blobs,
55 std::{fs, path::PathBuf},
56};
57
58use crate::{
59 chain_listener::{self, ClientContext as _},
60 client_options::{ChainOwnershipConfig, ClientContextOptions},
61 config::GenesisConfig,
62 error, util, Error,
63};
64
/// Results of querying a node for its version, genesis configuration and chain state.
///
/// Each field holds the outcome of one separate query, so a failure of one query does not
/// hide the results of the others.
pub struct ValidatorQueryResults {
    /// The version information of the node, or the error encountered while obtaining it.
    pub version_info: Result<VersionInfo, Error>,
    /// The genesis configuration hash, or the error encountered while obtaining it.
    pub genesis_config_hash: Result<CryptoHash, Error>,
    /// The chain information, or the error encountered while obtaining it.
    pub chain_info: Result<ChainInfo, Error>,
}
74
75impl ValidatorQueryResults {
76 pub fn errors(&self) -> Vec<&Error> {
78 let mut errors = Vec::new();
79 if let Err(e) = &self.version_info {
80 errors.push(e);
81 }
82 if let Err(e) = &self.genesis_config_hash {
83 errors.push(e);
84 }
85 if let Err(e) = &self.chain_info {
86 errors.push(e);
87 }
88 errors
89 }
90
    /// Prints these query results to standard output.
    ///
    /// The `public_key`, `address` and `weight` lines are printed only when the corresponding
    /// argument is `Some`.
    ///
    /// When `reference` is given, a value is printed only if it differs from the corresponding
    /// value in `reference`, or if `reference` holds an error for that query. Without a
    /// reference, every available value is printed. Errors stored in `self` are always
    /// printed, and a requested locking block is printed regardless of `reference`.
    pub fn print(
        &self,
        public_key: Option<&ValidatorPublicKey>,
        address: Option<&str>,
        weight: Option<u64>,
        reference: Option<&ValidatorQueryResults>,
    ) {
        if let Some(key) = public_key {
            println!("Public key: {}", key);
        }
        if let Some(address) = address {
            println!("Address: {}", address);
        }
        if let Some(w) = weight {
            println!("Weight: {}", w);
        }

        // `None` if there is no reference or the reference failed to get version info; in both
        // cases `is_none_or` below makes every version field print.
        let ref_version = reference.and_then(|ref_results| ref_results.version_info.as_ref().ok());
        match &self.version_info {
            Ok(version_info) => {
                if ref_version.is_none_or(|ref_v| ref_v.crate_version != version_info.crate_version)
                {
                    println!("Linera protocol: v{}", version_info.crate_version);
                }
                if ref_version.is_none_or(|ref_v| ref_v.rpc_hash != version_info.rpc_hash) {
                    println!("RPC API hash: {}", version_info.rpc_hash);
                }
                if ref_version.is_none_or(|ref_v| ref_v.graphql_hash != version_info.graphql_hash) {
                    println!("GraphQL API hash: {}", version_info.graphql_hash);
                }
                if ref_version.is_none_or(|ref_v| ref_v.wit_hash != version_info.wit_hash) {
                    println!("WIT API hash: {}", version_info.wit_hash);
                }
                // The commit and the dirty flag are compared together and printed on one line.
                if ref_version.is_none_or(|ref_v| {
                    (&ref_v.git_commit, ref_v.git_dirty)
                        != (&version_info.git_commit, version_info.git_dirty)
                }) {
                    println!(
                        "Source code: {}/tree/{}{}",
                        env!("CARGO_PKG_REPOSITORY"),
                        version_info.git_commit,
                        if version_info.git_dirty {
                            " (dirty)"
                        } else {
                            ""
                        }
                    );
                }
            }
            Err(err) => println!("Error getting version info: {err}"),
        }

        let ref_genesis_hash =
            reference.and_then(|ref_results| ref_results.genesis_config_hash.as_ref().ok());
        match &self.genesis_config_hash {
            // Same hash as the reference: nothing to print.
            Ok(hash) if ref_genesis_hash.is_some_and(|ref_hash| ref_hash == hash) => {}
            Ok(hash) => println!("Genesis config hash: {hash}"),
            Err(err) => println!("Error getting genesis config: {err}"),
        }

        let ref_info = reference.and_then(|ref_results| ref_results.chain_info.as_ref().ok());
        match &self.chain_info {
            Ok(info) => {
                if ref_info.is_none_or(|ref_info| info.block_hash != ref_info.block_hash) {
                    if let Some(hash) = info.block_hash {
                        println!("Block hash: {}", hash);
                    } else {
                        println!("Block hash: None");
                    }
                }
                if ref_info
                    .is_none_or(|ref_info| info.next_block_height != ref_info.next_block_height)
                {
                    println!("Next height: {}", info.next_block_height);
                }
                if ref_info.is_none_or(|ref_info| info.timestamp != ref_info.timestamp) {
                    println!("Timestamp: {}", info.timestamp);
                }
                if ref_info.is_none_or(|ref_info| info.epoch != ref_info.epoch) {
                    println!("Epoch: {}", info.epoch);
                }
                if ref_info.is_none_or(|ref_info| {
                    info.manager.current_round != ref_info.manager.current_round
                }) {
                    println!("Round: {}", info.manager.current_round);
                }
                // The locking block is not compared against the reference.
                if let Some(locking) = &info.manager.requested_locking {
                    match &**locking {
                        LockingBlock::Fast(proposal) => {
                            println!(
                                "Locking fast block from {}",
                                proposal.content.block.timestamp
                            );
                        }
                        LockingBlock::Regular(validated) => {
                            println!(
                                "Locking block {} in {} from {}",
                                validated.hash(),
                                validated.round,
                                validated.block().header.timestamp
                            );
                        }
                    }
                }
            }
            Err(err) => println!("Error getting chain info: {err}"),
        }
    }
203}
204
/// Client-side context bundling the core [`Client`] with the settings used to operate on
/// chains and to connect to validator nodes.
pub struct ClientContext<Env: Environment> {
    /// The shared core client.
    pub client: Arc<Client<Env>>,
    /// The genesis configuration; its network description is compared against validators'.
    pub genesis_config: crate::config::GenesisConfig,
    /// Send timeout used when building node options.
    pub send_timeout: Duration,
    /// Receive timeout used when building node options.
    pub recv_timeout: Duration,
    /// Retry delay used when building node options.
    pub retry_delay: Duration,
    /// Maximum number of retries used when building node options.
    pub max_retries: u32,
    /// The set of listener tasks spawned by this context.
    pub chain_listeners: JoinSet,
    /// The chain used when a command does not name one explicitly, if any.
    pub default_chain: Option<ChainId>,
    /// Client metrics, if any; exposed through the `timing_sender` of the listener trait.
    #[cfg(not(web))]
    pub client_metrics: Option<ClientMetrics>,
}
219
/// Exposes this context to the chain listener by delegating to the underlying client and to
/// the inherent methods of [`ClientContext`].
impl<Env: Environment> chain_listener::ClientContext for ClientContext<Env> {
    type Environment = Env;

    /// Returns the wallet of the underlying client.
    fn wallet(&self) -> &Env::Wallet {
        self.client.wallet()
    }

    /// Returns the storage client of the underlying client.
    fn storage(&self) -> &Env::Storage {
        self.client.storage_client()
    }

    /// Returns the shared core client.
    fn client(&self) -> &Arc<Client<Env>> {
        &self.client
    }

    /// Returns a clone of the metrics' timing sender, or `None` if there are no client metrics.
    #[cfg(not(web))]
    fn timing_sender(
        &self,
    ) -> Option<mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
        self.client_metrics
            .as_ref()
            .map(|metrics| metrics.timing_sender.clone())
    }

    /// Records a newly created chain in the wallet.
    async fn update_wallet_for_new_chain(
        &mut self,
        chain_id: ChainId,
        owner: Option<AccountOwner>,
        timestamp: Timestamp,
        epoch: Epoch,
    ) -> Result<(), Error> {
        // This resolves to the inherent `ClientContext::update_wallet_for_new_chain`, which
        // takes precedence over this trait method, so the call does not recurse.
        self.update_wallet_for_new_chain(chain_id, owner, timestamp, epoch)
            .make_sync()
            .await
    }

    /// Updates the wallet entry of the client's chain from the client's current state.
    async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
        self.update_wallet_from_client(client).make_sync().await
    }
}
260
261impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si, W>>
262where
263 S: linera_core::environment::Storage,
264 Si: linera_core::environment::Signer,
265 W: linera_core::environment::Wallet,
266{
    /// Creates a new client context from its components.
    ///
    /// The node provider and the stored timeout/retry settings are taken from `options`. All
    /// chain IDs currently in `wallet` are passed to the new [`Client`], whose name is derived
    /// from the first of them.
    ///
    /// # Errors
    ///
    /// Returns a wallet error if the chain IDs cannot be read from `wallet`.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        storage: S,
        wallet: W,
        signer: Si,
        options: ClientContextOptions,
        default_chain: Option<ChainId>,
        genesis_config: GenesisConfig,
        block_cache_size: usize,
        execution_state_cache_size: usize,
    ) -> Result<Self, Error> {
        #[cfg(not(web))]
        let timing_config = options.to_timing_config();
        let node_provider = NodeProvider::new(NodeOptions {
            send_timeout: options.send_timeout,
            recv_timeout: options.recv_timeout,
            retry_delay: options.retry_delay,
            max_retries: options.max_retries,
        });
        let chain_ids: Vec<_> = wallet
            .chain_ids()
            .try_collect()
            .await
            .map_err(error::Inner::wallet)?;
        // Human-readable client name; `{:.8}` limits the printed chain ID to 8 characters.
        let name = match chain_ids.len() {
            0 => "Client node".to_string(),
            1 => format!("Client node for {:.8}", chain_ids[0]),
            n => format!("Client node for {:.8} and {} others", chain_ids[0], n - 1),
        };
        let client = Client::new(
            linera_core::environment::Impl {
                network: node_provider,
                storage,
                signer,
                wallet,
            },
            genesis_config.admin_id(),
            options.long_lived_services,
            chain_ids,
            name,
            options.chain_worker_ttl,
            options.sender_chain_worker_ttl,
            options.to_chain_client_options(),
            block_cache_size,
            execution_state_cache_size,
            options.to_requests_scheduler_config(),
        );

        // Metrics are only created when timing is enabled in the options.
        #[cfg(not(web))]
        let client_metrics = if timing_config.enabled {
            Some(ClientMetrics::new(timing_config))
        } else {
            None
        };

        Ok(ClientContext {
            client: Arc::new(client),
            default_chain,
            genesis_config,
            send_timeout: options.send_timeout,
            recv_timeout: options.recv_timeout,
            retry_delay: options.retry_delay,
            max_retries: options.max_retries,
            chain_listeners: JoinSet::default(),
            #[cfg(not(web))]
            client_metrics,
        })
    }
338}
339
340impl<Env: Environment> ClientContext<Env> {
    /// Returns the wallet held by the underlying client.
    pub fn wallet(&self) -> &Env::Wallet {
        self.client.wallet()
    }
347
    /// Returns the ID of the admin chain, as reported by the underlying client.
    pub fn admin_chain(&self) -> ChainId {
        self.client.admin_chain()
    }
352
    /// Returns the [`Account`] built from the default chain via `Account::chain`.
    ///
    /// # Panics
    ///
    /// Panics if no default chain is set.
    pub fn default_account(&self) -> Account {
        Account::chain(self.default_chain())
    }
358
    /// Returns the ID of the default chain.
    ///
    /// # Panics
    ///
    /// Panics if no default chain is set.
    pub fn default_chain(&self) -> ChainId {
        self.default_chain
            .expect("default chain requested but none set")
    }
364
365 pub async fn first_non_admin_chain(&self) -> Result<ChainId, Error> {
366 let admin_id = self.admin_chain();
367 std::pin::pin!(self
368 .wallet()
369 .chain_ids()
370 .try_filter(|chain_id| futures::future::ready(*chain_id != admin_id)))
371 .next()
372 .await
373 .expect("No non-admin chain specified in wallet with no non-admin chain")
374 .map_err(Error::wallet)
375 }
376
    /// Creates a new [`NodeProvider`] configured with this context's timeout and retry settings.
    pub fn make_node_provider(&self) -> NodeProvider {
        NodeProvider::new(self.make_node_options())
    }
381
382 fn make_node_options(&self) -> NodeOptions {
383 NodeOptions {
384 send_timeout: self.send_timeout,
385 recv_timeout: self.recv_timeout,
386 retry_delay: self.retry_delay,
387 max_retries: self.max_retries,
388 }
389 }
390
    /// Returns a reference to the client metrics, or `None` if there are none.
    #[cfg(not(web))]
    pub fn client_metrics(&self) -> Option<&ClientMetrics> {
        self.client_metrics.as_ref()
    }
395
    /// Updates the wallet entry of the client's chain from the client's current state.
    ///
    /// If the wallet already has an entry for the chain, its block hash, next block height,
    /// timestamp, epoch, owner and pending proposal are overwritten and its other fields are
    /// left as they are. Otherwise a new entry is inserted.
    ///
    /// The client may use a different environment (`Env_`) than this context.
    ///
    /// # Errors
    ///
    /// Returns an error if the chain info cannot be obtained or if the wallet fails.
    pub async fn update_wallet_from_client<Env_: Environment>(
        &self,
        client: &ChainClient<Env_>,
    ) -> Result<(), Error> {
        let info = client.chain_info().await?;
        let chain_id = info.chain_id;
        let client_owner = client.preferred_owner();
        let pending_proposal = client.pending_proposal().clone();
        let new_chain: wallet::Chain = info.as_ref().into();
        let modified = self
            .wallet()
            .modify(chain_id, |chain| {
                chain.block_hash = new_chain.block_hash;
                chain.next_block_height = new_chain.next_block_height;
                chain.timestamp = new_chain.timestamp;
                chain.epoch = new_chain.epoch;
                chain.owner = client_owner;
                // Cloned because `pending_proposal` is still needed by the insert below.
                chain.pending_proposal = pending_proposal.clone();
            })
            .await
            .map_err(error::Inner::wallet)?;
        // `None` means the wallet had no entry for this chain, so insert a new one.
        if modified.is_none() {
            self.wallet()
                .insert(
                    chain_id,
                    wallet::Chain {
                        pending_proposal,
                        owner: client_owner,
                        ..new_chain
                    },
                )
                .await
                .map_err(error::Inner::wallet)?;
        }
        Ok(())
    }
434
435 pub async fn update_wallet_for_new_chain(
437 &mut self,
438 chain_id: ChainId,
439 owner: Option<AccountOwner>,
440 timestamp: Timestamp,
441 epoch: Epoch,
442 ) -> Result<(), Error> {
443 let _ = self
444 .wallet()
445 .try_insert(
446 chain_id,
447 linera_core::wallet::Chain::new(owner, epoch, timestamp),
448 )
449 .await
450 .map_err(error::Inner::wallet)?;
451 Ok(())
452 }
453
454 pub async fn set_follow_only(&self, chain_id: ChainId, follow_only: bool) -> Result<(), Error> {
456 self.wallet()
457 .modify(chain_id, |chain| chain.follow_only = follow_only)
458 .await
459 .map_err(error::Inner::wallet)?
460 .ok_or_else(|| error::Inner::UnknownChainId(chain_id))?;
461 self.client.set_chain_follow_only(chain_id, follow_only);
462 Ok(())
463 }
464
    /// Synchronizes the chain from the validators and processes its inbox, retrying after
    /// round timeouts until no timeout is reported.
    ///
    /// The wallet is updated from the client after every attempt, including failed ones.
    /// Returns all certificates collected over the attempts.
    ///
    /// # Errors
    ///
    /// Returns an error if synchronization, inbox processing, listening for notifications or
    /// a wallet update fails.
    pub async fn process_inbox(
        &mut self,
        chain_client: &ChainClient<Env>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
        let mut certificates = Vec::new();
        let (new_certificates, maybe_timeout) = {
            chain_client.synchronize_from_validators().await?;
            let result = chain_client.process_inbox_without_prepare().await;
            // Update the wallet before propagating a possible error from `result`.
            self.update_wallet_from_client(chain_client).await?;
            result?
        };
        certificates.extend(new_certificates);
        if maybe_timeout.is_none() {
            return Ok(certificates);
        }

        // A timeout was reported: start listening so we can wait for the next round.
        // NOTE(review): `_listen_handle` is kept bound until the function returns; presumably
        // dropping it would stop the listener — confirm.
        let (listener, _listen_handle, mut notification_stream) =
            chain_client.listen(ListeningMode::FullChain).await?;
        self.chain_listeners.spawn_task(listener);

        loop {
            let (new_certificates, maybe_timeout) = {
                let result = chain_client.process_inbox().await;
                // Update the wallet before propagating a possible error from `result`.
                self.update_wallet_from_client(chain_client).await?;
                result?
            };
            certificates.extend(new_certificates);
            if let Some(timestamp) = maybe_timeout {
                util::wait_for_next_round(&mut notification_stream, timestamp).await
            } else {
                return Ok(certificates);
            }
        }
    }
501
502 pub async fn assign_new_chain_to_key(
503 &mut self,
504 chain_id: ChainId,
505 owner: AccountOwner,
506 ) -> Result<(), Error> {
507 self.client.track_chain(chain_id);
508 let client = self.make_chain_client(chain_id).await?;
509 let chain_description = client.get_chain_description().await?;
510 let config = chain_description.config();
511
512 if !config.ownership.is_owner(&owner) {
513 tracing::error!(
514 "The chain with the ID returned by the faucet is not owned by you. \
515 Please make sure you are connecting to a genuine faucet."
516 );
517 return Err(error::Inner::ChainOwnership.into());
518 }
519
520 let timestamp = chain_description.timestamp();
522 let epoch = chain_description.config().epoch;
523 let modified = self
524 .wallet()
525 .modify(chain_id, |chain| {
526 chain.owner = Some(owner);
527 chain.follow_only = false;
528 })
529 .await
530 .map_err(error::Inner::wallet)?;
531 if modified.is_none() {
533 self.wallet()
534 .insert(
535 chain_id,
536 wallet::Chain {
537 owner: Some(owner),
538 timestamp,
539 epoch: Some(epoch),
540 ..Default::default()
541 },
542 )
543 .await
544 .map_err(error::Inner::wallet)
545 .context("assigning new chain")?;
546 }
547 Ok(())
548 }
549
    /// Runs the command `f` on the chain client, retrying after round timeouts until it is
    /// committed.
    ///
    /// The chain is prepared once up front. `f` may be called several times, so it must be
    /// safe to repeat. The wallet is updated from the client after every call to `f`,
    /// including failed ones.
    ///
    /// # Errors
    ///
    /// Returns an error if preparing the chain, the command itself, listening for
    /// notifications or a wallet update fails.
    pub async fn apply_client_command<E, F, Fut, T>(
        &mut self,
        client: &ChainClient<Env>,
        mut f: F,
    ) -> Result<T, Error>
    where
        F: FnMut(&ChainClient<Env>) -> Fut,
        Fut: Future<Output = Result<ClientOutcome<T>, E>>,
        Error: From<E>,
    {
        client.prepare_chain().await?;
        let result = f(client).await;
        // Update the wallet before propagating a possible error from `result`.
        self.update_wallet_from_client(client).await?;
        if let ClientOutcome::Committed(t) = result? {
            return Ok(t);
        }

        // The command has to wait for a timeout: start listening so we can wait for the next
        // round.
        // NOTE(review): `_listen_handle` is kept bound until the function returns; presumably
        // dropping it would stop the listener — confirm.
        let (listener, _listen_handle, mut notification_stream) =
            client.listen(ListeningMode::FullChain).await?;
        self.chain_listeners.spawn_task(listener);

        loop {
            let result = f(client).await;
            // Update the wallet before propagating a possible error from `result`.
            self.update_wallet_from_client(client).await?;
            let timeout = match result? {
                ClientOutcome::Committed(t) => return Ok(t),
                ClientOutcome::WaitForTimeout(timeout) => timeout,
            };
            util::wait_for_next_round(&mut notification_stream, timeout).await;
        }
    }
589
590 pub async fn ownership(&mut self, chain_id: Option<ChainId>) -> Result<ChainOwnership, Error> {
591 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
592 let client = self.make_chain_client(chain_id).await?;
593 let info = client.chain_info().await?;
594 Ok(info.manager.ownership)
595 }
596
    /// Changes the ownership of the given chain, or of the default chain if `chain_id` is
    /// `None`, and logs how long the operation took.
    ///
    /// # Errors
    ///
    /// Returns an error if the chain client cannot be created, if `ownership_config` cannot be
    /// converted into a [`ChainOwnership`], or if the command fails.
    ///
    /// # Panics
    ///
    /// Panics if `chain_id` is `None` and no default chain is set.
    pub async fn change_ownership(
        &mut self,
        chain_id: Option<ChainId>,
        ownership_config: ChainOwnershipConfig,
    ) -> Result<(), Error> {
        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
        let chain_client = self.make_chain_client(chain_id).await?;
        info!(
            ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
            "Changing ownership of a chain"
        );
        let time_start = Instant::now();
        let ownership = ChainOwnership::try_from(ownership_config)?;

        let certificate = self
            .apply_client_command(&chain_client, |chain_client| {
                // The closure may be called several times, so each call gets its own copies.
                let ownership = ownership.clone();
                let chain_client = chain_client.clone();
                async move {
                    chain_client
                        .change_ownership(ownership)
                        .await
                        .map_err(Error::from)
                        .context("Failed to change ownership")
                }
            })
            .await?;
        let time_total = time_start.elapsed();
        info!("Operation confirmed after {} ms", time_total.as_millis());
        debug!("{:?}", certificate);
        Ok(())
    }
629
630 pub async fn set_preferred_owner(
631 &mut self,
632 chain_id: Option<ChainId>,
633 preferred_owner: AccountOwner,
634 ) -> Result<(), Error> {
635 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
636 let mut chain_client = self.make_chain_client(chain_id).await?;
637 let old_owner = chain_client.preferred_owner();
638 info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
639 chain_client.set_preferred_owner(preferred_owner);
640 self.update_wallet_from_client(&chain_client).await?;
641 info!("New preferred owner set");
642 Ok(())
643 }
644
645 pub async fn check_compatible_version_info(
646 &self,
647 address: &str,
648 node: &impl ValidatorNode,
649 ) -> Result<VersionInfo, Error> {
650 match node.get_version_info().await {
651 Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
652 debug!(
653 "Version information for validator {address}: {}",
654 version_info
655 );
656 Ok(version_info)
657 }
658 Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
659 remote: Box::new(version_info),
660 local: Box::new(linera_version::VERSION_INFO.clone()),
661 }
662 .into()),
663 Err(error) => Err(error::Inner::UnavailableVersionInfo {
664 address: address.to_string(),
665 error: Box::new(error),
666 }
667 .into()),
668 }
669 }
670
671 pub async fn check_matching_network_description(
672 &self,
673 address: &str,
674 node: &impl ValidatorNode,
675 ) -> Result<CryptoHash, Error> {
676 let network_description = self.genesis_config.network_description();
677 match node.get_network_description().await {
678 Ok(description) => {
679 if description == network_description {
680 Ok(description.genesis_config_hash)
681 } else {
682 Err(error::Inner::UnexpectedNetworkDescription {
683 remote: Box::new(description),
684 local: Box::new(network_description),
685 }
686 .into())
687 }
688 }
689 Err(error) => Err(error::Inner::UnavailableNetworkDescription {
690 address: address.to_string(),
691 error: Box::new(error),
692 }
693 .into()),
694 }
695 }
696
697 pub async fn check_validator_chain_info_response(
698 &self,
699 public_key: Option<&ValidatorPublicKey>,
700 address: &str,
701 node: &impl ValidatorNode,
702 chain_id: ChainId,
703 ) -> Result<ChainInfo, Error> {
704 let query = ChainInfoQuery::new(chain_id).with_manager_values();
705 match node.handle_chain_info_query(query).await {
706 Ok(response) => {
707 debug!(
708 "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
709 response.info.next_block_height, response.info.epoch,
710 );
711 if let Some(public_key) = public_key {
712 if response.check(*public_key).is_ok() {
713 debug!("Signature for public key {public_key} is OK.");
714 } else {
715 return Err(error::Inner::InvalidSignature {
716 public_key: *public_key,
717 }
718 .into());
719 }
720 } else {
721 warn!("Not checking signature as public key was not given");
722 }
723 Ok(*response.info)
724 }
725 Err(error) => Err(error::Inner::UnavailableChainInfo {
726 address: address.to_string(),
727 chain_id,
728 error: Box::new(error),
729 }
730 .into()),
731 }
732 }
733
734 pub async fn query_validator(
738 &self,
739 address: &str,
740 node: &impl ValidatorNode,
741 chain_id: ChainId,
742 public_key: Option<&ValidatorPublicKey>,
743 ) -> ValidatorQueryResults {
744 let version_info = self.check_compatible_version_info(address, node).await;
745 let genesis_config_hash = self.check_matching_network_description(address, node).await;
746 let chain_info = self
747 .check_validator_chain_info_response(public_key, address, node, chain_id)
748 .await;
749
750 ValidatorQueryResults {
751 version_info,
752 genesis_config_hash,
753 chain_info,
754 }
755 }
756
757 pub async fn query_local_node(
761 &self,
762 chain_id: ChainId,
763 ) -> Result<ValidatorQueryResults, Error> {
764 let version_info = Ok(linera_version::VERSION_INFO.clone());
765 let genesis_config_hash = Ok(self
766 .genesis_config
767 .network_description()
768 .genesis_config_hash);
769 let chain_info = self
770 .make_chain_client(chain_id)
771 .await?
772 .chain_info_with_manager_values()
773 .await
774 .map(|info| *info)
775 .map_err(|e| e.into());
776
777 Ok(ValidatorQueryResults {
778 version_info,
779 genesis_config_hash,
780 chain_info,
781 })
782 }
783}
784
785#[cfg(feature = "fs")]
786impl<Env: Environment> ClientContext<Env> {
    /// Loads contract and service bytecode from files, publishes them as a module on the
    /// client's chain, and then processes the chain's inbox.
    ///
    /// Returns the ID of the published module.
    ///
    /// # Errors
    ///
    /// Returns an error if a bytecode file cannot be loaded, if publishing fails, or if
    /// processing the inbox fails.
    pub async fn publish_module(
        &mut self,
        chain_client: &ChainClient<Env>,
        contract: PathBuf,
        service: PathBuf,
        vm_runtime: VmRuntime,
    ) -> Result<ModuleId, Error> {
        info!("Loading bytecode files");
        let contract_bytecode = Bytecode::load_from_file(&contract)
            .with_context(|| format!("failed to load contract bytecode from {:?}", &contract))?;
        let service_bytecode = Bytecode::load_from_file(&service)
            .with_context(|| format!("failed to load service bytecode from {:?}", &service))?;

        info!("Publishing module");
        let (blobs, module_id) =
            create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
        let (module_id, _) = self
            .apply_client_command(chain_client, |chain_client| {
                // The closure may be called several times, so each call gets its own copies.
                let blobs = blobs.clone();
                let chain_client = chain_client.clone();
                async move {
                    chain_client
                        .publish_module_blobs(blobs, module_id)
                        .await
                        .context("Failed to publish module")
                }
            })
            .await?;

        info!("{}", "Module published successfully!");

        info!("Synchronizing client and processing inbox");
        self.process_inbox(chain_client).await?;
        Ok(module_id)
    }
822
823 pub async fn publish_data_blob(
824 &mut self,
825 chain_client: &ChainClient<Env>,
826 blob_path: PathBuf,
827 ) -> Result<CryptoHash, Error> {
828 info!("Loading data blob file");
829 let blob_bytes = fs::read(&blob_path).context(format!(
830 "failed to load data blob bytes from {:?}",
831 &blob_path
832 ))?;
833
834 info!("Publishing data blob");
835 self.apply_client_command(chain_client, |chain_client| {
836 let blob_bytes = blob_bytes.clone();
837 let chain_client = chain_client.clone();
838 async move {
839 chain_client
840 .publish_data_blob(blob_bytes)
841 .await
842 .context("Failed to publish data blob")
843 }
844 })
845 .await?;
846
847 info!("{}", "Data blob published successfully!");
848 Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
849 }
850
    /// Runs a `read_data_blob` command for the blob with the given hash on the client's chain.
    ///
    /// # Errors
    ///
    /// Returns an error if the command fails.
    pub async fn read_data_blob(
        &mut self,
        chain_client: &ChainClient<Env>,
        hash: CryptoHash,
    ) -> Result<(), Error> {
        info!("Verifying data blob");
        self.apply_client_command(chain_client, |chain_client| {
            // The closure may be called several times, so each call gets its own client clone.
            let chain_client = chain_client.clone();
            async move {
                chain_client
                    .read_data_blob(hash)
                    .await
                    .context("Failed to verify data blob")
            }
        })
        .await?;

        info!("{}", "Data blob verified successfully!");
        Ok(())
    }
872}
873
874#[cfg(not(web))]
875impl<Env: Environment> ClientContext<Env> {
876 pub async fn prepare_for_benchmark(
877 &mut self,
878 num_chains: usize,
879 tokens_per_chain: Amount,
880 fungible_application_id: Option<ApplicationId>,
881 pub_keys: Vec<AccountPublicKey>,
882 chains_config_path: Option<&Path>,
883 ) -> Result<(Vec<ChainClient<Env>>, Vec<ChainId>), Error> {
884 let start = Instant::now();
885 self.process_inboxes_and_force_validator_updates().await;
889 info!(
890 "Processed inboxes and forced validator updates in {} ms",
891 start.elapsed().as_millis()
892 );
893
894 let start = Instant::now();
895 let (benchmark_chains, chain_clients) = self
896 .make_benchmark_chains(
897 num_chains,
898 tokens_per_chain,
899 pub_keys,
900 chains_config_path.is_some(),
901 )
902 .await?;
903 info!(
904 "Got {} chains in {} ms",
905 num_chains,
906 start.elapsed().as_millis()
907 );
908
909 if let Some(id) = fungible_application_id {
910 let start = Instant::now();
911 self.supply_fungible_tokens(&benchmark_chains, id).await?;
912 info!(
913 "Supplied fungible tokens in {} ms",
914 start.elapsed().as_millis()
915 );
916 let start = Instant::now();
918 for chain_client in &chain_clients {
919 chain_client.process_inbox().await?;
920 }
921 info!(
922 "Processed inboxes after supplying fungible tokens in {} ms",
923 start.elapsed().as_millis()
924 );
925 }
926
927 let all_chains = Benchmark::<Env>::get_all_chains(chains_config_path, &benchmark_chains)?;
928 let known_chain_ids: HashSet<_> = benchmark_chains.iter().map(|(id, _)| *id).collect();
929 let unknown_chain_ids: Vec<_> = all_chains
930 .iter()
931 .filter(|id| !known_chain_ids.contains(id))
932 .copied()
933 .collect();
934 if !unknown_chain_ids.is_empty() {
935 for chain_id in &unknown_chain_ids {
939 self.client.get_chain_description(*chain_id).await?;
940 }
941 }
942
943 Ok((chain_clients, all_chains))
944 }
945
946 pub async fn wrap_up_benchmark(
947 &mut self,
948 chain_clients: Vec<ChainClient<Env>>,
949 close_chains: bool,
950 wrap_up_max_in_flight: usize,
951 ) -> Result<(), Error> {
952 if close_chains {
953 info!("Closing chains...");
954 let stream = stream::iter(chain_clients)
955 .map(|chain_client| async move {
956 Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
957 info!("Closed chain {:?}", chain_client.chain_id());
958 Ok::<(), BenchmarkError>(())
959 })
960 .buffer_unordered(wrap_up_max_in_flight);
961 stream.try_collect::<Vec<_>>().await?;
962 } else {
963 info!("Processing inbox for all chains...");
964 let stream = stream::iter(chain_clients.clone())
965 .map(|chain_client| async move {
966 chain_client.process_inbox().await?;
967 info!("Processed inbox for chain {:?}", chain_client.chain_id());
968 Ok::<(), chain_client::Error>(())
969 })
970 .buffer_unordered(wrap_up_max_in_flight);
971 stream.try_collect::<Vec<_>>().await?;
972
973 info!("Updating wallet from chain clients...");
974 for chain_client in chain_clients {
975 let info = chain_client.chain_info().await?;
976 let client_owner = chain_client.preferred_owner();
977 let pending_proposal = chain_client.pending_proposal().clone();
978 self.wallet()
979 .insert(
980 info.chain_id,
981 wallet::Chain {
982 pending_proposal,
983 owner: client_owner,
984 ..info.as_ref().into()
985 },
986 )
987 .await
988 .map_err(error::Inner::wallet)?;
989 }
990 }
991
992 Ok(())
993 }
994
    /// Processes the inboxes of all chains owned in the wallet, one spawned task per chain,
    /// and then updates the wallet from each chain client.
    ///
    /// # Panics
    ///
    /// Panics if listing the owned chains or creating a chain client fails, if processing an
    /// inbox fails, or if a wallet update fails.
    async fn process_inboxes_and_force_validator_updates(&mut self) {
        let mut join_set = task::JoinSet::new();

        // Create one chain client per owned chain; any failure here panics via `unwrap`.
        let chain_clients: Vec<_> = self
            .wallet()
            .owned_chain_ids()
            .map_err(|e| error::Inner::wallet(e).into())
            .and_then(|id| self.make_chain_client(id))
            .try_collect()
            .await
            .unwrap();

        for chain_client in chain_clients {
            join_set.spawn(async move {
                Self::process_inbox_without_updating_wallet(&chain_client)
                    .await
                    .expect("Processing inbox should not fail!");
                // Hand the client back so the wallet can be updated from it below.
                chain_client
            });
        }

        for chain_client in join_set.join_all().await {
            self.update_wallet_from_client(&chain_client).await.unwrap();
        }
    }
1020
    /// Synchronizes the chain from the validators and processes its inbox once, without
    /// touching the wallet.
    ///
    /// # Errors
    ///
    /// Returns an error if synchronization or inbox processing fails.
    ///
    /// # Panics
    ///
    /// Panics if inbox processing reports a timeout.
    async fn process_inbox_without_updating_wallet(
        chain_client: &ChainClient<Env>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
        chain_client.synchronize_from_validators().await?;
        let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
        assert!(
            maybe_timeout.is_none(),
            "Should not timeout within benchmark!"
        );

        Ok(certificates)
    }
1034
    /// Collects `num_chains` chains for benchmarking, reusing suitable chains from the wallet
    /// and creating the missing ones from the default chain.
    ///
    /// A wallet chain is reused only if it has no regular owners and exactly one super owner.
    /// New chains are opened with `balance` and a single super owner taken from `pub_keys`;
    /// one key is consumed per block of new chains. If `wallet_only` is set, no chains are
    /// created and too few wallet chains is an error.
    ///
    /// Returns the `(chain ID, owner)` pairs and the matching chain clients, in the same order.
    ///
    /// # Errors
    ///
    /// Returns an error if the wallet, a chain client or a chain operation fails, or if
    /// `wallet_only` is set and the wallet has fewer than `num_chains` suitable chains.
    ///
    /// # Panics
    ///
    /// Panics if no default chain is set, if `pub_keys` runs out of keys, or if an executed
    /// block lacks the expected chain description blob.
    async fn make_benchmark_chains(
        &mut self,
        num_chains: usize,
        balance: Amount,
        pub_keys: Vec<AccountPublicKey>,
        wallet_only: bool,
    ) -> Result<(Vec<(ChainId, AccountOwner)>, Vec<ChainClient<Env>>), Error> {
        let mut chains_found_in_wallet = 0;
        let mut benchmark_chains = Vec::with_capacity(num_chains);
        let mut chain_clients = Vec::with_capacity(num_chains);
        let start = Instant::now();
        let mut owned_chain_ids = std::pin::pin!(self.wallet().owned_chain_ids());
        while let Some(chain_id) = owned_chain_ids.next().await {
            let chain_id = chain_id.map_err(error::Inner::wallet)?;
            if chains_found_in_wallet == num_chains {
                break;
            }
            let chain_client = self.make_chain_client(chain_id).await?;
            let ownership = chain_client.chain_info().await?.manager.ownership;
            // Only reuse chains with no regular owners and exactly one super owner.
            if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
                continue;
            }
            chain_client.process_inbox().await?;
            benchmark_chains.push((
                chain_id,
                *ownership
                    .super_owners
                    .first()
                    .expect("should have a super owner"),
            ));
            chain_clients.push(chain_client);
            chains_found_in_wallet += 1;
        }
        info!(
            "Got {} chains from the wallet in {} ms",
            benchmark_chains.len(),
            start.elapsed().as_millis()
        );

        // Cannot underflow: the loop above stops once `num_chains` chains were found.
        let num_chains_to_create = num_chains - chains_found_in_wallet;

        let default_chain_client = self.make_chain_client(self.default_chain()).await?;

        if num_chains_to_create > 0 {
            if wallet_only {
                return Err(
                    error::Inner::Benchmark(BenchmarkError::NotEnoughChainsInWallet(
                        num_chains,
                        chains_found_in_wallet,
                    ))
                    .into(),
                );
            }
            let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
            // Maximum number of `OpenChain` operations put into a single block.
            // NOTE(review): presumably chosen to stay below a block size limit — confirm.
            let operations_per_block = 900;
            for i in (0..num_chains_to_create).step_by(operations_per_block) {
                let num_new_chains = operations_per_block.min(num_chains_to_create - i);
                // One key per block: all chains opened in this block share the same owner.
                // NOTE(review): panics if `pub_keys` has fewer keys than blocks.
                let pub_key = pub_keys_iter.next().unwrap();
                let owner = pub_key.into();

                let certificate = Self::execute_open_chains_operations(
                    num_new_chains,
                    &default_chain_client,
                    balance,
                    owner,
                )
                .await?;
                info!("Block executed successfully");

                let block = certificate.block();
                // This `i` shadows the outer one; it is the index of the operation in the block.
                for i in 0..num_new_chains {
                    // The new chain's ID is the hash of the chain description blob found in
                    // the `i`-th blob list of the block.
                    let chain_id = block.body.blobs[i]
                        .iter()
                        .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
                        .map(|blob| ChainId(blob.id().hash))
                        .expect("failed to create a new chain");
                    self.client.track_chain(chain_id);

                    let mut chain_client = self.client.create_chain_client(
                        chain_id,
                        None,
                        BlockHeight::ZERO,
                        None,
                        Some(owner),
                        self.timing_sender(),
                        false,
                    );
                    chain_client.set_preferred_owner(owner);
                    chain_client.process_inbox().await?;
                    benchmark_chains.push((chain_id, owner));
                    chain_clients.push(chain_client);
                }
            }

            // `start` was taken before the wallet scan, so this time includes it.
            info!(
                "Created {} chains in {} ms",
                num_chains_to_create,
                start.elapsed().as_millis()
            );
        }

        info!("Updating wallet from client");
        self.update_wallet_from_client(&default_chain_client)
            .await?;
        info!("Retrying pending outgoing messages");
        default_chain_client
            .retry_pending_outgoing_messages()
            .await
            .context("outgoing messages to create the new chains should be delivered")?;
        info!("Processing default chain inbox");
        default_chain_client.process_inbox().await?;

        assert_eq!(
            benchmark_chains.len(),
            chain_clients.len(),
            "benchmark_chains and chain_clients must have the same size"
        );

        Ok((benchmark_chains, chain_clients))
    }
1157
1158 async fn execute_open_chains_operations(
1159 num_new_chains: usize,
1160 chain_client: &ChainClient<Env>,
1161 balance: Amount,
1162 owner: AccountOwner,
1163 ) -> Result<ConfirmedBlockCertificate, Error> {
1164 let config = OpenChainConfig {
1165 ownership: ChainOwnership::single_super(owner),
1166 balance,
1167 application_permissions: Default::default(),
1168 };
1169 let operations = iter::repeat_n(
1170 Operation::system(SystemOperation::OpenChain(config)),
1171 num_new_chains,
1172 )
1173 .collect();
1174 info!("Executing {} OpenChain operations", num_new_chains);
1175 Ok(chain_client
1176 .execute_operations(operations, vec![])
1177 .await?
1178 .expect("should execute block with OpenChain operations"))
1179 }
1180
1181 async fn supply_fungible_tokens(
1183 &mut self,
1184 key_pairs: &[(ChainId, AccountOwner)],
1185 application_id: ApplicationId,
1186 ) -> Result<(), Error> {
1187 let default_chain_id = self.default_chain();
1188 let default_key = self
1189 .wallet()
1190 .get(default_chain_id)
1191 .await
1192 .unwrap()
1193 .unwrap()
1194 .owner
1195 .unwrap();
1196 let amount = Amount::from_nanos(4);
1198 let operations: Vec<Operation> = key_pairs
1199 .iter()
1200 .map(|(chain_id, owner)| {
1201 Benchmark::<Env>::fungible_transfer(
1202 application_id,
1203 *chain_id,
1204 default_key,
1205 *owner,
1206 amount,
1207 )
1208 })
1209 .collect();
1210 let chain_client = self.make_chain_client(default_chain_id).await?;
1211 for operation_chunk in operations.chunks(1000) {
1213 chain_client
1214 .execute_operations(operation_chunk.to_vec(), vec![])
1215 .await?
1216 .expect("should execute block with Transfer operations");
1217 }
1218 self.update_wallet_from_client(&chain_client).await?;
1219
1220 Ok(())
1221 }
1222}