1use std::sync::Arc;
5
6use futures::{Future, StreamExt as _, TryStreamExt as _};
7use linera_base::{
8 crypto::{CryptoHash, ValidatorPublicKey},
9 data_types::{Epoch, Timestamp},
10 identifiers::{Account, AccountOwner, ChainId},
11 ownership::ChainOwnership,
12 time::{Duration, Instant},
13 util::future::FutureSyncExt as _,
14};
15use linera_chain::{manager::LockingBlock, types::ConfirmedBlockCertificate};
16use linera_core::{
17 client::{chain_client, ChainClient, Client, ListeningMode},
18 data_types::{ChainInfo, ChainInfoQuery, ClientOutcome},
19 join_set_ext::JoinSet,
20 node::ValidatorNode,
21 wallet, Environment, JoinSetExt as _, Wallet as _,
22};
23use linera_rpc::node_provider::{NodeOptions, NodeProvider};
24use linera_version::VersionInfo;
25use thiserror_context::Context;
26use tracing::{debug, info, warn};
27#[cfg(not(web))]
28use {
29 crate::{
30 benchmark::{Benchmark, BenchmarkError},
31 client_metrics::ClientMetrics,
32 },
33 futures::stream,
34 linera_base::{
35 crypto::AccountPublicKey,
36 data_types::{Amount, BlockHeight},
37 identifiers::{ApplicationId, BlobType},
38 },
39 linera_execution::{
40 system::{OpenChainConfig, SystemOperation},
41 Operation,
42 },
43 std::{collections::HashSet, iter, path::Path},
44 tokio::{sync::mpsc, task},
45};
46#[cfg(feature = "fs")]
47use {
48 linera_base::{
49 data_types::{BlobContent, Bytecode},
50 identifiers::ModuleId,
51 vm::VmRuntime,
52 },
53 linera_core::client::create_bytecode_blobs,
54 std::{fs, path::PathBuf},
55};
56
57use crate::{
58 chain_listener::{self, ClientContext as _},
59 client_options::{ChainOwnershipConfig, Options},
60 config::GenesisConfig,
61 error, util, Error,
62};
63
/// Outcome of the three checks performed when querying a validator or the local node.
///
/// Each field holds either the successfully retrieved value or the error produced by
/// the corresponding check, so a failure in one check does not hide the others.
pub struct ValidatorQueryResults {
    /// The version information of the node, or the error from retrieving/checking it.
    pub version_info: Result<VersionInfo, Error>,
    /// The genesis configuration hash of the node, or the error from retrieving/checking it.
    pub genesis_config_hash: Result<CryptoHash, Error>,
    /// The chain information returned for the queried chain, or the error from retrieving it.
    pub chain_info: Result<ChainInfo, Error>,
}
73
74impl ValidatorQueryResults {
75 pub fn errors(&self) -> Vec<&Error> {
77 let mut errors = Vec::new();
78 if let Err(e) = &self.version_info {
79 errors.push(e);
80 }
81 if let Err(e) = &self.genesis_config_hash {
82 errors.push(e);
83 }
84 if let Err(e) = &self.chain_info {
85 errors.push(e);
86 }
87 errors
88 }
89
    /// Prints these results to standard output, one labelled line per item.
    ///
    /// `public_key`, `address` and `weight` are printed first, each only when provided.
    ///
    /// When `reference` is given, a version, genesis-hash or chain-info value is omitted
    /// if it equals the corresponding value that was *successfully* retrieved in
    /// `reference`; values are always printed when there is no reference or when the
    /// reference itself holds an error for that check. Errors in `self` are always
    /// reported. The requested locking block, if any, is always printed.
    pub fn print(
        &self,
        public_key: Option<&ValidatorPublicKey>,
        address: Option<&str>,
        weight: Option<u64>,
        reference: Option<&ValidatorQueryResults>,
    ) {
        if let Some(key) = public_key {
            println!("Public key: {}", key);
        }
        if let Some(address) = address {
            println!("Address: {}", address);
        }
        if let Some(w) = weight {
            println!("Weight: {}", w);
        }

        // `None` if there is no reference or the reference failed to get version info;
        // in that case every version field below is printed.
        let ref_version = reference.and_then(|ref_results| ref_results.version_info.as_ref().ok());
        match &self.version_info {
            Ok(version_info) => {
                if ref_version.is_none_or(|ref_v| ref_v.crate_version != version_info.crate_version)
                {
                    println!("Linera protocol: v{}", version_info.crate_version);
                }
                if ref_version.is_none_or(|ref_v| ref_v.rpc_hash != version_info.rpc_hash) {
                    println!("RPC API hash: {}", version_info.rpc_hash);
                }
                if ref_version.is_none_or(|ref_v| ref_v.graphql_hash != version_info.graphql_hash) {
                    println!("GraphQL API hash: {}", version_info.graphql_hash);
                }
                if ref_version.is_none_or(|ref_v| ref_v.wit_hash != version_info.wit_hash) {
                    println!("WIT API hash: {}", version_info.wit_hash);
                }
                // The commit and the dirty flag are compared together and printed on one line.
                if ref_version.is_none_or(|ref_v| {
                    (&ref_v.git_commit, ref_v.git_dirty)
                        != (&version_info.git_commit, version_info.git_dirty)
                }) {
                    println!(
                        "Source code: {}/tree/{}{}",
                        env!("CARGO_PKG_REPOSITORY"),
                        version_info.git_commit,
                        if version_info.git_dirty {
                            " (dirty)"
                        } else {
                            ""
                        }
                    );
                }
            }
            Err(err) => println!("Error getting version info: {err}"),
        }

        let ref_genesis_hash =
            reference.and_then(|ref_results| ref_results.genesis_config_hash.as_ref().ok());
        match &self.genesis_config_hash {
            // Same hash as the reference: nothing to report.
            Ok(hash) if ref_genesis_hash.is_some_and(|ref_hash| ref_hash == hash) => {}
            Ok(hash) => println!("Genesis config hash: {hash}"),
            Err(err) => println!("Error getting genesis config: {err}"),
        }

        let ref_info = reference.and_then(|ref_results| ref_results.chain_info.as_ref().ok());
        match &self.chain_info {
            Ok(info) => {
                if ref_info.is_none_or(|ref_info| info.block_hash != ref_info.block_hash) {
                    if let Some(hash) = info.block_hash {
                        println!("Block hash: {}", hash);
                    } else {
                        println!("Block hash: None");
                    }
                }
                if ref_info
                    .is_none_or(|ref_info| info.next_block_height != ref_info.next_block_height)
                {
                    println!("Next height: {}", info.next_block_height);
                }
                if ref_info.is_none_or(|ref_info| info.timestamp != ref_info.timestamp) {
                    println!("Timestamp: {}", info.timestamp);
                }
                if ref_info.is_none_or(|ref_info| info.epoch != ref_info.epoch) {
                    println!("Epoch: {}", info.epoch);
                }
                if ref_info.is_none_or(|ref_info| {
                    info.manager.current_round != ref_info.manager.current_round
                }) {
                    println!("Round: {}", info.manager.current_round);
                }
                // The locking block is not compared against the reference.
                if let Some(locking) = &info.manager.requested_locking {
                    match &**locking {
                        LockingBlock::Fast(proposal) => {
                            println!(
                                "Locking fast block from {}",
                                proposal.content.block.timestamp
                            );
                        }
                        LockingBlock::Regular(validated) => {
                            println!(
                                "Locking block {} in {} from {}",
                                validated.hash(),
                                validated.round,
                                validated.block().header.timestamp
                            );
                        }
                    }
                }
            }
            Err(err) => println!("Error getting chain info: {err}"),
        }
    }
202}
203
/// Shared state used by client commands: the core client, network settings and
/// the background chain listeners.
pub struct ClientContext<Env: Environment> {
    /// The shared core client; also gives access to the wallet and the storage.
    pub client: Arc<Client<Env>>,
    /// The genesis configuration of the network this context is expected to talk to.
    pub genesis_config: crate::config::GenesisConfig,
    /// Send timeout used when building `NodeOptions`.
    pub send_timeout: Duration,
    /// Receive timeout used when building `NodeOptions`.
    pub recv_timeout: Duration,
    /// Retry delay used when building `NodeOptions`.
    pub retry_delay: Duration,
    /// Maximum number of retries used when building `NodeOptions`.
    pub max_retries: u32,
    /// The set of spawned chain listener tasks.
    pub chain_listeners: JoinSet,
    /// The chain used when a command does not name one explicitly, if any.
    pub default_chain: Option<ChainId>,
    /// Client metrics; `None` when timing is not enabled.
    #[cfg(not(web))]
    pub client_metrics: Option<ClientMetrics>,
}
218
/// Implementation of the chain listener's context trait, delegating to the core client
/// and to the inherent wallet-update methods of `ClientContext`.
impl<Env: Environment> chain_listener::ClientContext for ClientContext<Env> {
    type Environment = Env;

    /// Returns the wallet held by the core client.
    fn wallet(&self) -> &Env::Wallet {
        self.client.wallet()
    }

    /// Returns the storage client held by the core client.
    fn storage(&self) -> &Env::Storage {
        self.client.storage_client()
    }

    /// Returns the shared core client.
    fn client(&self) -> &Arc<Client<Env>> {
        &self.client
    }

    /// Returns a clone of the metrics' timing sender, or `None` if metrics are disabled.
    #[cfg(not(web))]
    fn timing_sender(
        &self,
    ) -> Option<mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
        self.client_metrics
            .as_ref()
            .map(|metrics| metrics.timing_sender.clone())
    }

    /// Registers a new chain in the wallet.
    ///
    /// The call below resolves to the inherent method of the same name (inherent methods
    /// take precedence over trait methods), so this does not recurse.
    async fn update_wallet_for_new_chain(
        &mut self,
        chain_id: ChainId,
        owner: Option<AccountOwner>,
        timestamp: Timestamp,
        epoch: Epoch,
    ) -> Result<(), Error> {
        self.update_wallet_for_new_chain(chain_id, owner, timestamp, epoch)
            // NOTE(review): presumably wraps the future so that it is `Sync` — confirm
            // against `FutureSyncExt`.
            .make_sync()
            .await
    }

    /// Stores the current state of `client` in the wallet.
    async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
        self.update_wallet_from_client(client).make_sync().await
    }
}
259
260impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si, W>>
261where
262 S: linera_core::environment::Storage,
263 Si: linera_core::environment::Signer,
264 W: linera_core::environment::Wallet,
265{
266 #[allow(clippy::too_many_arguments)]
270 pub async fn new(
271 storage: S,
272 wallet: W,
273 signer: Si,
274 options: &Options,
275 default_chain: Option<ChainId>,
276 genesis_config: GenesisConfig,
277 block_cache_size: usize,
278 execution_state_cache_size: usize,
279 ) -> Result<Self, Error> {
280 #[cfg(not(web))]
281 let timing_config = options.to_timing_config();
282 let node_provider = NodeProvider::new(NodeOptions {
283 send_timeout: options.send_timeout,
284 recv_timeout: options.recv_timeout,
285 retry_delay: options.retry_delay,
286 max_retries: options.max_retries,
287 });
288 let chain_modes: Vec<_> = wallet
289 .items()
290 .map_ok(|(id, chain)| {
291 let mode = if chain.is_follow_only() {
292 ListeningMode::FollowChain
293 } else {
294 ListeningMode::FullChain
295 };
296 (id, mode)
297 })
298 .try_collect()
299 .await
300 .map_err(error::Inner::wallet)?;
301 let name = match chain_modes.len() {
302 0 => "Client node".to_string(),
303 1 => format!("Client node for {:.8}", chain_modes[0].0),
304 n => format!(
305 "Client node for {:.8} and {} others",
306 chain_modes[0].0,
307 n - 1
308 ),
309 };
310
311 let client = Client::new(
312 linera_core::environment::Impl {
313 network: node_provider,
314 storage,
315 signer,
316 wallet,
317 },
318 genesis_config.admin_chain_id(),
319 options.long_lived_services,
320 chain_modes,
321 name,
322 options.chain_worker_ttl,
323 options.sender_chain_worker_ttl,
324 options.to_chain_client_options(),
325 block_cache_size,
326 execution_state_cache_size,
327 options.to_requests_scheduler_config(),
328 );
329
330 #[cfg(not(web))]
331 let client_metrics = if timing_config.enabled {
332 Some(ClientMetrics::new(timing_config))
333 } else {
334 None
335 };
336
337 Ok(ClientContext {
338 client: Arc::new(client),
339 default_chain,
340 genesis_config,
341 send_timeout: options.send_timeout,
342 recv_timeout: options.recv_timeout,
343 retry_delay: options.retry_delay,
344 max_retries: options.max_retries,
345 chain_listeners: JoinSet::default(),
346 #[cfg(not(web))]
347 client_metrics,
348 })
349 }
350}
351
352impl<Env: Environment> ClientContext<Env> {
    /// Returns the wallet held by the core client.
    pub fn wallet(&self) -> &Env::Wallet {
        self.client.wallet()
    }
359
    /// Returns the ID of the admin chain, as known by the core client.
    pub fn admin_chain_id(&self) -> ChainId {
        self.client.admin_chain_id()
    }
364
    /// Returns the chain account of the default chain.
    ///
    /// # Panics
    ///
    /// Panics if no default chain is set (see `default_chain`).
    pub fn default_account(&self) -> Account {
        Account::chain(self.default_chain())
    }
370
    /// Returns the ID of the default chain.
    ///
    /// # Panics
    ///
    /// Panics if no default chain is set.
    pub fn default_chain(&self) -> ChainId {
        self.default_chain
            .expect("default chain requested but none set")
    }
376
377 pub async fn first_non_admin_chain(&self) -> Result<ChainId, Error> {
378 let admin_chain_id = self.admin_chain_id();
379 std::pin::pin!(self
380 .wallet()
381 .chain_ids()
382 .try_filter(|chain_id| futures::future::ready(*chain_id != admin_chain_id)))
383 .next()
384 .await
385 .expect("No non-admin chain specified in wallet with no non-admin chain")
386 .map_err(Error::wallet)
387 }
388
    /// Creates a new `NodeProvider` configured with this context's timeouts and retry settings.
    pub fn make_node_provider(&self) -> NodeProvider {
        NodeProvider::new(self.make_node_options())
    }
393
394 fn make_node_options(&self) -> NodeOptions {
395 NodeOptions {
396 send_timeout: self.send_timeout,
397 recv_timeout: self.recv_timeout,
398 retry_delay: self.retry_delay,
399 max_retries: self.max_retries,
400 }
401 }
402
    /// Returns the client metrics, or `None` if they are not enabled.
    #[cfg(not(web))]
    pub fn client_metrics(&self) -> Option<&ClientMetrics> {
        self.client_metrics.as_ref()
    }
407
408 pub async fn update_wallet_from_client<Env_: Environment>(
409 &self,
410 client: &ChainClient<Env_>,
411 ) -> Result<(), Error> {
412 let info = client.chain_info().await?;
413 let chain_id = info.chain_id;
414 let new_chain = wallet::Chain {
415 pending_proposal: client.pending_proposal().clone(),
416 owner: client.preferred_owner(),
417 ..info.as_ref().into()
418 };
419
420 self.wallet()
421 .insert(chain_id, new_chain)
422 .await
423 .map_err(error::Inner::wallet)?;
424
425 Ok(())
426 }
427
428 pub async fn update_wallet_for_new_chain(
430 &mut self,
431 chain_id: ChainId,
432 owner: Option<AccountOwner>,
433 timestamp: Timestamp,
434 epoch: Epoch,
435 ) -> Result<(), Error> {
436 self.wallet()
437 .try_insert(
438 chain_id,
439 linera_core::wallet::Chain::new(owner, epoch, timestamp),
440 )
441 .await
442 .map_err(error::Inner::wallet)?;
443 Ok(())
444 }
445
    /// Synchronizes the chain from the validators and processes its inbox, retrying after
    /// each round timeout until no timeout is reported.
    ///
    /// Returns all certificates produced across the attempts. The wallet is updated from
    /// the client after every attempt, including failed ones.
    ///
    /// # Errors
    ///
    /// Fails if synchronization, inbox processing, the wallet update or starting the
    /// listener fails. If both the inbox processing and the subsequent wallet update fail,
    /// the wallet error is the one returned.
    pub async fn process_inbox(
        &mut self,
        chain_client: &ChainClient<Env>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
        let mut certificates = Vec::new();
        // First attempt: synchronize explicitly, then process the inbox without preparing
        // the chain again.
        let (new_certificates, maybe_timeout) = {
            chain_client.synchronize_from_validators().await?;
            let result = chain_client.process_inbox_without_prepare().await;
            // Update the wallet before propagating a possible processing error.
            self.update_wallet_from_client(chain_client).await?;
            result?
        };
        certificates.extend(new_certificates);
        if maybe_timeout.is_none() {
            return Ok(certificates);
        }

        // A timeout was reported: start listening for notifications so that we can wait
        // for the next round. `_listen_handle` is kept bound until this function returns.
        let (listener, _listen_handle, mut notification_stream) = chain_client.listen().await?;
        self.chain_listeners.spawn_task(listener);

        loop {
            let (new_certificates, maybe_timeout) = {
                let result = chain_client.process_inbox().await;
                // Update the wallet before propagating a possible processing error.
                self.update_wallet_from_client(chain_client).await?;
                result?
            };
            certificates.extend(new_certificates);
            if let Some(timestamp) = maybe_timeout {
                util::wait_for_next_round(&mut notification_stream, timestamp).await
            } else {
                return Ok(certificates);
            }
        }
    }
481
482 pub async fn assign_new_chain_to_key(
483 &mut self,
484 chain_id: ChainId,
485 owner: AccountOwner,
486 ) -> Result<(), Error> {
487 self.client
488 .extend_chain_mode(chain_id, ListeningMode::FullChain);
489 let client = self.make_chain_client(chain_id).await?;
490 let chain_description = client.get_chain_description().await?;
491 let config = chain_description.config();
492
493 if !config.ownership.is_owner(&owner) {
494 tracing::error!(
495 "The chain with the ID returned by the faucet is not owned by you. \
496 Please make sure you are connecting to a genuine faucet."
497 );
498 return Err(error::Inner::ChainOwnership.into());
499 }
500
501 let modified = self
503 .wallet()
504 .modify(chain_id, |chain| chain.owner = Some(owner))
505 .await
506 .map_err(error::Inner::wallet)?;
507 if modified.is_none() {
509 let timestamp = chain_description.timestamp();
510 let epoch = chain_description.config().epoch;
511 self.wallet()
512 .insert(
513 chain_id,
514 wallet::Chain {
515 owner: Some(owner),
516 timestamp,
517 epoch: Some(epoch),
518 ..Default::default()
519 },
520 )
521 .await
522 .map_err(error::Inner::wallet)
523 .context("assigning new chain")?;
524 }
525 Ok(())
526 }
527
    /// Runs the client command `f` on `client`, retrying after each round timeout until
    /// the command is committed or fails.
    ///
    /// The chain is prepared once before the first attempt, and the wallet is updated from
    /// the client after every attempt, including failed ones. `f` may therefore be called
    /// several times.
    ///
    /// # Errors
    ///
    /// Returns `chain_client::Error::Conflict` (converted to `Error`) if the command
    /// reports a conflicting certificate, and propagates errors from `f`, from preparing
    /// the chain, from the wallet update and from starting the listener. If both `f` and
    /// the subsequent wallet update fail, the wallet error is the one returned.
    pub async fn apply_client_command<E, F, Fut, T>(
        &mut self,
        client: &ChainClient<Env>,
        mut f: F,
    ) -> Result<T, Error>
    where
        F: FnMut(&ChainClient<Env>) -> Fut,
        Fut: Future<Output = Result<ClientOutcome<T>, E>>,
        Error: From<E>,
    {
        client.prepare_chain().await?;
        // Try applying f optimistically without validator notifications.
        let result = f(client).await;
        // Update the wallet before propagating a possible command error.
        self.update_wallet_from_client(client).await?;
        match result? {
            ClientOutcome::Committed(t) => return Ok(t),
            ClientOutcome::Conflict(certificate) => {
                return Err(chain_client::Error::Conflict(certificate.hash()).into());
            }
            ClientOutcome::WaitForTimeout(_) => {}
        }

        // A timeout was reported: start listening for notifications so that we can wait
        // for the next round. `_listen_handle` is kept bound until this function returns.
        let (listener, _listen_handle, mut notification_stream) = client.listen().await?;
        self.chain_listeners.spawn_task(listener);

        loop {
            let result = f(client).await;
            // Update the wallet before propagating a possible command error.
            self.update_wallet_from_client(client).await?;
            let timeout = match result? {
                ClientOutcome::Committed(t) => return Ok(t),
                ClientOutcome::Conflict(certificate) => {
                    return Err(chain_client::Error::Conflict(certificate.hash()).into());
                }
                ClientOutcome::WaitForTimeout(timeout) => timeout,
            };
            // Wait for the round to time out (or for a notification) and retry.
            util::wait_for_next_round(&mut notification_stream, timeout).await;
        }
    }
573
    /// Returns the ownership of `chain_id`, or of the default chain if `chain_id` is `None`,
    /// as found in the chain info reported by a chain client.
    ///
    /// # Panics
    ///
    /// Panics if `chain_id` is `None` and no default chain is set.
    pub async fn ownership(&mut self, chain_id: Option<ChainId>) -> Result<ChainOwnership, Error> {
        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
        let client = self.make_chain_client(chain_id).await?;
        let info = client.chain_info().await?;
        Ok(info.manager.ownership)
    }
580
581 pub async fn change_ownership(
582 &mut self,
583 chain_id: Option<ChainId>,
584 ownership_config: ChainOwnershipConfig,
585 ) -> Result<(), Error> {
586 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
587 let chain_client = self.make_chain_client(chain_id).await?;
588 info!(
589 ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
590 "Changing ownership of a chain"
591 );
592 let time_start = Instant::now();
593 let mut ownership = chain_client.query_chain_ownership().await?;
594 ownership_config.update(&mut ownership)?;
595
596 if ownership.super_owners.is_empty() && ownership.owners.is_empty() {
597 tracing::error!("At least one owner or super owner of the chain has to be set.");
598 return Err(error::Inner::ChainOwnership.into());
599 }
600
601 let certificate = self
602 .apply_client_command(&chain_client, |chain_client| {
603 let ownership = ownership.clone();
604 let chain_client = chain_client.clone();
605 async move {
606 chain_client
607 .change_ownership(ownership)
608 .await
609 .map_err(Error::from)
610 .context("Failed to change ownership")
611 }
612 })
613 .await?;
614 let time_total = time_start.elapsed();
615 info!("Operation confirmed after {} ms", time_total.as_millis());
616 debug!("{:?}", certificate);
617 Ok(())
618 }
619
    /// Sets the preferred owner of `chain_id` (or of the default chain if `chain_id` is
    /// `None`) on a chain client and stores the result in the wallet.
    ///
    /// # Panics
    ///
    /// Panics if `chain_id` is `None` and no default chain is set.
    pub async fn set_preferred_owner(
        &mut self,
        chain_id: Option<ChainId>,
        preferred_owner: AccountOwner,
    ) -> Result<(), Error> {
        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
        let mut chain_client = self.make_chain_client(chain_id).await?;
        let old_owner = chain_client.preferred_owner();
        info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
        chain_client.set_preferred_owner(preferred_owner);
        // The wallet entry takes its owner from the client's preferred owner.
        self.update_wallet_from_client(&chain_client).await?;
        info!("New preferred owner set");
        Ok(())
    }
634
635 pub async fn check_compatible_version_info(
636 &self,
637 address: &str,
638 node: &impl ValidatorNode,
639 ) -> Result<VersionInfo, Error> {
640 match node.get_version_info().await {
641 Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
642 debug!(
643 "Version information for validator {address}: {}",
644 version_info
645 );
646 Ok(version_info)
647 }
648 Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
649 remote: Box::new(version_info),
650 local: Box::new(linera_version::VERSION_INFO.clone()),
651 }
652 .into()),
653 Err(error) => Err(error::Inner::UnavailableVersionInfo {
654 address: address.to_string(),
655 error: Box::new(error),
656 }
657 .into()),
658 }
659 }
660
661 pub async fn check_matching_network_description(
662 &self,
663 address: &str,
664 node: &impl ValidatorNode,
665 ) -> Result<CryptoHash, Error> {
666 let network_description = self.genesis_config.network_description();
667 match node.get_network_description().await {
668 Ok(description) => {
669 if description == network_description {
670 Ok(description.genesis_config_hash)
671 } else {
672 Err(error::Inner::UnexpectedNetworkDescription {
673 remote: Box::new(description),
674 local: Box::new(network_description),
675 }
676 .into())
677 }
678 }
679 Err(error) => Err(error::Inner::UnavailableNetworkDescription {
680 address: address.to_string(),
681 error: Box::new(error),
682 }
683 .into()),
684 }
685 }
686
687 pub async fn check_validator_chain_info_response(
688 &self,
689 public_key: Option<&ValidatorPublicKey>,
690 address: &str,
691 node: &impl ValidatorNode,
692 chain_id: ChainId,
693 ) -> Result<ChainInfo, Error> {
694 let query = ChainInfoQuery::new(chain_id).with_manager_values();
695 match node.handle_chain_info_query(query).await {
696 Ok(response) => {
697 debug!(
698 "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
699 response.info.next_block_height, response.info.epoch,
700 );
701 if let Some(public_key) = public_key {
702 if response.check(*public_key).is_ok() {
703 debug!("Signature for public key {public_key} is OK.");
704 } else {
705 return Err(error::Inner::InvalidSignature {
706 public_key: *public_key,
707 }
708 .into());
709 }
710 } else {
711 warn!("Not checking signature as public key was not given");
712 }
713 Ok(*response.info)
714 }
715 Err(error) => Err(error::Inner::UnavailableChainInfo {
716 address: address.to_string(),
717 chain_id,
718 error: Box::new(error),
719 }
720 .into()),
721 }
722 }
723
724 pub async fn query_validator(
728 &self,
729 address: &str,
730 node: &impl ValidatorNode,
731 chain_id: ChainId,
732 public_key: Option<&ValidatorPublicKey>,
733 ) -> ValidatorQueryResults {
734 let version_info = self.check_compatible_version_info(address, node).await;
735 let genesis_config_hash = self.check_matching_network_description(address, node).await;
736 let chain_info = self
737 .check_validator_chain_info_response(public_key, address, node, chain_id)
738 .await;
739
740 ValidatorQueryResults {
741 version_info,
742 genesis_config_hash,
743 chain_info,
744 }
745 }
746
747 pub async fn query_local_node(
751 &self,
752 chain_id: ChainId,
753 ) -> Result<ValidatorQueryResults, Error> {
754 let version_info = Ok(linera_version::VERSION_INFO.clone());
755 let genesis_config_hash = Ok(self
756 .genesis_config
757 .network_description()
758 .genesis_config_hash);
759 let chain_info = self
760 .make_chain_client(chain_id)
761 .await?
762 .chain_info_with_manager_values()
763 .await
764 .map(|info| *info)
765 .map_err(|e| e.into());
766
767 Ok(ValidatorQueryResults {
768 version_info,
769 genesis_config_hash,
770 chain_info,
771 })
772 }
773}
774
775#[cfg(feature = "fs")]
776impl<Env: Environment> ClientContext<Env> {
777 pub async fn publish_module(
778 &mut self,
779 chain_client: &ChainClient<Env>,
780 contract: PathBuf,
781 service: PathBuf,
782 vm_runtime: VmRuntime,
783 ) -> Result<ModuleId, Error> {
784 info!("Loading bytecode files");
785 let contract_bytecode = Bytecode::load_from_file(&contract)
786 .await
787 .with_context(|| format!("failed to load contract bytecode from {:?}", &contract))?;
788 let service_bytecode = Bytecode::load_from_file(&service)
789 .await
790 .with_context(|| format!("failed to load service bytecode from {:?}", &service))?;
791
792 info!("Publishing module");
793 let (blobs, module_id) =
794 create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
795 let (module_id, _) = self
796 .apply_client_command(chain_client, |chain_client| {
797 let blobs = blobs.clone();
798 let chain_client = chain_client.clone();
799 async move {
800 chain_client
801 .publish_module_blobs(blobs, module_id)
802 .await
803 .context("Failed to publish module")
804 }
805 })
806 .await?;
807
808 info!("{}", "Module published successfully!");
809
810 info!("Synchronizing client and processing inbox");
811 self.process_inbox(chain_client).await?;
812 Ok(module_id)
813 }
814
815 pub async fn publish_data_blob(
816 &mut self,
817 chain_client: &ChainClient<Env>,
818 blob_path: PathBuf,
819 ) -> Result<CryptoHash, Error> {
820 info!("Loading data blob file");
821 let blob_bytes = fs::read(&blob_path).context(format!(
822 "failed to load data blob bytes from {:?}",
823 &blob_path
824 ))?;
825
826 info!("Publishing data blob");
827 self.apply_client_command(chain_client, |chain_client| {
828 let blob_bytes = blob_bytes.clone();
829 let chain_client = chain_client.clone();
830 async move {
831 chain_client
832 .publish_data_blob(blob_bytes)
833 .await
834 .context("Failed to publish data blob")
835 }
836 })
837 .await?;
838
839 info!("{}", "Data blob published successfully!");
840 Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
841 }
842
    /// Runs the chain client's `read_data_blob` command for `hash` on the chain of
    /// `chain_client`, logging it as a verification of the data blob.
    ///
    /// # Errors
    ///
    /// Fails if the command fails; the error carries the context
    /// "Failed to verify data blob".
    pub async fn read_data_blob(
        &mut self,
        chain_client: &ChainClient<Env>,
        hash: CryptoHash,
    ) -> Result<(), Error> {
        info!("Verifying data blob");
        self.apply_client_command(chain_client, |chain_client| {
            // The closure may be called several times; each call works on its own clone.
            let chain_client = chain_client.clone();
            async move {
                chain_client
                    .read_data_blob(hash)
                    .await
                    .context("Failed to verify data blob")
            }
        })
        .await?;

        info!("{}", "Data blob verified successfully!");
        Ok(())
    }
864}
865
866#[cfg(not(web))]
867impl<Env: Environment> ClientContext<Env> {
868 pub async fn prepare_for_benchmark(
869 &mut self,
870 num_chains: usize,
871 tokens_per_chain: Amount,
872 fungible_application_id: Option<ApplicationId>,
873 pub_keys: Vec<AccountPublicKey>,
874 chains_config_path: Option<&Path>,
875 ) -> Result<(Vec<ChainClient<Env>>, Vec<ChainId>), Error> {
876 let start = Instant::now();
877 self.process_inboxes_and_force_validator_updates().await;
881 info!(
882 "Processed inboxes and forced validator updates in {} ms",
883 start.elapsed().as_millis()
884 );
885
886 let start = Instant::now();
887 let (benchmark_chains, chain_clients) = self
888 .make_benchmark_chains(
889 num_chains,
890 tokens_per_chain,
891 pub_keys,
892 chains_config_path.is_some(),
893 )
894 .await?;
895 info!(
896 "Got {} chains in {} ms",
897 num_chains,
898 start.elapsed().as_millis()
899 );
900
901 if let Some(id) = fungible_application_id {
902 let start = Instant::now();
903 self.supply_fungible_tokens(&benchmark_chains, id).await?;
904 info!(
905 "Supplied fungible tokens in {} ms",
906 start.elapsed().as_millis()
907 );
908 let start = Instant::now();
910 for chain_client in &chain_clients {
911 chain_client.process_inbox().await?;
912 }
913 info!(
914 "Processed inboxes after supplying fungible tokens in {} ms",
915 start.elapsed().as_millis()
916 );
917 }
918
919 let all_chains = Benchmark::<Env>::get_all_chains(chains_config_path, &benchmark_chains)?;
920 let known_chain_ids: HashSet<_> = benchmark_chains.iter().map(|(id, _)| *id).collect();
921 let unknown_chain_ids: Vec<_> = all_chains
922 .iter()
923 .filter(|id| !known_chain_ids.contains(id))
924 .copied()
925 .collect();
926 if !unknown_chain_ids.is_empty() {
927 for chain_id in &unknown_chain_ids {
931 self.client.get_chain_description(*chain_id).await?;
932 }
933 }
934
935 Ok((chain_clients, all_chains))
936 }
937
938 pub async fn wrap_up_benchmark(
939 &mut self,
940 chain_clients: Vec<ChainClient<Env>>,
941 close_chains: bool,
942 wrap_up_max_in_flight: usize,
943 ) -> Result<(), Error> {
944 if close_chains {
945 info!("Closing chains...");
946 let stream = stream::iter(chain_clients)
947 .map(|chain_client| async move {
948 Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
949 info!("Closed chain {:?}", chain_client.chain_id());
950 Ok::<(), BenchmarkError>(())
951 })
952 .buffer_unordered(wrap_up_max_in_flight);
953 stream.try_collect::<Vec<_>>().await?;
954 } else {
955 info!("Processing inbox for all chains...");
956 let stream = stream::iter(chain_clients.clone())
957 .map(|chain_client| async move {
958 chain_client.process_inbox().await?;
959 info!("Processed inbox for chain {:?}", chain_client.chain_id());
960 Ok::<(), chain_client::Error>(())
961 })
962 .buffer_unordered(wrap_up_max_in_flight);
963 stream.try_collect::<Vec<_>>().await?;
964
965 info!("Updating wallet from chain clients...");
966 for chain_client in chain_clients {
967 let info = chain_client.chain_info().await?;
968 let client_owner = chain_client.preferred_owner();
969 let pending_proposal = chain_client.pending_proposal().clone();
970 self.wallet()
971 .insert(
972 info.chain_id,
973 wallet::Chain {
974 pending_proposal,
975 owner: client_owner,
976 ..info.as_ref().into()
977 },
978 )
979 .await
980 .map_err(error::Inner::wallet)?;
981 }
982 }
983
984 Ok(())
985 }
986
    /// Processes the inbox of every owned chain in the wallet, one spawned task per chain,
    /// then updates the wallet from each chain client.
    ///
    /// # Panics
    ///
    /// Panics if listing the owned chains or creating a chain client fails, if processing
    /// an inbox fails, or if a wallet update fails.
    async fn process_inboxes_and_force_validator_updates(&mut self) {
        let mut join_set = task::JoinSet::new();

        // Create a chain client for every owned chain before spawning any task.
        let chain_clients: Vec<_> = self
            .wallet()
            .owned_chain_ids()
            .map_err(|e| error::Inner::wallet(e).into())
            .and_then(|id| self.make_chain_client(id))
            .try_collect()
            .await
            .unwrap();

        for chain_client in chain_clients {
            join_set.spawn(async move {
                Self::process_inbox_without_updating_wallet(&chain_client)
                    .await
                    .expect("Processing inbox should not fail!");
                // Hand the client back so that the wallet can be updated from it.
                chain_client
            });
        }

        for chain_client in join_set.join_all().await {
            self.update_wallet_from_client(&chain_client).await.unwrap();
        }
    }
1012
    /// Synchronizes `chain_client` from the validators and processes its inbox once,
    /// without touching the wallet.
    ///
    /// Returns the certificates produced while processing the inbox.
    ///
    /// # Panics
    ///
    /// Panics if processing the inbox reports a round timeout.
    async fn process_inbox_without_updating_wallet(
        chain_client: &ChainClient<Env>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
        chain_client.synchronize_from_validators().await?;
        let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
        assert!(
            maybe_timeout.is_none(),
            "Should not timeout within benchmark!"
        );

        Ok(certificates)
    }
1026
    /// Collects `num_chains` chains for benchmarking, reusing suitable chains from the
    /// wallet and creating the missing ones from the default chain.
    ///
    /// A wallet chain is reused only if it has no regular owners and exactly one super
    /// owner. New chains are opened in batches, each batch in a single block on the
    /// default chain and owned by the next key from `pub_keys`. If `wallet_only` is set
    /// and the wallet does not hold enough suitable chains, an error is returned instead
    /// of creating chains.
    ///
    /// Returns the benchmark chains as `(chain ID, owner)` pairs and their chain clients,
    /// in the same order.
    ///
    /// # Panics
    ///
    /// Panics if `pub_keys` holds fewer keys than the number of batches to create, if a
    /// chain description blob cannot be found for a newly opened chain, or if no default
    /// chain is set.
    async fn make_benchmark_chains(
        &mut self,
        num_chains: usize,
        balance: Amount,
        pub_keys: Vec<AccountPublicKey>,
        wallet_only: bool,
    ) -> Result<(Vec<(ChainId, AccountOwner)>, Vec<ChainClient<Env>>), Error> {
        let mut chains_found_in_wallet = 0;
        let mut benchmark_chains = Vec::with_capacity(num_chains);
        let mut chain_clients = Vec::with_capacity(num_chains);
        let start = Instant::now();
        let mut owned_chain_ids = std::pin::pin!(self.wallet().owned_chain_ids());
        while let Some(chain_id) = owned_chain_ids.next().await {
            let chain_id = chain_id.map_err(error::Inner::wallet)?;
            if chains_found_in_wallet == num_chains {
                break;
            }
            let chain_client = self.make_chain_client(chain_id).await?;
            let ownership = chain_client.chain_info().await?.manager.ownership;
            // Only reuse chains with exactly one super owner and no regular owners.
            if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
                continue;
            }
            chain_client.process_inbox().await?;
            benchmark_chains.push((
                chain_id,
                *ownership
                    .super_owners
                    .first()
                    .expect("should have a super owner"),
            ));
            chain_clients.push(chain_client);
            chains_found_in_wallet += 1;
        }
        info!(
            "Got {} chains from the wallet in {} ms",
            benchmark_chains.len(),
            start.elapsed().as_millis()
        );

        // Cannot underflow: the loop above stops once `num_chains` chains were found.
        let num_chains_to_create = num_chains - chains_found_in_wallet;

        let default_chain_client = self.make_chain_client(self.default_chain()).await?;

        if num_chains_to_create > 0 {
            if wallet_only {
                return Err(
                    error::Inner::Benchmark(BenchmarkError::NotEnoughChainsInWallet(
                        num_chains,
                        chains_found_in_wallet,
                    ))
                    .into(),
                );
            }
            let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
            // Maximum number of `OpenChain` operations per block.
            // NOTE(review): presumably chosen to stay within block size limits — confirm.
            let operations_per_block = 900;
            for i in (0..num_chains_to_create).step_by(operations_per_block) {
                let num_new_chains = operations_per_block.min(num_chains_to_create - i);
                // One key is consumed per batch; all chains of a batch share the same owner.
                let pub_key = pub_keys_iter.next().unwrap();
                let owner = pub_key.into();

                let certificate = Self::execute_open_chains_operations(
                    num_new_chains,
                    &default_chain_client,
                    balance,
                    owner,
                )
                .await?;
                info!("Block executed successfully");

                let block = certificate.block();
                for i in 0..num_new_chains {
                    // The new chain's ID is the hash of the chain description blob found
                    // at index `i` of the block's blobs.
                    let chain_id = block.body.blobs[i]
                        .iter()
                        .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
                        .map(|blob| ChainId(blob.id().hash))
                        .expect("failed to create a new chain");
                    self.client
                        .extend_chain_mode(chain_id, ListeningMode::FullChain);

                    let mut chain_client = self.client.create_chain_client(
                        chain_id,
                        None,
                        BlockHeight::ZERO,
                        None,
                        Some(owner),
                        self.timing_sender(),
                        false,
                    );
                    chain_client.set_preferred_owner(owner);
                    chain_client.process_inbox().await?;
                    benchmark_chains.push((chain_id, owner));
                    chain_clients.push(chain_client);
                }
            }

            // `start` was taken before scanning the wallet, so this duration also
            // includes the time spent looking for existing chains.
            info!(
                "Created {} chains in {} ms",
                num_chains_to_create,
                start.elapsed().as_millis()
            );
        }

        info!("Updating wallet from client");
        self.update_wallet_from_client(&default_chain_client)
            .await?;
        info!("Retrying pending outgoing messages");
        default_chain_client
            .retry_pending_outgoing_messages()
            .await
            .context("outgoing messages to create the new chains should be delivered")?;
        info!("Processing default chain inbox");
        default_chain_client.process_inbox().await?;

        assert_eq!(
            benchmark_chains.len(),
            chain_clients.len(),
            "benchmark_chains and chain_clients must have the same size"
        );

        Ok((benchmark_chains, chain_clients))
    }
1150
1151 async fn execute_open_chains_operations(
1152 num_new_chains: usize,
1153 chain_client: &ChainClient<Env>,
1154 balance: Amount,
1155 owner: AccountOwner,
1156 ) -> Result<ConfirmedBlockCertificate, Error> {
1157 let config = OpenChainConfig {
1158 ownership: ChainOwnership::single_super(owner),
1159 balance,
1160 application_permissions: Default::default(),
1161 };
1162 let operations = iter::repeat_n(
1163 Operation::system(SystemOperation::OpenChain(config)),
1164 num_new_chains,
1165 )
1166 .collect();
1167 info!("Executing {} OpenChain operations", num_new_chains);
1168 Ok(chain_client
1169 .execute_operations(operations, vec![])
1170 .await?
1171 .expect("should execute block with OpenChain operations"))
1172 }
1173
1174 async fn supply_fungible_tokens(
1176 &mut self,
1177 key_pairs: &[(ChainId, AccountOwner)],
1178 application_id: ApplicationId,
1179 ) -> Result<(), Error> {
1180 let default_chain_id = self.default_chain();
1181 let default_key = self
1182 .wallet()
1183 .get(default_chain_id)
1184 .await
1185 .unwrap()
1186 .unwrap()
1187 .owner
1188 .unwrap();
1189 let amount = Amount::from_nanos(4);
1191 let operations: Vec<Operation> = key_pairs
1192 .iter()
1193 .map(|(chain_id, owner)| {
1194 Benchmark::<Env>::fungible_transfer(
1195 application_id,
1196 *chain_id,
1197 default_key,
1198 *owner,
1199 amount,
1200 )
1201 })
1202 .collect();
1203 let chain_client = self.make_chain_client(default_chain_id).await?;
1204 for operation_chunk in operations.chunks(1000) {
1206 chain_client
1207 .execute_operations(operation_chunk.to_vec(), vec![])
1208 .await?
1209 .expect("should execute block with Transfer operations");
1210 }
1211 self.update_wallet_from_client(&chain_client).await?;
1212
1213 Ok(())
1214 }
1215}