1use std::sync::Arc;
5
6use futures::Future;
7use linera_base::{
8 crypto::{CryptoHash, ValidatorPublicKey},
9 data_types::{BlockHeight, Epoch, Timestamp},
10 identifiers::{Account, AccountOwner, ChainId},
11 ownership::ChainOwnership,
12 time::{Duration, Instant},
13};
14use linera_chain::{manager::LockingBlock, types::ConfirmedBlockCertificate};
15use linera_core::{
16 client::{ChainClient, Client, ListeningMode},
17 data_types::{ChainInfo, ChainInfoQuery, ClientOutcome},
18 join_set_ext::JoinSet,
19 node::ValidatorNode,
20 Environment, JoinSetExt as _,
21};
22use linera_persistent::{Persist, PersistExt as _};
23use linera_rpc::node_provider::{NodeOptions, NodeProvider};
24use linera_version::VersionInfo;
25use thiserror_context::Context;
26use tracing::{debug, info, warn};
27#[cfg(not(web))]
28use {
29 crate::{
30 benchmark::{Benchmark, BenchmarkError},
31 client_metrics::ClientMetrics,
32 },
33 futures::{stream, StreamExt, TryStreamExt},
34 linera_base::{
35 crypto::AccountPublicKey,
36 data_types::Amount,
37 identifiers::{ApplicationId, BlobType},
38 },
39 linera_core::client::chain_client,
40 linera_execution::{
41 system::{OpenChainConfig, SystemOperation},
42 Operation,
43 },
44 std::{collections::HashSet, iter, path::Path},
45 tokio::{sync::mpsc, task},
46};
47#[cfg(feature = "fs")]
48use {
49 linera_base::{
50 data_types::{BlobContent, Bytecode},
51 identifiers::ModuleId,
52 vm::VmRuntime,
53 },
54 linera_core::client::create_bytecode_blobs,
55 std::{fs, path::PathBuf},
56};
57
58use crate::{
59 chain_listener::{self, ClientContext as _},
60 client_options::{ChainOwnershipConfig, ClientContextOptions},
61 error, util,
62 wallet::{UserChain, Wallet},
63 Error,
64};
65
66pub struct ValidatorQueryResults {
68 pub version_info: Result<VersionInfo, Error>,
70 pub genesis_config_hash: Result<CryptoHash, Error>,
72 pub chain_info: Result<ChainInfo, Error>,
74}
75
76impl ValidatorQueryResults {
77 pub fn errors(&self) -> Vec<&Error> {
79 let mut errors = Vec::new();
80 if let Err(e) = &self.version_info {
81 errors.push(e);
82 }
83 if let Err(e) = &self.genesis_config_hash {
84 errors.push(e);
85 }
86 if let Err(e) = &self.chain_info {
87 errors.push(e);
88 }
89 errors
90 }
91
92 pub fn print(
97 &self,
98 public_key: Option<&ValidatorPublicKey>,
99 address: Option<&str>,
100 weight: Option<u64>,
101 reference: Option<&ValidatorQueryResults>,
102 ) {
103 if let Some(key) = public_key {
104 println!("Public key: {}", key);
105 }
106 if let Some(address) = address {
107 println!("Address: {}", address);
108 }
109 if let Some(w) = weight {
110 println!("Weight: {}", w);
111 }
112
113 let ref_version = reference.and_then(|ref_results| ref_results.version_info.as_ref().ok());
114 match &self.version_info {
115 Ok(version_info) => {
116 if ref_version.is_none_or(|ref_v| ref_v.crate_version != version_info.crate_version)
117 {
118 println!("Linera protocol: v{}", version_info.crate_version);
119 }
120 if ref_version.is_none_or(|ref_v| ref_v.rpc_hash != version_info.rpc_hash) {
121 println!("RPC API hash: {}", version_info.rpc_hash);
122 }
123 if ref_version.is_none_or(|ref_v| ref_v.graphql_hash != version_info.graphql_hash) {
124 println!("GraphQL API hash: {}", version_info.graphql_hash);
125 }
126 if ref_version.is_none_or(|ref_v| ref_v.wit_hash != version_info.wit_hash) {
127 println!("WIT API hash: v{}", version_info.wit_hash);
128 }
129 if ref_version.is_none_or(|ref_v| {
130 (&ref_v.git_commit, ref_v.git_dirty)
131 != (&version_info.git_commit, version_info.git_dirty)
132 }) {
133 println!(
134 "Source code: {}/tree/{}{}",
135 env!("CARGO_PKG_REPOSITORY"),
136 version_info.git_commit,
137 if version_info.git_dirty {
138 " (dirty)"
139 } else {
140 ""
141 }
142 );
143 }
144 }
145 Err(err) => println!("Error getting version info: {err}"),
146 }
147
148 let ref_genesis_hash =
149 reference.and_then(|ref_results| ref_results.genesis_config_hash.as_ref().ok());
150 match &self.genesis_config_hash {
151 Ok(hash) if ref_genesis_hash.is_some_and(|ref_hash| ref_hash == hash) => {}
152 Ok(hash) => println!("Genesis config hash: {hash}"),
153 Err(err) => println!("Error getting genesis config: {err}"),
154 }
155
156 let ref_info = reference.and_then(|ref_results| ref_results.chain_info.as_ref().ok());
157 match &self.chain_info {
158 Ok(info) => {
159 if ref_info.is_none_or(|ref_info| info.block_hash != ref_info.block_hash) {
160 if let Some(hash) = info.block_hash {
161 println!("Block hash: {}", hash);
162 } else {
163 println!("Block hash: None");
164 }
165 }
166 if ref_info
167 .is_none_or(|ref_info| info.next_block_height != ref_info.next_block_height)
168 {
169 println!("Next height: {}", info.next_block_height);
170 }
171 if ref_info.is_none_or(|ref_info| info.timestamp != ref_info.timestamp) {
172 println!("Timestamp: {}", info.timestamp);
173 }
174 if ref_info.is_none_or(|ref_info| info.epoch != ref_info.epoch) {
175 println!("Epoch: {}", info.epoch);
176 }
177 if ref_info.is_none_or(|ref_info| {
178 info.manager.current_round != ref_info.manager.current_round
179 }) {
180 println!("Round: {}", info.manager.current_round);
181 }
182 if let Some(locking) = &info.manager.requested_locking {
183 match &**locking {
184 LockingBlock::Fast(proposal) => {
185 println!(
186 "Locking fast block from {}",
187 proposal.content.block.timestamp
188 );
189 }
190 LockingBlock::Regular(validated) => {
191 println!(
192 "Locking block {} in {} from {}",
193 validated.hash(),
194 validated.round,
195 validated.block().header.timestamp
196 );
197 }
198 }
199 }
200 }
201 Err(err) => println!("Error getting chain info: {err}"),
202 }
203 }
204}
205
206pub struct ClientContext<Env: Environment, W> {
207 pub wallet: W,
208 pub client: Arc<Client<Env>>,
209 pub send_timeout: Duration,
210 pub recv_timeout: Duration,
211 pub retry_delay: Duration,
212 pub max_retries: u32,
213 pub chain_listeners: JoinSet,
214 #[cfg(not(web))]
215 pub client_metrics: Option<ClientMetrics>,
216}
217
218impl<Env: Environment, W> chain_listener::ClientContext for ClientContext<Env, W>
219where
220 W: Persist<Target = Wallet>,
221{
222 type Environment = Env;
223
224 fn wallet(&self) -> &Wallet {
225 &self.wallet
226 }
227
228 fn storage(&self) -> &Env::Storage {
229 self.client.storage_client()
230 }
231
232 fn client(&self) -> &Arc<Client<Env>> {
233 &self.client
234 }
235
236 #[cfg(not(web))]
237 fn timing_sender(
238 &self,
239 ) -> Option<mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
240 self.client_metrics
241 .as_ref()
242 .map(|metrics| metrics.timing_sender.clone())
243 }
244
245 async fn update_wallet_for_new_chain(
246 &mut self,
247 chain_id: ChainId,
248 owner: Option<AccountOwner>,
249 timestamp: Timestamp,
250 epoch: Epoch,
251 ) -> Result<(), Error> {
252 self.update_wallet_for_new_chain(chain_id, owner, timestamp, epoch)
253 .await
254 }
255
256 async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
257 self.update_wallet_from_client(client).await
258 }
259}
260
261impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si>, W>
262where
263 S: linera_core::environment::Storage,
264 Si: linera_core::environment::Signer,
265 W: Persist<Target = Wallet>,
266{
267 pub fn new(
268 storage: S,
269 options: ClientContextOptions,
270 wallet: W,
271 signer: Si,
272 block_cache_size: usize,
273 execution_state_cache_size: usize,
274 ) -> Self {
275 #[cfg(not(web))]
276 let timing_config = options.to_timing_config();
277 let node_provider = NodeProvider::new(NodeOptions {
278 send_timeout: options.send_timeout,
279 recv_timeout: options.recv_timeout,
280 retry_delay: options.retry_delay,
281 max_retries: options.max_retries,
282 });
283 let chain_ids = wallet.chain_ids();
284 let name = match chain_ids.len() {
285 0 => "Client node".to_string(),
286 1 => format!("Client node for {:.8}", chain_ids[0]),
287 n => format!("Client node for {:.8} and {} others", chain_ids[0], n - 1),
288 };
289 let client = Client::new(
290 linera_core::environment::Impl {
291 network: node_provider,
292 storage,
293 signer,
294 },
295 wallet.genesis_admin_chain(),
296 options.long_lived_services,
297 chain_ids,
298 name,
299 options.chain_worker_ttl,
300 options.sender_chain_worker_ttl,
301 options.to_chain_client_options(),
302 block_cache_size,
303 execution_state_cache_size,
304 );
305
306 #[cfg(not(web))]
307 let client_metrics = if timing_config.enabled {
308 Some(ClientMetrics::new(timing_config))
309 } else {
310 None
311 };
312
313 ClientContext {
314 client: Arc::new(client),
315 wallet,
316 send_timeout: options.send_timeout,
317 recv_timeout: options.recv_timeout,
318 retry_delay: options.retry_delay,
319 max_retries: options.max_retries,
320 chain_listeners: JoinSet::default(),
321 #[cfg(not(web))]
322 client_metrics,
323 }
324 }
325
326 #[cfg(with_testing)]
327 pub fn new_test_client_context(
328 storage: S,
329 wallet: W,
330 signer: Si,
331 block_cache_size: usize,
332 execution_state_cache_size: usize,
333 ) -> Self {
334 use linera_core::{client::chain_client, node::CrossChainMessageDelivery};
335
336 let send_recv_timeout = Duration::from_millis(4000);
337 let retry_delay = Duration::from_millis(1000);
338 let max_retries = 10;
339 let chain_worker_ttl = Duration::from_secs(30);
340 let sender_chain_worker_ttl = Duration::from_secs(1);
341
342 let node_options = NodeOptions {
343 send_timeout: send_recv_timeout,
344 recv_timeout: send_recv_timeout,
345 retry_delay,
346 max_retries,
347 };
348 let chain_ids = wallet.chain_ids();
349 let name = match chain_ids.len() {
350 0 => "Client node".to_string(),
351 1 => format!("Client node for {:.8}", chain_ids[0]),
352 n => format!("Client node for {:.8} and {} others", chain_ids[0], n - 1),
353 };
354 let client = Client::new(
355 linera_core::environment::Impl {
356 storage,
357 network: NodeProvider::new(node_options),
358 signer,
359 },
360 wallet.genesis_admin_chain(),
361 false,
362 chain_ids,
363 name,
364 chain_worker_ttl,
365 sender_chain_worker_ttl,
366 chain_client::Options {
367 cross_chain_message_delivery: CrossChainMessageDelivery::Blocking,
368 ..chain_client::Options::test_default()
369 },
370 block_cache_size,
371 execution_state_cache_size,
372 );
373
374 ClientContext {
375 client: Arc::new(client),
376 wallet,
377 send_timeout: send_recv_timeout,
378 recv_timeout: send_recv_timeout,
379 retry_delay,
380 max_retries,
381 chain_listeners: JoinSet::default(),
382 client_metrics: None,
383 }
384 }
385}
386
387impl<Env: Environment, W: Persist<Target = Wallet>> ClientContext<Env, W> {
388 pub fn wallet(&self) -> &Wallet {
390 &self.wallet
391 }
392
393 pub fn wallet_mut(&mut self) -> &mut W {
395 &mut self.wallet
396 }
397
398 pub async fn mutate_wallet<R: Send>(
399 &mut self,
400 mutation: impl FnOnce(&mut Wallet) -> R + Send,
401 ) -> Result<R, Error> {
402 self.wallet
403 .mutate(mutation)
404 .await
405 .map_err(|e| error::Inner::Persistence(Box::new(e)).into())
406 }
407
408 pub fn default_account(&self) -> Account {
411 Account::chain(self.default_chain())
412 }
413
414 pub fn default_chain(&self) -> ChainId {
416 self.wallet
417 .default_chain()
418 .expect("No chain specified in wallet with no default chain")
419 }
420
421 pub fn first_non_admin_chain(&self) -> ChainId {
422 self.wallet
423 .first_non_admin_chain()
424 .expect("No non-admin chain specified in wallet with no non-admin chain")
425 }
426
427 pub fn make_node_provider(&self) -> NodeProvider {
428 NodeProvider::new(self.make_node_options())
429 }
430
431 fn make_node_options(&self) -> NodeOptions {
432 NodeOptions {
433 send_timeout: self.send_timeout,
434 recv_timeout: self.recv_timeout,
435 retry_delay: self.retry_delay,
436 max_retries: self.max_retries,
437 }
438 }
439
440 #[cfg(not(web))]
441 pub fn client_metrics(&self) -> Option<&ClientMetrics> {
442 self.client_metrics.as_ref()
443 }
444
445 pub async fn save_wallet(&mut self) -> Result<(), Error> {
446 self.wallet
447 .persist()
448 .await
449 .map_err(|e| error::Inner::Persistence(Box::new(e)).into())
450 }
451
452 pub async fn update_wallet_from_client<Env_: Environment>(
453 &mut self,
454 client: &ChainClient<Env_>,
455 ) -> Result<(), Error> {
456 let info = client.chain_info().await?;
457 let client_owner = client.preferred_owner();
458 let pending_proposal = client.pending_proposal().clone();
459 self.wallet
460 .as_mut()
461 .update_from_info(pending_proposal, client_owner, &info);
462 self.save_wallet().await
463 }
464
465 pub async fn update_wallet_for_new_chain(
467 &mut self,
468 chain_id: ChainId,
469 owner: Option<AccountOwner>,
470 timestamp: Timestamp,
471 epoch: Epoch,
472 ) -> Result<(), Error> {
473 if self.wallet.get(chain_id).is_none() {
474 self.mutate_wallet(|w| {
475 w.insert(UserChain {
476 chain_id,
477 owner,
478 block_hash: None,
479 timestamp,
480 next_block_height: BlockHeight::ZERO,
481 pending_proposal: None,
482 epoch: Some(epoch),
483 })
484 })
485 .await?;
486 }
487
488 Ok(())
489 }
490
491 pub async fn process_inbox(
492 &mut self,
493 chain_client: &ChainClient<Env>,
494 ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
495 let mut certificates = Vec::new();
496 let (new_certificates, maybe_timeout) = {
498 chain_client.synchronize_from_validators().await?;
499 let result = chain_client.process_inbox_without_prepare().await;
500 self.update_wallet_from_client(chain_client).await?;
501 if result.is_err() {
502 self.save_wallet().await?;
503 }
504 result?
505 };
506 certificates.extend(new_certificates);
507 if maybe_timeout.is_none() {
508 self.save_wallet().await?;
509 return Ok(certificates);
510 }
511
512 let (listener, _listen_handle, mut notification_stream) =
514 chain_client.listen(ListeningMode::FullChain).await?;
515 self.chain_listeners.spawn_task(listener);
516
517 loop {
518 let (new_certificates, maybe_timeout) = {
519 let result = chain_client.process_inbox().await;
520 self.update_wallet_from_client(chain_client).await?;
521 if result.is_err() {
522 self.save_wallet().await?;
523 }
524 result?
525 };
526 certificates.extend(new_certificates);
527 if let Some(timestamp) = maybe_timeout {
528 util::wait_for_next_round(&mut notification_stream, timestamp).await
529 } else {
530 self.save_wallet().await?;
531 return Ok(certificates);
532 }
533 }
534 }
535
536 pub async fn assign_new_chain_to_key(
537 &mut self,
538 chain_id: ChainId,
539 owner: AccountOwner,
540 ) -> Result<(), Error> {
541 self.client.track_chain(chain_id);
542 let client = self.make_chain_client(chain_id);
543 let chain_description = client.get_chain_description().await?;
544 let config = chain_description.config();
545
546 if !config.ownership.verify_owner(&owner) {
547 tracing::error!(
548 "The chain with the ID returned by the faucet is not owned by you. \
549 Please make sure you are connecting to a genuine faucet."
550 );
551 return Err(error::Inner::ChainOwnership.into());
552 }
553
554 self.wallet_mut()
555 .mutate(|w| {
556 w.assign_new_chain_to_owner(
557 owner,
558 chain_id,
559 chain_description.timestamp(),
560 chain_description.config().epoch,
561 )
562 })
563 .await
564 .map_err(|e| error::Inner::Persistence(Box::new(e)))?
565 .context("assigning new chain")?;
566 Ok(())
567 }
568
569 pub async fn apply_client_command<E, F, Fut, T>(
574 &mut self,
575 client: &ChainClient<Env>,
576 mut f: F,
577 ) -> Result<T, Error>
578 where
579 F: FnMut(&ChainClient<Env>) -> Fut,
580 Fut: Future<Output = Result<ClientOutcome<T>, E>>,
581 Error: From<E>,
582 {
583 client.prepare_chain().await?;
584 let result = f(client).await;
586 self.update_wallet_from_client(client).await?;
587 if let ClientOutcome::Committed(t) = result? {
588 return Ok(t);
589 }
590
591 let (listener, _listen_handle, mut notification_stream) =
593 client.listen(ListeningMode::FullChain).await?;
594 self.chain_listeners.spawn_task(listener);
595
596 loop {
597 let result = f(client).await;
599 self.update_wallet_from_client(client).await?;
600 let timeout = match result? {
601 ClientOutcome::Committed(t) => return Ok(t),
602 ClientOutcome::WaitForTimeout(timeout) => timeout,
603 };
604 util::wait_for_next_round(&mut notification_stream, timeout).await;
606 }
607 }
608
609 pub async fn ownership(&mut self, chain_id: Option<ChainId>) -> Result<ChainOwnership, Error> {
610 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
611 let client = self.make_chain_client(chain_id);
612 let info = client.chain_info().await?;
613 Ok(info.manager.ownership)
614 }
615
616 pub async fn change_ownership(
617 &mut self,
618 chain_id: Option<ChainId>,
619 ownership_config: ChainOwnershipConfig,
620 ) -> Result<(), Error> {
621 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
622 let chain_client = self.make_chain_client(chain_id);
623 info!(
624 ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
625 "Changing ownership of a chain"
626 );
627 let time_start = Instant::now();
628 let ownership = ChainOwnership::try_from(ownership_config)?;
629
630 let certificate = self
631 .apply_client_command(&chain_client, |chain_client| {
632 let ownership = ownership.clone();
633 let chain_client = chain_client.clone();
634 async move {
635 chain_client
636 .change_ownership(ownership)
637 .await
638 .map_err(Error::from)
639 .context("Failed to change ownership")
640 }
641 })
642 .await?;
643 let time_total = time_start.elapsed();
644 info!("Operation confirmed after {} ms", time_total.as_millis());
645 debug!("{:?}", certificate);
646 Ok(())
647 }
648
649 pub async fn set_preferred_owner(
650 &mut self,
651 chain_id: Option<ChainId>,
652 preferred_owner: AccountOwner,
653 ) -> Result<(), Error> {
654 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
655 let mut chain_client = self.make_chain_client(chain_id);
656 let old_owner = chain_client.preferred_owner();
657 info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
658 chain_client.set_preferred_owner(preferred_owner);
659 self.update_wallet_from_client(&chain_client).await?;
660 info!("New preferred owner set");
661 Ok(())
662 }
663
664 pub async fn check_compatible_version_info(
665 &self,
666 address: &str,
667 node: &impl ValidatorNode,
668 ) -> Result<VersionInfo, Error> {
669 match node.get_version_info().await {
670 Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
671 debug!(
672 "Version information for validator {address}: {}",
673 version_info
674 );
675 Ok(version_info)
676 }
677 Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
678 remote: Box::new(version_info),
679 local: Box::new(linera_version::VERSION_INFO.clone()),
680 }
681 .into()),
682 Err(error) => Err(error::Inner::UnavailableVersionInfo {
683 address: address.to_string(),
684 error: Box::new(error),
685 }
686 .into()),
687 }
688 }
689
690 pub async fn check_matching_network_description(
691 &self,
692 address: &str,
693 node: &impl ValidatorNode,
694 ) -> Result<CryptoHash, Error> {
695 let network_description = self.wallet().genesis_config().network_description();
696 match node.get_network_description().await {
697 Ok(description) => {
698 if description == network_description {
699 Ok(description.genesis_config_hash)
700 } else {
701 Err(error::Inner::UnexpectedNetworkDescription {
702 remote: Box::new(description),
703 local: Box::new(network_description),
704 }
705 .into())
706 }
707 }
708 Err(error) => Err(error::Inner::UnavailableNetworkDescription {
709 address: address.to_string(),
710 error: Box::new(error),
711 }
712 .into()),
713 }
714 }
715
716 pub async fn check_validator_chain_info_response(
717 &self,
718 public_key: Option<&ValidatorPublicKey>,
719 address: &str,
720 node: &impl ValidatorNode,
721 chain_id: ChainId,
722 ) -> Result<ChainInfo, Error> {
723 let query = ChainInfoQuery::new(chain_id).with_manager_values();
724 match node.handle_chain_info_query(query).await {
725 Ok(response) => {
726 debug!(
727 "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
728 response.info.next_block_height, response.info.epoch,
729 );
730 if let Some(public_key) = public_key {
731 if response.check(*public_key).is_ok() {
732 debug!("Signature for public key {public_key} is OK.");
733 } else {
734 return Err(error::Inner::InvalidSignature {
735 public_key: *public_key,
736 }
737 .into());
738 }
739 } else {
740 warn!("Not checking signature as public key was not given");
741 }
742 Ok(*response.info)
743 }
744 Err(error) => Err(error::Inner::UnavailableChainInfo {
745 address: address.to_string(),
746 chain_id,
747 error: Box::new(error),
748 }
749 .into()),
750 }
751 }
752
753 pub async fn query_validator(
757 &self,
758 address: &str,
759 node: &impl ValidatorNode,
760 chain_id: ChainId,
761 public_key: Option<&ValidatorPublicKey>,
762 ) -> ValidatorQueryResults {
763 let version_info = self.check_compatible_version_info(address, node).await;
764 let genesis_config_hash = self.check_matching_network_description(address, node).await;
765 let chain_info = self
766 .check_validator_chain_info_response(public_key, address, node, chain_id)
767 .await;
768
769 ValidatorQueryResults {
770 version_info,
771 genesis_config_hash,
772 chain_info,
773 }
774 }
775
776 pub async fn query_local_node(&self, chain_id: ChainId) -> ValidatorQueryResults {
780 let version_info = Ok(linera_version::VERSION_INFO.clone());
781 let genesis_config_hash = Ok(self
782 .wallet()
783 .genesis_config()
784 .network_description()
785 .genesis_config_hash);
786 let chain_info = self
787 .make_chain_client(chain_id)
788 .chain_info_with_manager_values()
789 .await
790 .map(|info| *info)
791 .map_err(|e| e.into());
792
793 ValidatorQueryResults {
794 version_info,
795 genesis_config_hash,
796 chain_info,
797 }
798 }
799}
800
801#[cfg(feature = "fs")]
802impl<Env: Environment, W> ClientContext<Env, W>
803where
804 W: Persist<Target = Wallet>,
805{
806 pub async fn publish_module(
807 &mut self,
808 chain_client: &ChainClient<Env>,
809 contract: PathBuf,
810 service: PathBuf,
811 vm_runtime: VmRuntime,
812 ) -> Result<ModuleId, Error> {
813 info!("Loading bytecode files");
814 let contract_bytecode = Bytecode::load_from_file(&contract)
815 .with_context(|| format!("failed to load contract bytecode from {:?}", &contract))?;
816 let service_bytecode = Bytecode::load_from_file(&service)
817 .with_context(|| format!("failed to load service bytecode from {:?}", &service))?;
818
819 info!("Publishing module");
820 let (blobs, module_id) =
821 create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
822 let (module_id, _) = self
823 .apply_client_command(chain_client, |chain_client| {
824 let blobs = blobs.clone();
825 let chain_client = chain_client.clone();
826 async move {
827 chain_client
828 .publish_module_blobs(blobs, module_id)
829 .await
830 .context("Failed to publish module")
831 }
832 })
833 .await?;
834
835 info!("{}", "Module published successfully!");
836
837 info!("Synchronizing client and processing inbox");
838 self.process_inbox(chain_client).await?;
839 Ok(module_id)
840 }
841
842 pub async fn publish_data_blob(
843 &mut self,
844 chain_client: &ChainClient<Env>,
845 blob_path: PathBuf,
846 ) -> Result<CryptoHash, Error> {
847 info!("Loading data blob file");
848 let blob_bytes = fs::read(&blob_path).context(format!(
849 "failed to load data blob bytes from {:?}",
850 &blob_path
851 ))?;
852
853 info!("Publishing data blob");
854 self.apply_client_command(chain_client, |chain_client| {
855 let blob_bytes = blob_bytes.clone();
856 let chain_client = chain_client.clone();
857 async move {
858 chain_client
859 .publish_data_blob(blob_bytes)
860 .await
861 .context("Failed to publish data blob")
862 }
863 })
864 .await?;
865
866 info!("{}", "Data blob published successfully!");
867 Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
868 }
869
870 pub async fn read_data_blob(
872 &mut self,
873 chain_client: &ChainClient<Env>,
874 hash: CryptoHash,
875 ) -> Result<(), Error> {
876 info!("Verifying data blob");
877 self.apply_client_command(chain_client, |chain_client| {
878 let chain_client = chain_client.clone();
879 async move {
880 chain_client
881 .read_data_blob(hash)
882 .await
883 .context("Failed to verify data blob")
884 }
885 })
886 .await?;
887
888 info!("{}", "Data blob verified successfully!");
889 Ok(())
890 }
891}
892
893#[cfg(not(web))]
894impl<Env: Environment, W> ClientContext<Env, W>
895where
896 W: Persist<Target = Wallet>,
897{
898 pub async fn prepare_for_benchmark(
899 &mut self,
900 num_chains: usize,
901 tokens_per_chain: Amount,
902 fungible_application_id: Option<ApplicationId>,
903 pub_keys: Vec<AccountPublicKey>,
904 chains_config_path: Option<&Path>,
905 ) -> Result<(Vec<ChainClient<Env>>, Vec<ChainId>), Error> {
906 let start = Instant::now();
907 self.process_inboxes_and_force_validator_updates().await;
911 info!(
912 "Processed inboxes and forced validator updates in {} ms",
913 start.elapsed().as_millis()
914 );
915
916 let start = Instant::now();
917 let (benchmark_chains, chain_clients) = self
918 .make_benchmark_chains(
919 num_chains,
920 tokens_per_chain,
921 pub_keys,
922 chains_config_path.is_some(),
923 )
924 .await?;
925 info!(
926 "Got {} chains in {} ms",
927 num_chains,
928 start.elapsed().as_millis()
929 );
930
931 if let Some(id) = fungible_application_id {
932 let start = Instant::now();
933 self.supply_fungible_tokens(&benchmark_chains, id).await?;
934 info!(
935 "Supplied fungible tokens in {} ms",
936 start.elapsed().as_millis()
937 );
938 let start = Instant::now();
940 for chain_client in &chain_clients {
941 chain_client.process_inbox().await?;
942 }
943 info!(
944 "Processed inboxes after supplying fungible tokens in {} ms",
945 start.elapsed().as_millis()
946 );
947 }
948
949 let all_chains = Benchmark::<Env>::get_all_chains(chains_config_path, &benchmark_chains)?;
950 let known_chain_ids: HashSet<_> = benchmark_chains.iter().map(|(id, _)| *id).collect();
951 let unknown_chain_ids: Vec<_> = all_chains
952 .iter()
953 .filter(|id| !known_chain_ids.contains(id))
954 .copied()
955 .collect();
956 if !unknown_chain_ids.is_empty() {
957 for chain_id in &unknown_chain_ids {
961 self.client.get_chain_description(*chain_id).await?;
962 }
963 }
964
965 Ok((chain_clients, all_chains))
966 }
967
968 pub async fn wrap_up_benchmark(
969 &mut self,
970 chain_clients: Vec<ChainClient<Env>>,
971 close_chains: bool,
972 wrap_up_max_in_flight: usize,
973 ) -> Result<(), Error> {
974 if close_chains {
975 info!("Closing chains...");
976 let stream = stream::iter(chain_clients)
977 .map(|chain_client| async move {
978 Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
979 info!("Closed chain {:?}", chain_client.chain_id());
980 Ok::<(), BenchmarkError>(())
981 })
982 .buffer_unordered(wrap_up_max_in_flight);
983 stream.try_collect::<Vec<_>>().await?;
984 } else {
985 info!("Processing inbox for all chains...");
986 let stream = stream::iter(chain_clients.clone())
987 .map(|chain_client| async move {
988 chain_client.process_inbox().await?;
989 info!("Processed inbox for chain {:?}", chain_client.chain_id());
990 Ok::<(), chain_client::Error>(())
991 })
992 .buffer_unordered(wrap_up_max_in_flight);
993 stream.try_collect::<Vec<_>>().await?;
994
995 info!("Updating wallet from chain clients...");
996 for chain_client in chain_clients {
997 let info = chain_client.chain_info().await?;
998 let client_owner = chain_client.preferred_owner();
999 let pending_proposal = chain_client.pending_proposal().clone();
1000 self.wallet
1001 .as_mut()
1002 .update_from_info(pending_proposal, client_owner, &info);
1003 }
1004 self.save_wallet().await?;
1005 }
1006
1007 Ok(())
1008 }
1009
1010 async fn process_inboxes_and_force_validator_updates(&mut self) {
1011 let mut chain_clients = vec![];
1012 for chain_id in &self.wallet.owned_chain_ids() {
1013 chain_clients.push(self.make_chain_client(*chain_id));
1014 }
1015
1016 let mut join_set = task::JoinSet::new();
1017 for chain_client in chain_clients {
1018 join_set.spawn(async move {
1019 Self::process_inbox_without_updating_wallet(&chain_client)
1020 .await
1021 .expect("Processing inbox should not fail!");
1022 chain_client
1023 });
1024 }
1025
1026 let chain_clients = join_set.join_all().await;
1027 for chain_client in &chain_clients {
1028 self.update_wallet_from_client(chain_client).await.unwrap();
1029 }
1030 }
1031
1032 async fn process_inbox_without_updating_wallet(
1033 chain_client: &ChainClient<Env>,
1034 ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
1035 chain_client.synchronize_from_validators().await?;
1037 let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
1038 assert!(
1039 maybe_timeout.is_none(),
1040 "Should not timeout within benchmark!"
1041 );
1042
1043 Ok(certificates)
1044 }
1045
1046 async fn make_benchmark_chains(
1049 &mut self,
1050 num_chains: usize,
1051 balance: Amount,
1052 pub_keys: Vec<AccountPublicKey>,
1053 wallet_only: bool,
1054 ) -> Result<(Vec<(ChainId, AccountOwner)>, Vec<ChainClient<Env>>), Error> {
1055 let mut chains_found_in_wallet = 0;
1056 let mut benchmark_chains = Vec::with_capacity(num_chains);
1057 let mut chain_clients = Vec::with_capacity(num_chains);
1058 let start = Instant::now();
1059 for chain_id in self.wallet.owned_chain_ids() {
1060 if chains_found_in_wallet == num_chains {
1061 break;
1062 }
1063 let chain_client = self.make_chain_client(chain_id);
1064 let ownership = chain_client.chain_info().await?.manager.ownership;
1065 if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
1066 continue;
1067 }
1068 chain_client.process_inbox().await?;
1069 benchmark_chains.push((
1070 chain_id,
1071 *ownership
1072 .super_owners
1073 .first()
1074 .expect("should have a super owner"),
1075 ));
1076 chain_clients.push(chain_client);
1077 chains_found_in_wallet += 1;
1078 }
1079 info!(
1080 "Got {} chains from the wallet in {} ms",
1081 benchmark_chains.len(),
1082 start.elapsed().as_millis()
1083 );
1084
1085 let num_chains_to_create = num_chains - chains_found_in_wallet;
1086
1087 let default_chain_id = self
1088 .wallet
1089 .default_chain()
1090 .expect("should have default chain");
1091 let default_chain_client = self.make_chain_client(default_chain_id);
1092
1093 if num_chains_to_create > 0 {
1094 if wallet_only {
1095 return Err(
1096 error::Inner::Benchmark(BenchmarkError::NotEnoughChainsInWallet(
1097 num_chains,
1098 chains_found_in_wallet,
1099 ))
1100 .into(),
1101 );
1102 }
1103 let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
1104 let operations_per_block = 900; for i in (0..num_chains_to_create).step_by(operations_per_block) {
1106 let num_new_chains = operations_per_block.min(num_chains_to_create - i);
1107 let pub_key = pub_keys_iter.next().unwrap();
1108 let owner = pub_key.into();
1109
1110 let certificate = Self::execute_open_chains_operations(
1111 num_new_chains,
1112 &default_chain_client,
1113 balance,
1114 owner,
1115 )
1116 .await?;
1117 info!("Block executed successfully");
1118
1119 let block = certificate.block();
1120 for i in 0..num_new_chains {
1121 let chain_id = block.body.blobs[i]
1122 .iter()
1123 .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
1124 .map(|blob| ChainId(blob.id().hash))
1125 .expect("failed to create a new chain");
1126 self.client.track_chain(chain_id);
1127
1128 let mut chain_client = self.client.create_chain_client(
1129 chain_id,
1130 None,
1131 BlockHeight::ZERO,
1132 None,
1133 Some(owner),
1134 self.timing_sender(),
1135 );
1136 chain_client.set_preferred_owner(owner);
1137 chain_client.process_inbox().await?;
1138 benchmark_chains.push((chain_id, owner));
1139 chain_clients.push(chain_client);
1140 }
1141 }
1142
1143 info!(
1144 "Created {} chains in {} ms",
1145 num_chains_to_create,
1146 start.elapsed().as_millis()
1147 );
1148 }
1149
1150 info!("Updating wallet from client");
1151 self.update_wallet_from_client(&default_chain_client)
1152 .await?;
1153 info!("Retrying pending outgoing messages");
1154 default_chain_client
1155 .retry_pending_outgoing_messages()
1156 .await
1157 .context("outgoing messages to create the new chains should be delivered")?;
1158 info!("Processing default chain inbox");
1159 default_chain_client.process_inbox().await?;
1160
1161 assert_eq!(
1162 benchmark_chains.len(),
1163 chain_clients.len(),
1164 "benchmark_chains and chain_clients must have the same size"
1165 );
1166
1167 Ok((benchmark_chains, chain_clients))
1168 }
1169
1170 async fn execute_open_chains_operations(
1171 num_new_chains: usize,
1172 chain_client: &ChainClient<Env>,
1173 balance: Amount,
1174 owner: AccountOwner,
1175 ) -> Result<ConfirmedBlockCertificate, Error> {
1176 let config = OpenChainConfig {
1177 ownership: ChainOwnership::single_super(owner),
1178 balance,
1179 application_permissions: Default::default(),
1180 };
1181 let operations = iter::repeat_n(
1182 Operation::system(SystemOperation::OpenChain(config)),
1183 num_new_chains,
1184 )
1185 .collect();
1186 info!("Executing {} OpenChain operations", num_new_chains);
1187 Ok(chain_client
1188 .execute_operations(operations, vec![])
1189 .await?
1190 .expect("should execute block with OpenChain operations"))
1191 }
1192
1193 async fn supply_fungible_tokens(
1195 &mut self,
1196 key_pairs: &[(ChainId, AccountOwner)],
1197 application_id: ApplicationId,
1198 ) -> Result<(), Error> {
1199 let default_chain_id = self
1200 .wallet
1201 .default_chain()
1202 .expect("should have default chain");
1203 let default_key = self.wallet.get(default_chain_id).unwrap().owner.unwrap();
1204 let amount = Amount::from_nanos(4);
1206 let operations: Vec<Operation> = key_pairs
1207 .iter()
1208 .map(|(chain_id, owner)| {
1209 Benchmark::<Env>::fungible_transfer(
1210 application_id,
1211 *chain_id,
1212 default_key,
1213 *owner,
1214 amount,
1215 )
1216 })
1217 .collect();
1218 let chain_client = self.make_chain_client(default_chain_id);
1219 for operation_chunk in operations.chunks(1000) {
1221 chain_client
1222 .execute_operations(operation_chunk.to_vec(), vec![])
1223 .await?
1224 .expect("should execute block with Transfer operations");
1225 }
1226 self.update_wallet_from_client(&chain_client).await?;
1227
1228 Ok(())
1229 }
1230}