1use std::sync::Arc;
5
6use futures::{Future, StreamExt as _, TryStreamExt as _};
7use linera_base::{
8 crypto::{CryptoHash, ValidatorPublicKey},
9 data_types::{Epoch, Timestamp},
10 identifiers::{Account, AccountOwner, ChainId},
11 ownership::ChainOwnership,
12 time::{Duration, Instant},
13 util::future::FutureSyncExt as _,
14};
15use linera_chain::{manager::LockingBlock, types::ConfirmedBlockCertificate};
16use linera_core::{
17 client::{chain_client, ChainClient, Client, ListeningMode},
18 data_types::{ChainInfo, ChainInfoQuery, ClientOutcome},
19 join_set_ext::JoinSet,
20 node::ValidatorNode,
21 wallet, Environment, JoinSetExt as _, Wallet as _,
22};
23use linera_rpc::node_provider::{NodeOptions, NodeProvider};
24use linera_version::VersionInfo;
25use thiserror_context::Context;
26use tracing::{debug, info, warn};
27#[cfg(not(web))]
28use {
29 crate::{
30 benchmark::{fungible_transfer, Benchmark, BenchmarkError},
31 client_metrics::ClientMetrics,
32 },
33 futures::stream,
34 linera_base::{
35 crypto::AccountPublicKey,
36 data_types::{Amount, BlockHeight},
37 identifiers::{ApplicationId, BlobType},
38 },
39 linera_execution::{
40 system::{OpenChainConfig, SystemOperation},
41 Operation,
42 },
43 std::{collections::HashSet, path::Path},
44 tokio::{sync::mpsc, task},
45};
46#[cfg(feature = "fs")]
47use {
48 linera_base::{
49 data_types::{BlobContent, Bytecode},
50 identifiers::ModuleId,
51 vm::VmRuntime,
52 },
53 linera_core::client::create_bytecode_blobs,
54 std::{fs, path::PathBuf},
55};
56
57use crate::{
58 chain_listener::{self, ClientContext as _},
59 client_options::{ChainOwnershipConfig, Options},
60 config::GenesisConfig,
61 error, util, Error,
62};
63
64pub struct ValidatorQueryResults {
66 pub version_info: Result<VersionInfo, Error>,
68 pub genesis_config_hash: Result<CryptoHash, Error>,
70 pub chain_info: Result<ChainInfo, Error>,
72}
73
74impl ValidatorQueryResults {
75 pub fn errors(&self) -> Vec<&Error> {
77 let mut errors = Vec::new();
78 if let Err(e) = &self.version_info {
79 errors.push(e);
80 }
81 if let Err(e) = &self.genesis_config_hash {
82 errors.push(e);
83 }
84 if let Err(e) = &self.chain_info {
85 errors.push(e);
86 }
87 errors
88 }
89
90 pub fn print(
95 &self,
96 public_key: Option<&ValidatorPublicKey>,
97 address: Option<&str>,
98 weight: Option<u64>,
99 reference: Option<&ValidatorQueryResults>,
100 ) {
101 if let Some(key) = public_key {
102 println!("Public key: {}", key);
103 }
104 if let Some(address) = address {
105 println!("Address: {}", address);
106 }
107 if let Some(w) = weight {
108 println!("Weight: {}", w);
109 }
110
111 let ref_version = reference.and_then(|ref_results| ref_results.version_info.as_ref().ok());
112 match &self.version_info {
113 Ok(version_info) => {
114 if ref_version.is_none_or(|ref_v| ref_v.crate_version != version_info.crate_version)
115 {
116 println!("Linera protocol: v{}", version_info.crate_version);
117 }
118 if ref_version.is_none_or(|ref_v| ref_v.rpc_hash != version_info.rpc_hash) {
119 println!("RPC API hash: {}", version_info.rpc_hash);
120 }
121 if ref_version.is_none_or(|ref_v| ref_v.graphql_hash != version_info.graphql_hash) {
122 println!("GraphQL API hash: {}", version_info.graphql_hash);
123 }
124 if ref_version.is_none_or(|ref_v| ref_v.wit_hash != version_info.wit_hash) {
125 println!("WIT API hash: {}", version_info.wit_hash);
126 }
127 if ref_version.is_none_or(|ref_v| {
128 (&ref_v.git_commit, ref_v.git_dirty)
129 != (&version_info.git_commit, version_info.git_dirty)
130 }) {
131 println!(
132 "Source code: {}/tree/{}{}",
133 env!("CARGO_PKG_REPOSITORY"),
134 version_info.git_commit,
135 if version_info.git_dirty {
136 " (dirty)"
137 } else {
138 ""
139 }
140 );
141 }
142 }
143 Err(err) => println!("Error getting version info: {err}"),
144 }
145
146 let ref_genesis_hash =
147 reference.and_then(|ref_results| ref_results.genesis_config_hash.as_ref().ok());
148 match &self.genesis_config_hash {
149 Ok(hash) if ref_genesis_hash.is_some_and(|ref_hash| ref_hash == hash) => {}
150 Ok(hash) => println!("Genesis config hash: {hash}"),
151 Err(err) => println!("Error getting genesis config: {err}"),
152 }
153
154 let ref_info = reference.and_then(|ref_results| ref_results.chain_info.as_ref().ok());
155 match &self.chain_info {
156 Ok(info) => {
157 if ref_info.is_none_or(|ref_info| info.block_hash != ref_info.block_hash) {
158 if let Some(hash) = info.block_hash {
159 println!("Block hash: {}", hash);
160 } else {
161 println!("Block hash: None");
162 }
163 }
164 if ref_info
165 .is_none_or(|ref_info| info.next_block_height != ref_info.next_block_height)
166 {
167 println!("Next height: {}", info.next_block_height);
168 }
169 if ref_info.is_none_or(|ref_info| info.timestamp != ref_info.timestamp) {
170 println!("Timestamp: {}", info.timestamp);
171 }
172 if ref_info.is_none_or(|ref_info| info.epoch != ref_info.epoch) {
173 println!("Epoch: {}", info.epoch);
174 }
175 if ref_info.is_none_or(|ref_info| {
176 info.manager.current_round != ref_info.manager.current_round
177 }) {
178 println!("Round: {}", info.manager.current_round);
179 }
180 if let Some(locking) = &info.manager.requested_locking {
181 match &**locking {
182 LockingBlock::Fast(proposal) => {
183 println!(
184 "Locking fast block from {}",
185 proposal.content.block.timestamp
186 );
187 }
188 LockingBlock::Regular(validated) => {
189 println!(
190 "Locking block {} in {} from {}",
191 validated.hash(),
192 validated.round,
193 validated.block().header.timestamp
194 );
195 }
196 }
197 }
198 }
199 Err(err) => println!("Error getting chain info: {err}"),
200 }
201 }
202}
203
204pub struct ClientContext<Env: Environment> {
205 pub client: Arc<Client<Env>>,
206 pub genesis_config: crate::config::GenesisConfig,
208 pub send_timeout: Duration,
209 pub recv_timeout: Duration,
210 pub retry_delay: Duration,
211 pub max_retries: u32,
212 pub max_backoff: Duration,
213 pub chain_listeners: JoinSet,
214 pub default_chain: Option<ChainId>,
216 #[cfg(not(web))]
217 pub client_metrics: Option<ClientMetrics>,
218}
219
220impl<Env: Environment> chain_listener::ClientContext for ClientContext<Env> {
221 type Environment = Env;
222
223 fn wallet(&self) -> &Env::Wallet {
224 self.client.wallet()
225 }
226
227 fn storage(&self) -> &Env::Storage {
228 self.client.storage_client()
229 }
230
231 fn client(&self) -> &Arc<Client<Env>> {
232 &self.client
233 }
234
235 #[cfg(not(web))]
236 fn timing_sender(
237 &self,
238 ) -> Option<mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
239 self.client_metrics
240 .as_ref()
241 .map(|metrics| metrics.timing_sender.clone())
242 }
243
244 async fn update_wallet_for_new_chain(
245 &mut self,
246 chain_id: ChainId,
247 owner: Option<AccountOwner>,
248 timestamp: Timestamp,
249 epoch: Epoch,
250 ) -> Result<(), Error> {
251 self.update_wallet_for_new_chain(chain_id, owner, timestamp, epoch)
252 .make_sync()
253 .await
254 }
255
256 async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
257 self.update_wallet_from_client(client).make_sync().await
258 }
259}
260
261impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si, W>>
262where
263 S: linera_core::environment::Storage,
264 Si: linera_core::environment::Signer,
265 W: linera_core::environment::Wallet,
266{
267 #[allow(clippy::too_many_arguments)]
271 pub async fn new(
272 storage: S,
273 wallet: W,
274 signer: Si,
275 options: &Options,
276 default_chain: Option<ChainId>,
277 genesis_config: GenesisConfig,
278 block_cache_size: usize,
279 execution_state_cache_size: usize,
280 ) -> Result<Self, Error> {
281 #[cfg(not(web))]
282 let timing_config = options.to_timing_config();
283 let node_provider = NodeProvider::new(NodeOptions {
284 send_timeout: options.send_timeout,
285 recv_timeout: options.recv_timeout,
286 retry_delay: options.retry_delay,
287 max_retries: options.max_retries,
288 max_backoff: options.max_backoff,
289 });
290 let chain_modes: Vec<_> = wallet
291 .items()
292 .map_ok(|(id, chain)| {
293 let mode = if chain.is_follow_only() {
294 ListeningMode::FollowChain
295 } else {
296 ListeningMode::FullChain
297 };
298 (id, mode)
299 })
300 .try_collect()
301 .await
302 .map_err(error::Inner::wallet)?;
303 let name = match chain_modes.len() {
304 0 => "Client node".to_string(),
305 1 => format!("Client node for {:.8}", chain_modes[0].0),
306 n => format!(
307 "Client node for {:.8} and {} others",
308 chain_modes[0].0,
309 n - 1
310 ),
311 };
312
313 let client = Client::new(
314 linera_core::environment::Impl {
315 network: node_provider,
316 storage,
317 signer,
318 wallet,
319 },
320 genesis_config.admin_chain_id(),
321 options.long_lived_services,
322 chain_modes,
323 name,
324 util::non_zero_duration(options.chain_worker_ttl),
325 util::non_zero_duration(options.sender_chain_worker_ttl),
326 options.to_chain_client_options(),
327 block_cache_size,
328 execution_state_cache_size,
329 &options.to_requests_scheduler_config(),
330 );
331
332 #[cfg(not(web))]
333 let client_metrics = if timing_config.enabled {
334 Some(ClientMetrics::new(timing_config))
335 } else {
336 None
337 };
338
339 Ok(ClientContext {
340 client: Arc::new(client),
341 default_chain,
342 genesis_config,
343 send_timeout: options.send_timeout,
344 recv_timeout: options.recv_timeout,
345 retry_delay: options.retry_delay,
346 max_retries: options.max_retries,
347 max_backoff: options.max_backoff,
348 chain_listeners: JoinSet::default(),
349 #[cfg(not(web))]
350 client_metrics,
351 })
352 }
353}
354
355impl<Env: Environment> ClientContext<Env> {
356 pub fn wallet(&self) -> &Env::Wallet {
360 self.client.wallet()
361 }
362
363 pub fn admin_chain_id(&self) -> ChainId {
365 self.client.admin_chain_id()
366 }
367
368 pub fn default_account(&self) -> Account {
371 Account::chain(self.default_chain())
372 }
373
374 pub fn default_chain(&self) -> ChainId {
376 self.default_chain
377 .expect("default chain requested but none set")
378 }
379
380 pub async fn first_non_admin_chain(&self) -> Result<ChainId, Error> {
381 let admin_chain_id = self.admin_chain_id();
382 std::pin::pin!(self
383 .wallet()
384 .chain_ids()
385 .try_filter(|chain_id| futures::future::ready(*chain_id != admin_chain_id)))
386 .next()
387 .await
388 .expect("No non-admin chain specified in wallet with no non-admin chain")
389 .map_err(Error::wallet)
390 }
391
392 pub fn make_node_provider(&self) -> NodeProvider {
394 NodeProvider::new(self.make_node_options())
395 }
396
397 fn make_node_options(&self) -> NodeOptions {
398 NodeOptions {
399 send_timeout: self.send_timeout,
400 recv_timeout: self.recv_timeout,
401 retry_delay: self.retry_delay,
402 max_retries: self.max_retries,
403 max_backoff: self.max_backoff,
404 }
405 }
406
407 #[cfg(not(web))]
408 pub fn client_metrics(&self) -> Option<&ClientMetrics> {
409 self.client_metrics.as_ref()
410 }
411
412 pub async fn update_wallet_from_client<Env_: Environment>(
413 &self,
414 client: &ChainClient<Env_>,
415 ) -> Result<(), Error> {
416 let info = client.chain_info().await?;
417 let chain_id = info.chain_id;
418 let existing_owner = self
419 .wallet()
420 .get(chain_id)
421 .await
422 .map_err(error::Inner::wallet)?
423 .and_then(|chain| chain.owner);
424
425 let new_chain = wallet::Chain {
426 pending_proposal: client.pending_proposal().await,
427 owner: existing_owner,
428 ..info.as_ref().into()
429 };
430
431 self.wallet()
432 .insert(chain_id, new_chain)
433 .await
434 .map_err(error::Inner::wallet)?;
435
436 Ok(())
437 }
438
439 pub async fn update_wallet_for_new_chain(
441 &mut self,
442 chain_id: ChainId,
443 owner: Option<AccountOwner>,
444 timestamp: Timestamp,
445 epoch: Epoch,
446 ) -> Result<(), Error> {
447 self.wallet()
448 .try_insert(
449 chain_id,
450 linera_core::wallet::Chain::new(owner, epoch, timestamp),
451 )
452 .await
453 .map_err(error::Inner::wallet)?;
454 Ok(())
455 }
456
457 pub async fn process_inbox(
458 &mut self,
459 chain_client: &ChainClient<Env>,
460 ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
461 let mut certificates = Vec::new();
462 let (new_certificates, maybe_timeout) = {
464 chain_client.synchronize_from_validators().await?;
465 let result = chain_client.process_inbox_without_prepare().await;
466 self.update_wallet_from_client(chain_client).await?;
467 result?
468 };
469 certificates.extend(new_certificates);
470 if maybe_timeout.is_none() {
471 return Ok(certificates);
472 }
473
474 let (listener, _listen_handle, mut notification_stream) = chain_client.listen().await?;
476 self.chain_listeners.spawn_task(listener);
477
478 loop {
479 let (new_certificates, maybe_timeout) = {
480 let result = chain_client.process_inbox().await;
481 self.update_wallet_from_client(chain_client).await?;
482 result?
483 };
484 certificates.extend(new_certificates);
485 if let Some(timestamp) = maybe_timeout {
486 util::wait_for_next_round(&mut notification_stream, timestamp).await
487 } else {
488 return Ok(certificates);
489 }
490 }
491 }
492
493 pub async fn assign_new_chain_to_key(
494 &mut self,
495 chain_id: ChainId,
496 owner: AccountOwner,
497 ) -> Result<(), Error> {
498 self.client
499 .extend_chain_mode(chain_id, ListeningMode::FullChain);
500 let client = self.make_chain_client(chain_id).await?;
501 let info = client.prepare_for_owner(owner).await.map_err(|error| {
502 tracing::error!(%chain_id, %owner, %error, "Chain is not owned");
503 error::Inner::ChainOwnership
504 })?;
505
506 let modified = self
508 .wallet()
509 .modify(chain_id, |chain| chain.owner = Some(owner))
510 .await
511 .map_err(error::Inner::wallet)?;
512 if modified.is_none() {
514 self.wallet()
515 .insert(
516 chain_id,
517 wallet::Chain {
518 owner: Some(owner),
519 timestamp: info.timestamp,
520 epoch: Some(info.epoch),
521 ..Default::default()
522 },
523 )
524 .await
525 .map_err(error::Inner::wallet)
526 .context("assigning new chain")?;
527 }
528 Ok(())
529 }
530
531 pub async fn apply_client_command<E, F, Fut, T>(
536 &mut self,
537 client: &ChainClient<Env>,
538 mut f: F,
539 ) -> Result<T, Error>
540 where
541 F: FnMut(&ChainClient<Env>) -> Fut,
542 Fut: Future<Output = Result<ClientOutcome<T>, E>>,
543 Error: From<E>,
544 {
545 client.prepare_chain().await?;
546 let result = f(client).await;
548 self.update_wallet_from_client(client).await?;
549 match result? {
550 ClientOutcome::Committed(t) => return Ok(t),
551 ClientOutcome::Conflict(certificate) => {
552 return Err(chain_client::Error::Conflict(certificate.hash()).into());
553 }
554 ClientOutcome::WaitForTimeout(_) => {}
555 }
556
557 let (listener, _listen_handle, mut notification_stream) = client.listen().await?;
559 self.chain_listeners.spawn_task(listener);
560
561 loop {
562 let result = f(client).await;
564 self.update_wallet_from_client(client).await?;
565 let timeout = match result? {
566 ClientOutcome::Committed(t) => return Ok(t),
567 ClientOutcome::Conflict(certificate) => {
568 return Err(chain_client::Error::Conflict(certificate.hash()).into());
569 }
570 ClientOutcome::WaitForTimeout(timeout) => timeout,
571 };
572 util::wait_for_next_round(&mut notification_stream, timeout).await;
574 }
575 }
576
577 pub async fn ownership(&mut self, chain_id: Option<ChainId>) -> Result<ChainOwnership, Error> {
578 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
579 let client = self.make_chain_client(chain_id).await?;
580 let info = client.chain_info().await?;
581 Ok(info.manager.ownership)
582 }
583
584 pub async fn change_ownership(
585 &mut self,
586 chain_id: Option<ChainId>,
587 ownership_config: ChainOwnershipConfig,
588 ) -> Result<(), Error> {
589 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
590 let chain_client = self.make_chain_client(chain_id).await?;
591 info!(
592 ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
593 "Changing ownership of a chain"
594 );
595 let time_start = Instant::now();
596 let mut ownership = chain_client.query_chain_ownership().await?;
597 ownership_config.update(&mut ownership)?;
598
599 if ownership.super_owners.is_empty() && ownership.owners.is_empty() {
600 tracing::error!("At least one owner or super owner of the chain has to be set.");
601 return Err(error::Inner::ChainOwnership.into());
602 }
603
604 let certificate = self
605 .apply_client_command(&chain_client, |chain_client| {
606 let ownership = ownership.clone();
607 let chain_client = chain_client.clone();
608 async move {
609 chain_client
610 .change_ownership(ownership)
611 .await
612 .map_err(Error::from)
613 .context("Failed to change ownership")
614 }
615 })
616 .await?;
617 let time_total = time_start.elapsed();
618 info!("Operation confirmed after {} ms", time_total.as_millis());
619 debug!("{:?}", certificate);
620 Ok(())
621 }
622
623 pub async fn set_preferred_owner(
624 &mut self,
625 chain_id: Option<ChainId>,
626 preferred_owner: AccountOwner,
627 ) -> Result<(), Error> {
628 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
629 let mut chain_client = self.make_chain_client(chain_id).await?;
630 let old_owner = chain_client.preferred_owner();
631 info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
632 chain_client.set_preferred_owner(preferred_owner);
633 self.update_wallet_from_client(&chain_client).await?;
634 info!("New preferred owner set");
635 Ok(())
636 }
637
638 pub async fn check_compatible_version_info(
639 &self,
640 address: &str,
641 node: &impl ValidatorNode,
642 ) -> Result<VersionInfo, Error> {
643 match node.get_version_info().await {
644 Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
645 debug!(
646 "Version information for validator {address}: {}",
647 version_info
648 );
649 Ok(version_info)
650 }
651 Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
652 remote: Box::new(version_info),
653 local: Box::new(linera_version::VERSION_INFO.clone()),
654 }
655 .into()),
656 Err(error) => Err(error::Inner::UnavailableVersionInfo {
657 address: address.to_string(),
658 error: Box::new(error),
659 }
660 .into()),
661 }
662 }
663
664 pub async fn check_matching_network_description(
665 &self,
666 address: &str,
667 node: &impl ValidatorNode,
668 ) -> Result<CryptoHash, Error> {
669 let network_description = self.genesis_config.network_description();
670 match node.get_network_description().await {
671 Ok(description) => {
672 if description == network_description {
673 Ok(description.genesis_config_hash)
674 } else {
675 Err(error::Inner::UnexpectedNetworkDescription {
676 remote: Box::new(description),
677 local: Box::new(network_description),
678 }
679 .into())
680 }
681 }
682 Err(error) => Err(error::Inner::UnavailableNetworkDescription {
683 address: address.to_string(),
684 error: Box::new(error),
685 }
686 .into()),
687 }
688 }
689
690 pub async fn check_validator_chain_info_response(
691 &self,
692 public_key: Option<&ValidatorPublicKey>,
693 address: &str,
694 node: &impl ValidatorNode,
695 chain_id: ChainId,
696 ) -> Result<ChainInfo, Error> {
697 let query = ChainInfoQuery::new(chain_id).with_manager_values();
698 match node.handle_chain_info_query(query).await {
699 Ok(response) => {
700 debug!(
701 "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
702 response.info.next_block_height, response.info.epoch,
703 );
704 if let Some(public_key) = public_key {
705 if response.check(*public_key).is_ok() {
706 debug!("Signature for public key {public_key} is OK.");
707 } else {
708 return Err(error::Inner::InvalidSignature {
709 public_key: *public_key,
710 }
711 .into());
712 }
713 } else {
714 warn!("Not checking signature as public key was not given");
715 }
716 Ok(*response.info)
717 }
718 Err(error) => Err(error::Inner::UnavailableChainInfo {
719 address: address.to_string(),
720 chain_id,
721 error: Box::new(error),
722 }
723 .into()),
724 }
725 }
726
727 pub async fn query_validator(
731 &self,
732 address: &str,
733 node: &impl ValidatorNode,
734 chain_id: ChainId,
735 public_key: Option<&ValidatorPublicKey>,
736 ) -> ValidatorQueryResults {
737 let version_info = self.check_compatible_version_info(address, node).await;
738 let genesis_config_hash = self.check_matching_network_description(address, node).await;
739 let chain_info = self
740 .check_validator_chain_info_response(public_key, address, node, chain_id)
741 .await;
742
743 ValidatorQueryResults {
744 version_info,
745 genesis_config_hash,
746 chain_info,
747 }
748 }
749
750 pub async fn query_local_node(
754 &self,
755 chain_id: ChainId,
756 ) -> Result<ValidatorQueryResults, Error> {
757 let version_info = Ok(linera_version::VERSION_INFO.clone());
758 let genesis_config_hash = Ok(self
759 .genesis_config
760 .network_description()
761 .genesis_config_hash);
762 let chain_info = self
763 .make_chain_client(chain_id)
764 .await?
765 .chain_info_with_manager_values()
766 .await
767 .map(|info| *info)
768 .map_err(|e| e.into());
769
770 Ok(ValidatorQueryResults {
771 version_info,
772 genesis_config_hash,
773 chain_info,
774 })
775 }
776}
777
778#[cfg(feature = "fs")]
779impl<Env: Environment> ClientContext<Env> {
780 pub async fn publish_module(
781 &mut self,
782 chain_client: &ChainClient<Env>,
783 contract: PathBuf,
784 service: PathBuf,
785 vm_runtime: VmRuntime,
786 ) -> Result<ModuleId, Error> {
787 info!("Loading bytecode files");
788 let contract_bytecode = Bytecode::load_from_file(&contract)
789 .await
790 .with_context(|| format!("failed to load contract bytecode from {:?}", &contract))?;
791 let service_bytecode = Bytecode::load_from_file(&service)
792 .await
793 .with_context(|| format!("failed to load service bytecode from {:?}", &service))?;
794
795 info!("Publishing module");
796 let (blobs, module_id) =
797 create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
798 let (module_id, _) = self
799 .apply_client_command(chain_client, |chain_client| {
800 let blobs = blobs.clone();
801 let chain_client = chain_client.clone();
802 async move {
803 chain_client
804 .publish_module_blobs(blobs, module_id)
805 .await
806 .context("Failed to publish module")
807 }
808 })
809 .await?;
810
811 info!("{}", "Module published successfully!");
812
813 info!("Synchronizing client and processing inbox");
814 self.process_inbox(chain_client).await?;
815 Ok(module_id)
816 }
817
818 pub async fn publish_data_blob(
819 &mut self,
820 chain_client: &ChainClient<Env>,
821 blob_path: PathBuf,
822 ) -> Result<CryptoHash, Error> {
823 info!("Loading data blob file");
824 let blob_bytes = fs::read(&blob_path).context(format!(
825 "failed to load data blob bytes from {:?}",
826 &blob_path
827 ))?;
828
829 info!("Publishing data blob");
830 self.apply_client_command(chain_client, |chain_client| {
831 let blob_bytes = blob_bytes.clone();
832 let chain_client = chain_client.clone();
833 async move {
834 chain_client
835 .publish_data_blob(blob_bytes)
836 .await
837 .context("Failed to publish data blob")
838 }
839 })
840 .await?;
841
842 info!("{}", "Data blob published successfully!");
843 Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
844 }
845
846 pub async fn read_data_blob(
848 &mut self,
849 chain_client: &ChainClient<Env>,
850 hash: CryptoHash,
851 ) -> Result<(), Error> {
852 info!("Verifying data blob");
853 self.apply_client_command(chain_client, |chain_client| {
854 let chain_client = chain_client.clone();
855 async move {
856 chain_client
857 .read_data_blob(hash)
858 .await
859 .context("Failed to verify data blob")
860 }
861 })
862 .await?;
863
864 info!("{}", "Data blob verified successfully!");
865 Ok(())
866 }
867}
868
869#[cfg(not(web))]
870impl<Env: Environment> ClientContext<Env> {
871 pub async fn prepare_for_benchmark(
872 &mut self,
873 num_chains: usize,
874 tokens_per_chain: Amount,
875 fungible_application_id: Option<ApplicationId>,
876 pub_keys: Vec<AccountPublicKey>,
877 chains_config_path: Option<&Path>,
878 close_chains: bool,
879 ) -> Result<Vec<ChainClient<Env>>, Error> {
880 let start = Instant::now();
881 self.process_inboxes_and_force_validator_updates().await;
885 info!(
886 "Processed inboxes and forced validator updates in {} ms",
887 start.elapsed().as_millis()
888 );
889
890 let start = Instant::now();
891 let (benchmark_chains, chain_clients) = self
892 .make_benchmark_chains(
893 num_chains,
894 tokens_per_chain,
895 pub_keys,
896 chains_config_path.is_some(),
897 close_chains,
898 )
899 .await?;
900 info!(
901 "Got {} chains in {} ms",
902 num_chains,
903 start.elapsed().as_millis()
904 );
905
906 if let Some(id) = fungible_application_id {
907 let start = Instant::now();
908 self.supply_fungible_tokens(&benchmark_chains, id).await?;
909 info!(
910 "Supplied fungible tokens in {} ms",
911 start.elapsed().as_millis()
912 );
913 let start = Instant::now();
915 for chain_client in &chain_clients {
916 chain_client.process_inbox().await?;
917 }
918 info!(
919 "Processed inboxes after supplying fungible tokens in {} ms",
920 start.elapsed().as_millis()
921 );
922 }
923
924 let all_chains = Benchmark::<Env>::get_all_chains(chains_config_path, &benchmark_chains)?;
925 let known_chain_ids: HashSet<_> = benchmark_chains.iter().map(|(id, _)| *id).collect();
926 let unknown_chain_ids: Vec<_> = all_chains
927 .iter()
928 .filter(|id| !known_chain_ids.contains(id))
929 .copied()
930 .collect();
931 if !unknown_chain_ids.is_empty() {
932 for chain_id in &unknown_chain_ids {
936 self.client.get_chain_description(*chain_id).await?;
937 }
938 }
939
940 Ok(chain_clients)
941 }
942
943 pub async fn wrap_up_benchmark(
944 &mut self,
945 chain_clients: Vec<ChainClient<Env>>,
946 close_chains: bool,
947 wrap_up_max_in_flight: usize,
948 ) -> Result<(), Error> {
949 if close_chains {
950 info!("Closing chains...");
951 let stream = stream::iter(chain_clients)
952 .map(|chain_client| async move {
953 Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
954 info!("Closed chain {:?}", chain_client.chain_id());
955 Ok::<(), BenchmarkError>(())
956 })
957 .buffer_unordered(wrap_up_max_in_flight);
958 stream.try_collect::<Vec<_>>().await?;
959 } else {
960 info!("Processing inbox for all chains...");
961 let stream = stream::iter(chain_clients.clone())
962 .map(|chain_client| async move {
963 chain_client.process_inbox().await?;
964 info!("Processed inbox for chain {:?}", chain_client.chain_id());
965 Ok::<(), chain_client::Error>(())
966 })
967 .buffer_unordered(wrap_up_max_in_flight);
968 stream.try_collect::<Vec<_>>().await?;
969
970 info!("Updating wallet from chain clients...");
971 for chain_client in chain_clients {
972 let info = chain_client.chain_info().await?;
973 let client_owner = chain_client.preferred_owner();
974 let pending_proposal = chain_client.pending_proposal().await;
975 self.wallet()
976 .insert(
977 info.chain_id,
978 wallet::Chain {
979 pending_proposal,
980 owner: client_owner,
981 ..info.as_ref().into()
982 },
983 )
984 .await
985 .map_err(error::Inner::wallet)?;
986 }
987 }
988
989 Ok(())
990 }
991
992 async fn process_inboxes_and_force_validator_updates(&mut self) {
993 let mut join_set = task::JoinSet::new();
994
995 let chain_clients: Vec<_> = self
996 .wallet()
997 .owned_chain_ids()
998 .map_err(|e| error::Inner::wallet(e).into())
999 .and_then(|id| self.make_chain_client(id))
1000 .try_collect()
1001 .await
1002 .unwrap();
1003
1004 for chain_client in chain_clients {
1005 join_set.spawn(async move {
1006 Self::process_inbox_without_updating_wallet(&chain_client)
1007 .await
1008 .expect("Processing inbox should not fail!");
1009 chain_client
1010 });
1011 }
1012
1013 for chain_client in join_set.join_all().await {
1014 self.update_wallet_from_client(&chain_client).await.unwrap();
1015 }
1016 }
1017
1018 async fn process_inbox_without_updating_wallet(
1019 chain_client: &ChainClient<Env>,
1020 ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
1021 chain_client.synchronize_from_validators().await?;
1023 let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
1024 assert!(
1025 maybe_timeout.is_none(),
1026 "Should not timeout within benchmark!"
1027 );
1028
1029 Ok(certificates)
1030 }
1031
1032 async fn make_benchmark_chains(
1038 &mut self,
1039 num_chains: usize,
1040 balance: Amount,
1041 pub_keys: Vec<AccountPublicKey>,
1042 wallet_only: bool,
1043 close_chains: bool,
1044 ) -> Result<(Vec<(ChainId, AccountOwner)>, Vec<ChainClient<Env>>), Error> {
1045 let mut chains_found_in_wallet = 0;
1046 let mut benchmark_chains = Vec::with_capacity(num_chains);
1047 let mut chain_clients = Vec::with_capacity(num_chains);
1048 let start = Instant::now();
1049
1050 if !close_chains || wallet_only {
1055 let mut owned_chain_ids = std::pin::pin!(self.wallet().owned_chain_ids());
1056 while let Some(chain_id) = owned_chain_ids.next().await {
1057 let chain_id = chain_id.map_err(error::Inner::wallet)?;
1058 if chains_found_in_wallet == num_chains {
1059 break;
1060 }
1061 let chain_client = self.make_chain_client(chain_id).await?;
1062 let ownership = chain_client.chain_info().await?.manager.ownership;
1063 if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
1064 continue;
1065 }
1066 let owner = *ownership.super_owners.first().unwrap();
1067 chain_client.process_inbox().await?;
1068 benchmark_chains.push((chain_id, owner));
1069 chain_clients.push(chain_client);
1070 chains_found_in_wallet += 1;
1071 }
1072 info!(
1073 "Got {} chains from the wallet in {} ms",
1074 benchmark_chains.len(),
1075 start.elapsed().as_millis()
1076 );
1077 }
1078
1079 let num_chains_to_create = num_chains - chains_found_in_wallet;
1080
1081 let default_chain_client = self.make_chain_client(self.default_chain()).await?;
1082
1083 if num_chains_to_create > 0 {
1084 if wallet_only {
1085 return Err(
1086 error::Inner::Benchmark(BenchmarkError::NotEnoughChainsInWallet(
1087 num_chains,
1088 chains_found_in_wallet,
1089 ))
1090 .into(),
1091 );
1092 }
1093 let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
1094 let operations_per_block = 900; for i in (0..num_chains_to_create).step_by(operations_per_block) {
1096 let num_new_chains = operations_per_block.min(num_chains_to_create - i);
1097 let owners: Vec<AccountOwner> = (&mut pub_keys_iter)
1100 .take(num_new_chains)
1101 .map(|pk| pk.into())
1102 .collect();
1103
1104 let certificate = Self::execute_open_chains_operations(
1105 &default_chain_client,
1106 balance,
1107 owners.clone(),
1108 )
1109 .await?;
1110 info!("Block executed successfully");
1111
1112 let block = certificate.block();
1113 for (i, owner) in owners.into_iter().enumerate() {
1114 let chain_id = block.body.blobs[i]
1115 .iter()
1116 .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
1117 .map(|blob| ChainId(blob.id().hash))
1118 .expect("failed to create a new chain");
1119 self.client
1120 .extend_chain_mode(chain_id, ListeningMode::FullChain);
1121
1122 let mut chain_client = self.client.create_chain_client(
1123 chain_id,
1124 None,
1125 BlockHeight::ZERO,
1126 &None,
1127 Some(owner),
1128 self.timing_sender(),
1129 false,
1130 );
1131 chain_client.set_preferred_owner(owner);
1132 chain_client.process_inbox().await?;
1133 benchmark_chains.push((chain_id, owner));
1134 chain_clients.push(chain_client);
1135 }
1136 }
1137
1138 info!(
1139 "Created {} chains in {} ms",
1140 num_chains_to_create,
1141 start.elapsed().as_millis()
1142 );
1143 }
1144
1145 if !close_chains {
1147 info!("Updating wallet from client");
1148 self.update_wallet_from_client(&default_chain_client)
1149 .await?;
1150 }
1151 info!("Retrying pending outgoing messages");
1152 default_chain_client
1153 .retry_pending_outgoing_messages()
1154 .await
1155 .context("outgoing messages to create the new chains should be delivered")?;
1156 info!("Processing default chain inbox");
1157 default_chain_client.process_inbox().await?;
1158
1159 assert_eq!(
1160 benchmark_chains.len(),
1161 chain_clients.len(),
1162 "benchmark_chains and chain_clients must have the same size"
1163 );
1164
1165 Ok((benchmark_chains, chain_clients))
1166 }
1167
1168 async fn execute_open_chains_operations(
1169 chain_client: &ChainClient<Env>,
1170 balance: Amount,
1171 owners: Vec<AccountOwner>,
1172 ) -> Result<ConfirmedBlockCertificate, Error> {
1173 let operations: Vec<_> = owners
1174 .iter()
1175 .map(|owner| {
1176 let config = OpenChainConfig {
1177 ownership: ChainOwnership::single_super(*owner),
1178 balance,
1179 application_permissions: Default::default(),
1180 };
1181 Operation::system(SystemOperation::OpenChain(config))
1182 })
1183 .collect();
1184 info!("Executing {} OpenChain operations", operations.len());
1185 Ok(chain_client
1186 .execute_operations(operations, vec![])
1187 .await?
1188 .expect("should execute block with OpenChain operations"))
1189 }
1190
1191 async fn supply_fungible_tokens(
1193 &mut self,
1194 key_pairs: &[(ChainId, AccountOwner)],
1195 application_id: ApplicationId,
1196 ) -> Result<(), Error> {
1197 let default_chain_id = self.default_chain();
1198 let default_key = self
1199 .wallet()
1200 .get(default_chain_id)
1201 .await
1202 .unwrap()
1203 .unwrap()
1204 .owner
1205 .unwrap();
1206 let amount = Amount::from_nanos(4);
1208 let operations: Vec<Operation> = key_pairs
1209 .iter()
1210 .map(|(chain_id, owner)| {
1211 fungible_transfer(application_id, *chain_id, default_key, *owner, amount)
1212 })
1213 .collect();
1214 let chain_client = self.make_chain_client(default_chain_id).await?;
1215 for operation_chunk in operations.chunks(1000) {
1217 chain_client
1218 .execute_operations(operation_chunk.to_vec(), vec![])
1219 .await?
1220 .expect("should execute block with Transfer operations");
1221 }
1222 self.update_wallet_from_client(&chain_client).await?;
1223
1224 Ok(())
1225 }
1226}