1use std::sync::Arc;
5
6#[cfg(not(web))]
7use futures::StreamExt as _;
8use futures::{Future, TryStreamExt as _};
9use linera_base::{
10 crypto::{CryptoHash, ValidatorPublicKey},
11 data_types::{ChainDescription, Epoch, Timestamp},
12 identifiers::{Account, AccountOwner, ChainId},
13 ownership::ChainOwnership,
14 time::{Duration, Instant},
15 util::future::FutureSyncExt as _,
16};
17use linera_chain::{manager::LockingBlock, types::ConfirmedBlockCertificate};
18use linera_core::{
19 client::{chain_client, ChainClient, Client, ListeningMode},
20 data_types::{ChainInfo, ChainInfoQuery, ClientOutcome},
21 join_set_ext::JoinSet,
22 node::ValidatorNode,
23 wallet, Environment, JoinSetExt as _, Wallet as _,
24};
25use linera_rpc::node_provider::{NodeOptions, NodeProvider};
26use linera_storage::Storage as _;
27use linera_version::VersionInfo;
28use thiserror_context::Context;
29use tracing::{debug, info, warn};
30#[cfg(not(web))]
31use {
32 crate::{
33 benchmark::{fungible_transfer, Benchmark, BenchmarkError},
34 client_metrics::ClientMetrics,
35 },
36 futures::stream,
37 linera_base::{
38 crypto::AccountPublicKey,
39 data_types::{Amount, BlockHeight},
40 identifiers::{ApplicationId, BlobType},
41 },
42 linera_execution::{
43 system::{OpenChainConfig, SystemOperation},
44 Operation,
45 },
46 std::{collections::HashSet, path::Path},
47 tokio::{sync::mpsc, task},
48};
49#[cfg(feature = "fs")]
50use {
51 linera_base::{
52 data_types::{BlobContent, Bytecode},
53 identifiers::ModuleId,
54 vm::VmRuntime,
55 },
56 linera_core::client::create_bytecode_blobs,
57 std::{fs, path::PathBuf},
58};
59
60use crate::{
61 chain_listener::{self, ClientContext as _},
62 client_options::{ChainOwnershipConfig, Options},
63 config::GenesisConfig,
64 error, util, Error,
65};
66
67pub struct ValidatorQueryResults {
69 pub version_info: Result<VersionInfo, Error>,
71 pub genesis_config_hash: Result<CryptoHash, Error>,
73 pub chain_info: Result<ChainInfo, Error>,
75}
76
77impl ValidatorQueryResults {
78 pub fn errors(&self) -> Vec<&Error> {
80 let mut errors = Vec::new();
81 if let Err(e) = &self.version_info {
82 errors.push(e);
83 }
84 if let Err(e) = &self.genesis_config_hash {
85 errors.push(e);
86 }
87 if let Err(e) = &self.chain_info {
88 errors.push(e);
89 }
90 errors
91 }
92
93 pub fn print(
98 &self,
99 public_key: Option<&ValidatorPublicKey>,
100 address: Option<&str>,
101 weight: Option<u64>,
102 reference: Option<&ValidatorQueryResults>,
103 ) {
104 if let Some(key) = public_key {
105 println!("Public key: {key}");
106 }
107 if let Some(address) = address {
108 println!("Address: {address}");
109 }
110 if let Some(w) = weight {
111 println!("Weight: {w}");
112 }
113
114 let ref_version = reference.and_then(|ref_results| ref_results.version_info.as_ref().ok());
115 match &self.version_info {
116 Ok(version_info) => {
117 if ref_version.is_none_or(|ref_v| ref_v.crate_version != version_info.crate_version)
118 {
119 println!("Linera protocol: v{}", version_info.crate_version);
120 }
121 if ref_version.is_none_or(|ref_v| ref_v.rpc_hash != version_info.rpc_hash) {
122 println!("RPC API hash: {}", version_info.rpc_hash);
123 }
124 if ref_version.is_none_or(|ref_v| ref_v.graphql_hash != version_info.graphql_hash) {
125 println!("GraphQL API hash: {}", version_info.graphql_hash);
126 }
127 if ref_version.is_none_or(|ref_v| ref_v.wit_hash != version_info.wit_hash) {
128 println!("WIT API hash: {}", version_info.wit_hash);
129 }
130 if ref_version.is_none_or(|ref_v| {
131 (&ref_v.git_commit, ref_v.git_dirty)
132 != (&version_info.git_commit, version_info.git_dirty)
133 }) {
134 println!(
135 "Source code: {}/tree/{}{}",
136 env!("CARGO_PKG_REPOSITORY"),
137 version_info.git_commit,
138 if version_info.git_dirty {
139 " (dirty)"
140 } else {
141 ""
142 }
143 );
144 }
145 }
146 Err(err) => println!("Error getting version info: {err}"),
147 }
148
149 let ref_genesis_hash =
150 reference.and_then(|ref_results| ref_results.genesis_config_hash.as_ref().ok());
151 match &self.genesis_config_hash {
152 Ok(hash) if ref_genesis_hash.is_some_and(|ref_hash| ref_hash == hash) => {}
153 Ok(hash) => println!("Genesis config hash: {hash}"),
154 Err(err) => println!("Error getting genesis config: {err}"),
155 }
156
157 let ref_info = reference.and_then(|ref_results| ref_results.chain_info.as_ref().ok());
158 match &self.chain_info {
159 Ok(info) => {
160 if ref_info.is_none_or(|ref_info| info.block_hash != ref_info.block_hash) {
161 if let Some(hash) = info.block_hash {
162 println!("Block hash: {hash}");
163 } else {
164 println!("Block hash: None");
165 }
166 }
167 if ref_info
168 .is_none_or(|ref_info| info.next_block_height != ref_info.next_block_height)
169 {
170 println!("Next height: {}", info.next_block_height);
171 }
172 if ref_info.is_none_or(|ref_info| info.timestamp != ref_info.timestamp) {
173 println!("Timestamp: {}", info.timestamp);
174 }
175 if ref_info.is_none_or(|ref_info| info.epoch != ref_info.epoch) {
176 println!("Epoch: {}", info.epoch);
177 }
178 if ref_info.is_none_or(|ref_info| {
179 info.manager.current_round != ref_info.manager.current_round
180 }) {
181 println!("Round: {}", info.manager.current_round);
182 }
183 if let Some(locking) = &info.manager.requested_locking {
184 match &**locking {
185 LockingBlock::Fast(proposal) => {
186 println!(
187 "Locking fast block from {}",
188 proposal.content.block.timestamp
189 );
190 }
191 LockingBlock::Regular(validated) => {
192 println!(
193 "Locking block {} in {} from {}",
194 validated.hash(),
195 validated.round,
196 validated.block().header.timestamp
197 );
198 }
199 }
200 }
201 }
202 Err(err) => println!("Error getting chain info: {err}"),
203 }
204 println!();
205 }
206}
207
208pub struct ClientContext<Env: Environment> {
209 pub client: Arc<Client<Env>>,
210 pub genesis_config: crate::config::GenesisConfig,
212 pub send_timeout: Duration,
213 pub recv_timeout: Duration,
214 pub retry_delay: Duration,
215 pub max_retries: u32,
216 pub max_backoff: Duration,
217 pub chain_listeners: JoinSet,
218 pub default_chain: Option<ChainId>,
220 #[cfg(not(web))]
221 pub client_metrics: Option<ClientMetrics>,
222}
223
224impl<Env: Environment> chain_listener::ClientContext for ClientContext<Env> {
225 type Environment = Env;
226
227 fn wallet(&self) -> &Env::Wallet {
228 self.client.wallet()
229 }
230
231 fn storage(&self) -> &Env::Storage {
232 self.client.storage_client()
233 }
234
235 fn client(&self) -> &Arc<Client<Env>> {
236 &self.client
237 }
238
239 #[cfg(not(web))]
240 fn timing_sender(
241 &self,
242 ) -> Option<mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
243 self.client_metrics
244 .as_ref()
245 .map(|metrics| metrics.timing_sender.clone())
246 }
247
248 async fn update_wallet_for_new_chain(
249 &mut self,
250 chain_id: ChainId,
251 owner: Option<AccountOwner>,
252 timestamp: Timestamp,
253 epoch: Epoch,
254 ) -> Result<(), Error> {
255 self.update_wallet_for_new_chain(chain_id, owner, timestamp, epoch)
256 .make_sync()
257 .await
258 }
259
260 async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
261 self.update_wallet_from_client(client).make_sync().await
262 }
263}
264
265impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si, W>>
266where
267 S: linera_core::environment::Storage,
268 Si: linera_core::environment::Signer,
269 W: linera_core::environment::Wallet,
270{
271 #[expect(clippy::too_many_arguments)]
275 pub async fn new(
276 storage: S,
277 wallet: W,
278 signer: Si,
279 options: &Options,
280 default_chain: Option<ChainId>,
281 genesis_config: GenesisConfig,
282 block_cache_size: usize,
283 execution_state_cache_size: usize,
284 ) -> Result<Self, Error> {
285 #[cfg(not(web))]
286 let timing_config = options.to_timing_config();
287 let node_provider = NodeProvider::new(NodeOptions {
288 send_timeout: options.send_timeout,
289 recv_timeout: options.recv_timeout,
290 retry_delay: options.retry_delay,
291 max_retries: options.max_retries,
292 max_backoff: options.max_backoff,
293 });
294 let chain_modes: Vec<_> = wallet
295 .items()
296 .map_ok(|(id, chain)| {
297 let mode = if chain.is_follow_only() {
298 ListeningMode::FollowChain
299 } else {
300 ListeningMode::FullChain
301 };
302 (id, mode)
303 })
304 .try_collect()
305 .await
306 .map_err(error::Inner::wallet)?;
307 let name = match chain_modes.len() {
308 0 => "Client node".to_string(),
309 1 => format!("Client node for {:.8}", chain_modes[0].0),
310 n => format!(
311 "Client node for {:.8} and {} others",
312 chain_modes[0].0,
313 n - 1
314 ),
315 };
316
317 let client = Client::new(
318 linera_core::environment::Impl {
319 network: node_provider,
320 storage,
321 signer,
322 wallet,
323 },
324 genesis_config.admin_chain_id(),
325 options.long_lived_services,
326 chain_modes,
327 name,
328 util::non_zero_duration(options.chain_worker_ttl),
329 util::non_zero_duration(options.sender_chain_worker_ttl),
330 options.cross_chain_batch_size_limit,
331 options.to_chain_client_options(),
332 block_cache_size,
333 execution_state_cache_size,
334 &options.to_requests_scheduler_config(),
335 );
336
337 #[cfg(not(web))]
338 let client_metrics = if timing_config.enabled {
339 Some(ClientMetrics::new(timing_config))
340 } else {
341 None
342 };
343
344 Ok(ClientContext {
345 client: Arc::new(client),
346 default_chain,
347 genesis_config,
348 send_timeout: options.send_timeout,
349 recv_timeout: options.recv_timeout,
350 retry_delay: options.retry_delay,
351 max_retries: options.max_retries,
352 max_backoff: options.max_backoff,
353 chain_listeners: JoinSet::default(),
354 #[cfg(not(web))]
355 client_metrics,
356 })
357 }
358}
359
360impl<Env: Environment> ClientContext<Env> {
361 pub fn wallet(&self) -> &Env::Wallet {
365 self.client.wallet()
366 }
367
368 pub fn admin_chain_id(&self) -> ChainId {
370 self.client.admin_chain_id()
371 }
372
373 pub fn default_account(&self) -> Account {
376 Account::chain(self.default_chain())
377 }
378
379 pub fn default_chain(&self) -> ChainId {
381 self.default_chain
382 .expect("default chain requested but none set")
383 }
384
385 pub async fn first_non_admin_chain(&self) -> Result<ChainId, Error> {
386 let admin_chain_id = self.admin_chain_id();
387 let chain_ids = self
388 .wallet()
389 .chain_ids()
390 .try_filter(|chain_id| futures::future::ready(*chain_id != admin_chain_id))
391 .try_collect::<Vec<ChainId>>()
392 .await
393 .map_err(Error::wallet)?;
394 Ok(chain_ids
395 .into_iter()
396 .min()
397 .expect("No non-admin chain specified in wallet with no non-admin chain"))
398 }
399
400 pub fn make_node_provider(&self) -> NodeProvider {
402 NodeProvider::new(self.make_node_options())
403 }
404
405 fn make_node_options(&self) -> NodeOptions {
406 NodeOptions {
407 send_timeout: self.send_timeout,
408 recv_timeout: self.recv_timeout,
409 retry_delay: self.retry_delay,
410 max_retries: self.max_retries,
411 max_backoff: self.max_backoff,
412 }
413 }
414
415 #[cfg(not(web))]
416 pub fn client_metrics(&self) -> Option<&ClientMetrics> {
417 self.client_metrics.as_ref()
418 }
419
420 pub async fn update_wallet_from_client<Env_: Environment>(
421 &self,
422 client: &ChainClient<Env_>,
423 ) -> Result<(), Error> {
424 let info = client.chain_info().await?;
425 let chain_id = info.chain_id;
426 let existing_owner = self
427 .wallet()
428 .get(chain_id)
429 .await
430 .map_err(error::Inner::wallet)?
431 .and_then(|chain| chain.owner);
432
433 let pending_fast_proposal = client
436 .pending_proposal()
437 .await
438 .filter(|p| p.round.is_some_and(|r| r.is_fast()));
439 let new_chain = wallet::Chain {
440 pending_fast_proposal,
441 owner: existing_owner,
442 ..info.as_ref().into()
443 };
444
445 self.wallet()
446 .insert(chain_id, new_chain)
447 .await
448 .map_err(error::Inner::wallet)?;
449
450 Ok(())
451 }
452
453 pub async fn update_wallet_for_new_chain(
455 &mut self,
456 chain_id: ChainId,
457 owner: Option<AccountOwner>,
458 timestamp: Timestamp,
459 epoch: Epoch,
460 ) -> Result<(), Error> {
461 self.wallet()
462 .try_insert(
463 chain_id,
464 linera_core::wallet::Chain::new(owner, epoch, timestamp),
465 )
466 .await
467 .map_err(error::Inner::wallet)?;
468 Ok(())
469 }
470
471 pub async fn extend_with_chain(
474 &mut self,
475 description: ChainDescription,
476 owner: Option<AccountOwner>,
477 ) -> Result<(), Error> {
478 let chain_id = description.id();
479 self.client
480 .storage_client()
481 .create_chain(description.clone())
482 .await?;
483 self.wallet()
484 .try_insert(
485 chain_id,
486 linera_core::wallet::Chain::new(
487 owner,
488 description.config().epoch,
489 description.timestamp(),
490 ),
491 )
492 .await
493 .map_err(error::Inner::wallet)?;
494 self.client
495 .extend_chain_mode(chain_id, ListeningMode::FullChain);
496 Ok(())
497 }
498
499 pub async fn process_inbox(
500 &mut self,
501 chain_client: &ChainClient<Env>,
502 ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
503 let mut certificates = Vec::new();
504 let (new_certificates, maybe_timeout) = {
506 chain_client.synchronize_from_validators().await?;
507 let result = chain_client.process_inbox_without_prepare().await;
508 self.update_wallet_from_client(chain_client).await?;
509 result?
510 };
511 certificates.extend(new_certificates);
512 if maybe_timeout.is_none() {
513 return Ok(certificates);
514 }
515
516 let (listener, _listen_handle, mut notification_stream) = chain_client.listen().await?;
518 self.chain_listeners.spawn_task(listener);
519
520 loop {
521 let (new_certificates, maybe_timeout) = {
522 let result = chain_client.process_inbox().await;
523 self.update_wallet_from_client(chain_client).await?;
524 result?
525 };
526 certificates.extend(new_certificates);
527 if let Some(timestamp) = maybe_timeout {
528 util::wait_for_next_round(&mut notification_stream, timestamp).await
529 } else {
530 return Ok(certificates);
531 }
532 }
533 }
534
535 pub async fn assign_new_chain_to_key(
536 &mut self,
537 chain_id: ChainId,
538 owner: AccountOwner,
539 ) -> Result<(), Error> {
540 self.client
541 .extend_chain_mode(chain_id, ListeningMode::FullChain);
542 let client = self.make_chain_client(chain_id).await?;
543 let info = client.prepare_for_owner(owner).await.map_err(|error| {
544 tracing::error!(%chain_id, %owner, %error, "Chain is not owned");
545 error::Inner::ChainOwnership
546 })?;
547
548 let modified = self
550 .wallet()
551 .modify(chain_id, |chain| chain.owner = Some(owner))
552 .await
553 .map_err(error::Inner::wallet)?;
554 if modified.is_none() {
556 self.wallet()
557 .insert(
558 chain_id,
559 wallet::Chain {
560 owner: Some(owner),
561 timestamp: info.timestamp,
562 epoch: Some(info.epoch),
563 ..Default::default()
564 },
565 )
566 .await
567 .map_err(error::Inner::wallet)
568 .context("assigning new chain")?;
569 }
570 Ok(())
571 }
572
573 pub async fn apply_client_command<E, F, Fut, T>(
578 &mut self,
579 client: &ChainClient<Env>,
580 mut f: F,
581 ) -> Result<T, Error>
582 where
583 F: FnMut(&ChainClient<Env>) -> Fut,
584 Fut: Future<Output = Result<ClientOutcome<T>, E>>,
585 Error: From<E>,
586 {
587 client.prepare_chain().await?;
588 let result = f(client).await;
590 self.update_wallet_from_client(client).await?;
591 match result? {
592 ClientOutcome::Committed(t) => return Ok(t),
593 ClientOutcome::Conflict(certificate) => {
594 return Err(chain_client::Error::Conflict(certificate.hash()).into());
595 }
596 ClientOutcome::WaitForTimeout(_) => {}
597 }
598
599 let (listener, _listen_handle, mut notification_stream) = client.listen().await?;
601 self.chain_listeners.spawn_task(listener);
602
603 loop {
604 let result = f(client).await;
606 self.update_wallet_from_client(client).await?;
607 let timeout = match result? {
608 ClientOutcome::Committed(t) => return Ok(t),
609 ClientOutcome::Conflict(certificate) => {
610 return Err(chain_client::Error::Conflict(certificate.hash()).into());
611 }
612 ClientOutcome::WaitForTimeout(timeout) => timeout,
613 };
614 util::wait_for_next_round(&mut notification_stream, timeout).await;
616 }
617 }
618
619 pub async fn ownership(&mut self, chain_id: Option<ChainId>) -> Result<ChainOwnership, Error> {
620 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
621 let client = self.make_chain_client(chain_id).await?;
622 let info = client.chain_info().await?;
623 Ok(info.manager.ownership)
624 }
625
626 pub async fn change_ownership(
627 &mut self,
628 chain_id: Option<ChainId>,
629 ownership_config: ChainOwnershipConfig,
630 ) -> Result<(), Error> {
631 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
632 let chain_client = self.make_chain_client(chain_id).await?;
633 info!(
634 ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
635 "Changing ownership of a chain"
636 );
637 let time_start = Instant::now();
638 let mut ownership = chain_client.query_chain_ownership().await?;
639 ownership_config.update(&mut ownership)?;
640
641 if ownership.super_owners.is_empty() && ownership.owners.is_empty() {
642 tracing::error!("At least one owner or super owner of the chain has to be set.");
643 return Err(error::Inner::ChainOwnership.into());
644 }
645
646 let certificate = self
647 .apply_client_command(&chain_client, |chain_client| {
648 let ownership = ownership.clone();
649 let chain_client = chain_client.clone();
650 async move {
651 chain_client
652 .change_ownership(ownership)
653 .await
654 .map_err(Error::from)
655 .context("Failed to change ownership")
656 }
657 })
658 .await?;
659 let time_total = time_start.elapsed();
660 info!("Operation confirmed after {} ms", time_total.as_millis());
661 debug!("{:?}", certificate);
662 Ok(())
663 }
664
665 pub async fn set_preferred_owner(
666 &mut self,
667 chain_id: Option<ChainId>,
668 preferred_owner: AccountOwner,
669 ) -> Result<(), Error> {
670 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
671 let mut chain_client = self.make_chain_client(chain_id).await?;
672 let old_owner = chain_client.preferred_owner();
673 info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
674 chain_client.set_preferred_owner(preferred_owner);
675 self.update_wallet_from_client(&chain_client).await?;
676 info!("New preferred owner set");
677 Ok(())
678 }
679
680 pub async fn check_compatible_version_info(
681 &self,
682 address: &str,
683 node: &impl ValidatorNode,
684 ) -> Result<VersionInfo, Error> {
685 match node.get_version_info().await {
686 Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
687 debug!(
688 "Version information for validator {address}: {}",
689 version_info
690 );
691 Ok(version_info)
692 }
693 Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
694 remote: Box::new(version_info),
695 local: Box::new(linera_version::VERSION_INFO.clone()),
696 }
697 .into()),
698 Err(error) => Err(error::Inner::UnavailableVersionInfo {
699 address: address.to_string(),
700 error: Box::new(error),
701 }
702 .into()),
703 }
704 }
705
706 pub async fn check_matching_network_description(
707 &self,
708 address: &str,
709 node: &impl ValidatorNode,
710 ) -> Result<CryptoHash, Error> {
711 let network_description = self.genesis_config.network_description();
712 match node.get_network_description().await {
713 Ok(description) => {
714 if description == network_description {
715 Ok(description.genesis_config_hash)
716 } else {
717 Err(error::Inner::UnexpectedNetworkDescription {
718 remote: Box::new(description),
719 local: Box::new(network_description),
720 }
721 .into())
722 }
723 }
724 Err(error) => Err(error::Inner::UnavailableNetworkDescription {
725 address: address.to_string(),
726 error: Box::new(error),
727 }
728 .into()),
729 }
730 }
731
732 pub async fn check_validator_chain_info_response(
733 &self,
734 public_key: Option<&ValidatorPublicKey>,
735 address: &str,
736 node: &impl ValidatorNode,
737 chain_id: ChainId,
738 ) -> Result<ChainInfo, Error> {
739 let query = ChainInfoQuery::new(chain_id).with_manager_values();
740 match node.handle_chain_info_query(query).await {
741 Ok(response) => {
742 debug!(
743 "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
744 response.info.next_block_height, response.info.epoch,
745 );
746 if let Some(public_key) = public_key {
747 if response.check(*public_key).is_ok() {
748 debug!("Signature for public key {public_key} is OK.");
749 } else {
750 return Err(error::Inner::InvalidSignature {
751 public_key: *public_key,
752 }
753 .into());
754 }
755 } else {
756 warn!("Not checking signature as public key was not given");
757 }
758 Ok(*response.info)
759 }
760 Err(error) => Err(error::Inner::UnavailableChainInfo {
761 address: address.to_string(),
762 chain_id,
763 error: Box::new(error),
764 }
765 .into()),
766 }
767 }
768
769 pub async fn query_validator(
773 &self,
774 address: &str,
775 node: &impl ValidatorNode,
776 chain_id: ChainId,
777 public_key: Option<&ValidatorPublicKey>,
778 ) -> ValidatorQueryResults {
779 let version_info = self.check_compatible_version_info(address, node).await;
780 let genesis_config_hash = self.check_matching_network_description(address, node).await;
781 let chain_info = self
782 .check_validator_chain_info_response(public_key, address, node, chain_id)
783 .await;
784
785 ValidatorQueryResults {
786 version_info,
787 genesis_config_hash,
788 chain_info,
789 }
790 }
791
792 pub async fn query_local_node(
796 &self,
797 chain_id: ChainId,
798 ) -> Result<ValidatorQueryResults, Error> {
799 let version_info = Ok(linera_version::VERSION_INFO.clone());
800 let genesis_config_hash = Ok(self
801 .genesis_config
802 .network_description()
803 .genesis_config_hash);
804 let chain_info = self
805 .make_chain_client(chain_id)
806 .await?
807 .chain_info_with_manager_values()
808 .await
809 .map(|info| *info)
810 .map_err(|e| e.into());
811
812 Ok(ValidatorQueryResults {
813 version_info,
814 genesis_config_hash,
815 chain_info,
816 })
817 }
818}
819
820#[cfg(feature = "fs")]
821impl<Env: Environment> ClientContext<Env> {
822 pub async fn publish_module(
823 &mut self,
824 chain_client: &ChainClient<Env>,
825 contract: PathBuf,
826 service: PathBuf,
827 vm_runtime: VmRuntime,
828 ) -> Result<ModuleId, Error> {
829 info!("Loading bytecode files");
830 let contract_bytecode = Bytecode::load_from_file(&contract).await.map_err(|e| {
831 std::io::Error::new(
832 e.kind(),
833 format!("failed to load contract bytecode from {contract:?}: {e}"),
834 )
835 })?;
836 let service_bytecode = Bytecode::load_from_file(&service).await.map_err(|e| {
837 std::io::Error::new(
838 e.kind(),
839 format!("failed to load service bytecode from {service:?}: {e}"),
840 )
841 })?;
842
843 info!("Publishing module");
844 let (blobs, module_id) =
845 create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
846 let (module_id, _) = self
847 .apply_client_command(chain_client, |chain_client| {
848 let blobs = blobs.clone();
849 let chain_client = chain_client.clone();
850 async move {
851 chain_client
852 .publish_module_blobs(blobs, module_id)
853 .await
854 .context("Failed to publish module")
855 }
856 })
857 .await?;
858
859 info!("{}", "Module published successfully!");
860
861 info!("Synchronizing client and processing inbox");
862 self.process_inbox(chain_client).await?;
863 Ok(module_id)
864 }
865
866 pub async fn publish_data_blob(
867 &mut self,
868 chain_client: &ChainClient<Env>,
869 blob_path: PathBuf,
870 ) -> Result<CryptoHash, Error> {
871 info!("Loading data blob file");
872 let blob_bytes = fs::read(&blob_path).map_err(|e| {
873 std::io::Error::new(
874 e.kind(),
875 format!("failed to load data blob bytes from {blob_path:?}: {e}"),
876 )
877 })?;
878
879 info!("Publishing data blob");
880 self.apply_client_command(chain_client, |chain_client| {
881 let blob_bytes = blob_bytes.clone();
882 let chain_client = chain_client.clone();
883 async move {
884 chain_client
885 .publish_data_blob(blob_bytes)
886 .await
887 .context("Failed to publish data blob")
888 }
889 })
890 .await?;
891
892 info!("{}", "Data blob published successfully!");
893 Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
894 }
895
896 pub async fn read_data_blob(
898 &mut self,
899 chain_client: &ChainClient<Env>,
900 hash: CryptoHash,
901 ) -> Result<(), Error> {
902 info!("Verifying data blob");
903 self.apply_client_command(chain_client, |chain_client| {
904 let chain_client = chain_client.clone();
905 async move {
906 chain_client
907 .read_data_blob(hash)
908 .await
909 .context("Failed to verify data blob")
910 }
911 })
912 .await?;
913
914 info!("{}", "Data blob verified successfully!");
915 Ok(())
916 }
917}
918
919#[cfg(not(web))]
920impl<Env: Environment> ClientContext<Env> {
921 pub async fn prepare_for_benchmark(
922 &mut self,
923 num_chains: usize,
924 tokens_per_chain: Amount,
925 fungible_application_id: Option<ApplicationId>,
926 pub_keys: Vec<AccountPublicKey>,
927 chains_config_path: Option<&Path>,
928 close_chains: bool,
929 ) -> Result<Vec<ChainClient<Env>>, Error> {
930 let start = Instant::now();
931 self.process_inboxes_and_force_validator_updates().await;
935 info!(
936 "Processed inboxes and forced validator updates in {} ms",
937 start.elapsed().as_millis()
938 );
939
940 let start = Instant::now();
941 let (benchmark_chains, chain_clients) = self
942 .make_benchmark_chains(
943 num_chains,
944 tokens_per_chain,
945 pub_keys,
946 chains_config_path.is_some(),
947 close_chains,
948 )
949 .await?;
950 info!(
951 "Got {} chains in {} ms",
952 num_chains,
953 start.elapsed().as_millis()
954 );
955
956 if let Some(id) = fungible_application_id {
957 let start = Instant::now();
958 self.supply_fungible_tokens(&benchmark_chains, id).await?;
959 info!(
960 "Supplied fungible tokens in {} ms",
961 start.elapsed().as_millis()
962 );
963 let start = Instant::now();
965 for chain_client in &chain_clients {
966 chain_client.process_inbox().await?;
967 }
968 info!(
969 "Processed inboxes after supplying fungible tokens in {} ms",
970 start.elapsed().as_millis()
971 );
972 }
973
974 let all_chains = Benchmark::<Env>::get_all_chains(chains_config_path, &benchmark_chains)?;
975 let known_chain_ids: HashSet<_> = benchmark_chains.iter().map(|(id, _)| *id).collect();
976 let unknown_chain_ids: Vec<_> = all_chains
977 .iter()
978 .filter(|id| !known_chain_ids.contains(id))
979 .copied()
980 .collect();
981 if !unknown_chain_ids.is_empty() {
982 for chain_id in &unknown_chain_ids {
986 self.client.get_chain_description(*chain_id).await?;
987 }
988 }
989
990 Ok(chain_clients)
991 }
992
993 pub async fn wrap_up_benchmark(
994 &mut self,
995 chain_clients: Vec<ChainClient<Env>>,
996 close_chains: bool,
997 wrap_up_max_in_flight: usize,
998 ) -> Result<(), Error> {
999 if close_chains {
1000 info!("Closing chains...");
1001 let stream = stream::iter(chain_clients)
1002 .map(|chain_client| async move {
1003 Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
1004 info!("Closed chain {:?}", chain_client.chain_id());
1005 Ok::<(), BenchmarkError>(())
1006 })
1007 .buffer_unordered(wrap_up_max_in_flight);
1008 stream.try_collect::<Vec<_>>().await?;
1009 } else {
1010 info!("Processing inbox for all chains...");
1011 let stream = stream::iter(chain_clients.clone())
1012 .map(|chain_client| async move {
1013 chain_client.process_inbox().await?;
1014 info!("Processed inbox for chain {:?}", chain_client.chain_id());
1015 Ok::<(), chain_client::Error>(())
1016 })
1017 .buffer_unordered(wrap_up_max_in_flight);
1018 stream.try_collect::<Vec<_>>().await?;
1019
1020 info!("Updating wallet from chain clients...");
1021 for chain_client in chain_clients {
1022 let info = chain_client.chain_info().await?;
1023 let client_owner = chain_client.preferred_owner();
1024 let pending_fast_proposal = chain_client
1025 .pending_proposal()
1026 .await
1027 .filter(|p| p.round.is_some_and(|r| r.is_fast()));
1028 self.wallet()
1029 .insert(
1030 info.chain_id,
1031 wallet::Chain {
1032 pending_fast_proposal,
1033 owner: client_owner,
1034 ..info.as_ref().into()
1035 },
1036 )
1037 .await
1038 .map_err(error::Inner::wallet)?;
1039 }
1040 }
1041
1042 Ok(())
1043 }
1044
1045 async fn process_inboxes_and_force_validator_updates(&mut self) {
1046 let mut join_set = task::JoinSet::new();
1047
1048 let chain_clients: Vec<_> = self
1049 .wallet()
1050 .owned_chain_ids()
1051 .map_err(|e| error::Inner::wallet(e).into())
1052 .and_then(|id| self.make_chain_client(id))
1053 .try_collect()
1054 .await
1055 .unwrap();
1056
1057 for chain_client in chain_clients {
1058 join_set.spawn(async move {
1059 Self::process_inbox_without_updating_wallet(&chain_client)
1060 .await
1061 .expect("Processing inbox should not fail!");
1062 chain_client
1063 });
1064 }
1065
1066 for chain_client in join_set.join_all().await {
1067 self.update_wallet_from_client(&chain_client).await.unwrap();
1068 }
1069 }
1070
1071 async fn process_inbox_without_updating_wallet(
1072 chain_client: &ChainClient<Env>,
1073 ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
1074 chain_client.synchronize_from_validators().await?;
1076 let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
1077 assert!(
1078 maybe_timeout.is_none(),
1079 "Should not timeout within benchmark!"
1080 );
1081
1082 Ok(certificates)
1083 }
1084
1085 async fn make_benchmark_chains(
1091 &mut self,
1092 num_chains: usize,
1093 balance: Amount,
1094 pub_keys: Vec<AccountPublicKey>,
1095 wallet_only: bool,
1096 close_chains: bool,
1097 ) -> Result<(Vec<(ChainId, AccountOwner)>, Vec<ChainClient<Env>>), Error> {
1098 let mut chains_found_in_wallet = 0;
1099 let mut benchmark_chains = Vec::with_capacity(num_chains);
1100 let mut chain_clients = Vec::with_capacity(num_chains);
1101 let start = Instant::now();
1102
1103 if !close_chains || wallet_only {
1108 let mut owned_chain_ids = std::pin::pin!(self.wallet().owned_chain_ids());
1109 while let Some(chain_id) = owned_chain_ids.next().await {
1110 let chain_id = chain_id.map_err(error::Inner::wallet)?;
1111 if chains_found_in_wallet == num_chains {
1112 break;
1113 }
1114 let chain_client = self.make_chain_client(chain_id).await?;
1115 let ownership = chain_client.chain_info().await?.manager.ownership;
1116 if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
1117 continue;
1118 }
1119 let owner = *ownership.super_owners.first().unwrap();
1120 chain_client.process_inbox().await?;
1121 benchmark_chains.push((chain_id, owner));
1122 chain_clients.push(chain_client);
1123 chains_found_in_wallet += 1;
1124 }
1125 info!(
1126 "Got {} chains from the wallet in {} ms",
1127 benchmark_chains.len(),
1128 start.elapsed().as_millis()
1129 );
1130 }
1131
1132 let num_chains_to_create = num_chains - chains_found_in_wallet;
1133
1134 let default_chain_client = self.make_chain_client(self.default_chain()).await?;
1135
1136 if num_chains_to_create > 0 {
1137 if wallet_only {
1138 return Err(
1139 error::Inner::Benchmark(BenchmarkError::NotEnoughChainsInWallet(
1140 num_chains,
1141 chains_found_in_wallet,
1142 ))
1143 .into(),
1144 );
1145 }
1146 let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
1147 let operations_per_block = 900; for i in (0..num_chains_to_create).step_by(operations_per_block) {
1149 let num_new_chains = operations_per_block.min(num_chains_to_create - i);
1150 let owners: Vec<AccountOwner> = (&mut pub_keys_iter)
1153 .take(num_new_chains)
1154 .map(|pk| pk.into())
1155 .collect();
1156
1157 let certificate = Self::execute_open_chains_operations(
1158 &default_chain_client,
1159 balance,
1160 owners.clone(),
1161 )
1162 .await?;
1163 info!("Block executed successfully");
1164
1165 let block = certificate.block();
1166 for (i, owner) in owners.into_iter().enumerate() {
1167 let chain_id = block.body.blobs[i]
1168 .iter()
1169 .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
1170 .map(|blob| ChainId(blob.id().hash))
1171 .expect("failed to create a new chain");
1172 self.client
1173 .extend_chain_mode(chain_id, ListeningMode::FullChain);
1174
1175 let mut chain_client = self.client.create_chain_client(
1176 chain_id,
1177 None,
1178 BlockHeight::ZERO,
1179 &None,
1180 Some(owner),
1181 self.timing_sender(),
1182 false,
1183 );
1184 chain_client.set_preferred_owner(owner);
1185 chain_client.process_inbox().await?;
1186 benchmark_chains.push((chain_id, owner));
1187 chain_clients.push(chain_client);
1188 }
1189 }
1190
1191 info!(
1192 "Created {} chains in {} ms",
1193 num_chains_to_create,
1194 start.elapsed().as_millis()
1195 );
1196 }
1197
1198 if !close_chains {
1200 info!("Updating wallet from client");
1201 self.update_wallet_from_client(&default_chain_client)
1202 .await?;
1203 }
1204 info!("Retrying pending outgoing messages");
1205 default_chain_client
1206 .retry_pending_outgoing_messages()
1207 .await
1208 .context("outgoing messages to create the new chains should be delivered")?;
1209 info!("Processing default chain inbox");
1210 default_chain_client.process_inbox().await?;
1211
1212 assert_eq!(
1213 benchmark_chains.len(),
1214 chain_clients.len(),
1215 "benchmark_chains and chain_clients must have the same size"
1216 );
1217
1218 Ok((benchmark_chains, chain_clients))
1219 }
1220
1221 async fn execute_open_chains_operations(
1222 chain_client: &ChainClient<Env>,
1223 balance: Amount,
1224 owners: Vec<AccountOwner>,
1225 ) -> Result<ConfirmedBlockCertificate, Error> {
1226 let operations: Vec<_> = owners
1227 .iter()
1228 .map(|owner| {
1229 let config = OpenChainConfig {
1230 ownership: ChainOwnership::single_super(*owner),
1231 balance,
1232 application_permissions: Default::default(),
1233 };
1234 Operation::system(SystemOperation::OpenChain(config))
1235 })
1236 .collect();
1237 info!("Executing {} OpenChain operations", operations.len());
1238 Ok(chain_client
1239 .execute_operations(operations, vec![])
1240 .await?
1241 .expect("should execute block with OpenChain operations"))
1242 }
1243
1244 async fn supply_fungible_tokens(
1246 &mut self,
1247 key_pairs: &[(ChainId, AccountOwner)],
1248 application_id: ApplicationId,
1249 ) -> Result<(), Error> {
1250 let default_chain_id = self.default_chain();
1251 let default_key = self
1252 .wallet()
1253 .get(default_chain_id)
1254 .await
1255 .unwrap()
1256 .unwrap()
1257 .owner
1258 .unwrap();
1259 let amount = Amount::from_nanos(4);
1261 let operations: Vec<Operation> = key_pairs
1262 .iter()
1263 .map(|(chain_id, owner)| {
1264 fungible_transfer(application_id, *chain_id, default_key, *owner, amount)
1265 })
1266 .collect();
1267 let chain_client = self.make_chain_client(default_chain_id).await?;
1268 for operation_chunk in operations.chunks(1000) {
1270 chain_client
1271 .execute_operations(operation_chunk.to_vec(), vec![])
1272 .await?
1273 .expect("should execute block with Transfer operations");
1274 }
1275 self.update_wallet_from_client(&chain_client).await?;
1276
1277 Ok(())
1278 }
1279}