1use std::sync::Arc;
5
6#[cfg(not(web))]
7use futures::StreamExt as _;
8use futures::{Future, TryStreamExt as _};
9use linera_base::{
10 crypto::{CryptoHash, ValidatorPublicKey},
11 data_types::{ChainDescription, Epoch, Timestamp},
12 identifiers::{Account, AccountOwner, ChainId},
13 ownership::ChainOwnership,
14 time::{Duration, Instant},
15 util::future::FutureSyncExt as _,
16};
17use linera_chain::{manager::LockingBlock, types::ConfirmedBlockCertificate};
18use linera_core::{
19 client::{chain_client, ChainClient, Client, ListeningMode},
20 data_types::{ChainInfo, ChainInfoQuery, ClientOutcome},
21 join_set_ext::JoinSet,
22 node::ValidatorNode,
23 wallet, Environment, JoinSetExt as _, Wallet as _,
24};
25use linera_rpc::node_provider::{NodeOptions, NodeProvider};
26use linera_storage::Storage as _;
27use linera_version::VersionInfo;
28use thiserror_context::Context;
29use tracing::{debug, info, warn};
30#[cfg(not(web))]
31use {
32 crate::{
33 benchmark::{fungible_transfer, Benchmark, BenchmarkError},
34 client_metrics::ClientMetrics,
35 },
36 futures::stream,
37 linera_base::{
38 crypto::AccountPublicKey,
39 data_types::{Amount, BlockHeight},
40 identifiers::{ApplicationId, BlobType},
41 },
42 linera_execution::{
43 system::{OpenChainConfig, SystemOperation},
44 Operation,
45 },
46 std::{collections::HashSet, path::Path},
47 tokio::{sync::mpsc, task},
48};
49#[cfg(feature = "fs")]
50use {
51 linera_base::{
52 data_types::{BlobContent, Bytecode},
53 identifiers::ModuleId,
54 vm::VmRuntime,
55 },
56 linera_core::client::create_bytecode_blobs,
57 std::{fs, path::PathBuf},
58};
59
60use crate::{
61 chain_listener::{self, ClientContext as _, ClientContextExt as _},
62 client_options::{ChainOwnershipConfig, Options},
63 config::GenesisConfig,
64 error, util, Error,
65};
66
67pub struct ValidatorQueryResults {
69 pub version_info: Result<VersionInfo, Error>,
71 pub genesis_config_hash: Result<CryptoHash, Error>,
73 pub chain_info: Result<ChainInfo, Error>,
75}
76
77impl ValidatorQueryResults {
78 pub fn errors(&self) -> Vec<&Error> {
80 let mut errors = Vec::new();
81 if let Err(e) = &self.version_info {
82 errors.push(e);
83 }
84 if let Err(e) = &self.genesis_config_hash {
85 errors.push(e);
86 }
87 if let Err(e) = &self.chain_info {
88 errors.push(e);
89 }
90 errors
91 }
92
93 pub fn print(
98 &self,
99 public_key: Option<&ValidatorPublicKey>,
100 address: Option<&str>,
101 weight: Option<u64>,
102 reference: Option<&ValidatorQueryResults>,
103 ) {
104 if let Some(key) = public_key {
105 println!("Public key: {key}");
106 }
107 if let Some(address) = address {
108 println!("Address: {address}");
109 }
110 if let Some(w) = weight {
111 println!("Weight: {w}");
112 }
113
114 let ref_version = reference.and_then(|ref_results| ref_results.version_info.as_ref().ok());
115 match &self.version_info {
116 Ok(version_info) => {
117 if ref_version.is_none_or(|ref_v| ref_v.crate_version != version_info.crate_version)
118 {
119 println!("Linera protocol: v{}", version_info.crate_version);
120 }
121 if ref_version.is_none_or(|ref_v| ref_v.rpc_hash != version_info.rpc_hash) {
122 println!("RPC API hash: {}", version_info.rpc_hash);
123 }
124 if ref_version.is_none_or(|ref_v| ref_v.graphql_hash != version_info.graphql_hash) {
125 println!("GraphQL API hash: {}", version_info.graphql_hash);
126 }
127 if ref_version.is_none_or(|ref_v| ref_v.wit_hash != version_info.wit_hash) {
128 println!("WIT API hash: {}", version_info.wit_hash);
129 }
130 if ref_version.is_none_or(|ref_v| {
131 (&ref_v.git_commit, ref_v.git_dirty)
132 != (&version_info.git_commit, version_info.git_dirty)
133 }) {
134 println!(
135 "Source code: {}/tree/{}{}",
136 env!("CARGO_PKG_REPOSITORY"),
137 version_info.git_commit,
138 if version_info.git_dirty {
139 " (dirty)"
140 } else {
141 ""
142 }
143 );
144 }
145 }
146 Err(err) => println!("Error getting version info: {err}"),
147 }
148
149 let ref_genesis_hash =
150 reference.and_then(|ref_results| ref_results.genesis_config_hash.as_ref().ok());
151 match &self.genesis_config_hash {
152 Ok(hash) if ref_genesis_hash.is_some_and(|ref_hash| ref_hash == hash) => {}
153 Ok(hash) => println!("Genesis config hash: {hash}"),
154 Err(err) => println!("Error getting genesis config: {err}"),
155 }
156
157 let ref_info = reference.and_then(|ref_results| ref_results.chain_info.as_ref().ok());
158 match &self.chain_info {
159 Ok(info) => {
160 if ref_info.is_none_or(|ref_info| info.block_hash != ref_info.block_hash) {
161 if let Some(hash) = info.block_hash {
162 println!("Block hash: {hash}");
163 } else {
164 println!("Block hash: None");
165 }
166 }
167 if ref_info
168 .is_none_or(|ref_info| info.next_block_height != ref_info.next_block_height)
169 {
170 println!("Next height: {}", info.next_block_height);
171 }
172 if ref_info.is_none_or(|ref_info| info.timestamp != ref_info.timestamp) {
173 println!("Timestamp: {}", info.timestamp);
174 }
175 if ref_info.is_none_or(|ref_info| info.epoch != ref_info.epoch) {
176 println!("Epoch: {}", info.epoch);
177 }
178 if ref_info.is_none_or(|ref_info| {
179 info.manager.current_round != ref_info.manager.current_round
180 }) {
181 println!("Round: {}", info.manager.current_round);
182 }
183 if let Some(leader) = info.manager.leader {
184 println!("Leader: {leader}");
185 }
186 if let Some(locking) = &info.manager.requested_locking {
187 match &**locking {
188 LockingBlock::Fast(proposal) => {
189 println!(
190 "Locking fast block from {}",
191 proposal.content.block.timestamp
192 );
193 }
194 LockingBlock::Regular(validated) => {
195 println!(
196 "Locking block {} in {} from {}",
197 validated.hash(),
198 validated.round,
199 validated.block().header.timestamp
200 );
201 }
202 }
203 }
204 }
205 Err(err) => println!("Error getting chain info: {err}"),
206 }
207 println!();
208 }
209}
210
211pub struct ClientContext<Env: Environment> {
212 pub client: Arc<Client<Env>>,
213 pub genesis_config: crate::config::GenesisConfig,
215 pub send_timeout: Duration,
216 pub recv_timeout: Duration,
217 pub retry_delay: Duration,
218 pub max_retries: u32,
219 pub max_backoff: Duration,
220 pub chain_listeners: JoinSet,
221 pub default_chain: Option<ChainId>,
223 #[cfg(not(web))]
224 pub client_metrics: Option<ClientMetrics>,
225}
226
227impl<Env: Environment> chain_listener::ClientContext for ClientContext<Env> {
228 type Environment = Env;
229
230 fn wallet(&self) -> &Env::Wallet {
231 self.client.wallet()
232 }
233
234 fn storage(&self) -> &Env::Storage {
235 self.client.storage_client()
236 }
237
238 fn client(&self) -> &Arc<Client<Env>> {
239 &self.client
240 }
241
242 #[cfg(not(web))]
243 fn timing_sender(
244 &self,
245 ) -> Option<mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
246 self.client_metrics
247 .as_ref()
248 .map(|metrics| metrics.timing_sender.clone())
249 }
250
251 async fn update_wallet_for_new_chain(
252 &mut self,
253 chain_id: ChainId,
254 owner: Option<AccountOwner>,
255 timestamp: Timestamp,
256 epoch: Epoch,
257 ) -> Result<(), Error> {
258 self.update_wallet_for_new_chain(chain_id, owner, timestamp, epoch)
259 .make_sync()
260 .await
261 }
262
263 async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
264 self.update_wallet_from_client(client).make_sync().await
265 }
266}
267
268impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si, W>>
269where
270 S: linera_core::environment::Storage,
271 Si: linera_core::environment::Signer,
272 W: linera_core::environment::Wallet,
273{
274 #[expect(clippy::too_many_arguments)]
278 pub async fn new(
279 storage: S,
280 wallet: W,
281 signer: Si,
282 options: &Options,
283 default_chain: Option<ChainId>,
284 genesis_config: GenesisConfig,
285 block_cache_size: usize,
286 execution_state_cache_size: usize,
287 ) -> Result<Self, Error> {
288 #[cfg(not(web))]
289 let timing_config = options.to_timing_config();
290 let node_provider = NodeProvider::new(NodeOptions {
291 send_timeout: options.send_timeout,
292 recv_timeout: options.recv_timeout,
293 retry_delay: options.retry_delay,
294 max_retries: options.max_retries,
295 max_backoff: options.max_backoff,
296 });
297 let chain_modes: Vec<_> = wallet
298 .items()
299 .map_ok(|(id, chain)| {
300 let mode = if chain.is_follow_only() {
301 ListeningMode::FollowChain
302 } else {
303 ListeningMode::FullChain
304 };
305 (id, mode)
306 })
307 .try_collect()
308 .await
309 .map_err(error::Inner::wallet)?;
310 let name = match chain_modes.len() {
311 0 => "Client node".to_string(),
312 1 => format!("Client node for {:.8}", chain_modes[0].0),
313 n => format!(
314 "Client node for {:.8} and {} others",
315 chain_modes[0].0,
316 n - 1
317 ),
318 };
319
320 let client = Client::new(
321 linera_core::environment::Impl {
322 network: node_provider,
323 storage,
324 signer,
325 wallet,
326 },
327 genesis_config.admin_chain_id(),
328 options.long_lived_services,
329 chain_modes,
330 name,
331 util::non_zero_duration(options.chain_worker_ttl),
332 util::non_zero_duration(options.sender_chain_worker_ttl),
333 options.cross_chain_batch_size_limit,
334 options.to_chain_client_options(),
335 block_cache_size,
336 execution_state_cache_size,
337 &options.to_requests_scheduler_config(),
338 );
339
340 #[cfg(not(web))]
341 let client_metrics = if timing_config.enabled {
342 Some(ClientMetrics::new(timing_config))
343 } else {
344 None
345 };
346
347 Ok(ClientContext {
348 client: Arc::new(client),
349 default_chain,
350 genesis_config,
351 send_timeout: options.send_timeout,
352 recv_timeout: options.recv_timeout,
353 retry_delay: options.retry_delay,
354 max_retries: options.max_retries,
355 max_backoff: options.max_backoff,
356 chain_listeners: JoinSet::default(),
357 #[cfg(not(web))]
358 client_metrics,
359 })
360 }
361}
362
363impl<Env: Environment> ClientContext<Env> {
364 pub fn wallet(&self) -> &Env::Wallet {
368 self.client.wallet()
369 }
370
371 pub fn admin_chain_id(&self) -> ChainId {
373 self.client.admin_chain_id()
374 }
375
376 pub fn default_account(&self) -> Account {
379 Account::chain(self.default_chain())
380 }
381
382 pub fn default_chain(&self) -> ChainId {
384 self.default_chain
385 .expect("default chain requested but none set")
386 }
387
388 pub async fn first_non_admin_chain(&self) -> Result<ChainId, Error> {
389 let admin_chain_id = self.admin_chain_id();
390 let chain_ids = self
391 .wallet()
392 .chain_ids()
393 .try_filter(|chain_id| futures::future::ready(*chain_id != admin_chain_id))
394 .try_collect::<Vec<ChainId>>()
395 .await
396 .map_err(Error::wallet)?;
397 Ok(chain_ids
398 .into_iter()
399 .min()
400 .expect("No non-admin chain specified in wallet with no non-admin chain"))
401 }
402
403 pub fn make_node_provider(&self) -> NodeProvider {
405 NodeProvider::new(self.make_node_options())
406 }
407
408 fn make_node_options(&self) -> NodeOptions {
409 NodeOptions {
410 send_timeout: self.send_timeout,
411 recv_timeout: self.recv_timeout,
412 retry_delay: self.retry_delay,
413 max_retries: self.max_retries,
414 max_backoff: self.max_backoff,
415 }
416 }
417
418 #[cfg(not(web))]
419 pub fn client_metrics(&self) -> Option<&ClientMetrics> {
420 self.client_metrics.as_ref()
421 }
422
423 pub async fn update_wallet_from_client<Env_: Environment>(
424 &self,
425 client: &ChainClient<Env_>,
426 ) -> Result<(), Error> {
427 let info = client.chain_info().await?;
428 let chain_id = info.chain_id;
429 let existing_owner = self
430 .wallet()
431 .get(chain_id)
432 .await
433 .map_err(error::Inner::wallet)?
434 .and_then(|chain| chain.owner);
435
436 let pending_fast_proposal = client
439 .pending_proposal()
440 .await
441 .filter(|p| p.round.is_some_and(|r| r.is_fast()));
442 let new_chain = wallet::Chain {
443 pending_fast_proposal,
444 owner: existing_owner,
445 ..info.as_ref().into()
446 };
447
448 self.wallet()
449 .insert(chain_id, new_chain)
450 .await
451 .map_err(error::Inner::wallet)?;
452
453 Ok(())
454 }
455
456 pub async fn update_wallet_for_new_chain(
458 &mut self,
459 chain_id: ChainId,
460 owner: Option<AccountOwner>,
461 timestamp: Timestamp,
462 epoch: Epoch,
463 ) -> Result<(), Error> {
464 self.wallet()
465 .try_insert(
466 chain_id,
467 linera_core::wallet::Chain::new(owner, epoch, timestamp),
468 )
469 .await
470 .map_err(error::Inner::wallet)?;
471 Ok(())
472 }
473
474 pub async fn extend_with_chain(
477 &mut self,
478 description: ChainDescription,
479 owner: Option<AccountOwner>,
480 ) -> Result<(), Error> {
481 let chain_id = description.id();
482 self.client
483 .storage_client()
484 .create_chain(description.clone())
485 .await?;
486 self.wallet()
487 .try_insert(
488 chain_id,
489 linera_core::wallet::Chain::new(
490 owner,
491 description.config().epoch,
492 description.timestamp(),
493 ),
494 )
495 .await
496 .map_err(error::Inner::wallet)?;
497 self.client
498 .extend_chain_mode(chain_id, ListeningMode::FullChain);
499 Ok(())
500 }
501
502 pub async fn process_inbox(
503 &mut self,
504 chain_client: &ChainClient<Env>,
505 ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
506 let mut certificates = Vec::new();
507 let (new_certificates, maybe_timeout) = {
509 chain_client.synchronize_from_validators().await?;
510 let result = chain_client.process_inbox_without_prepare().await;
511 self.update_wallet_from_client(chain_client).await?;
512 result?
513 };
514 certificates.extend(new_certificates);
515 if maybe_timeout.is_none() {
516 return Ok(certificates);
517 }
518
519 let (listener, _listen_handle, mut notification_stream) = chain_client.listen().await?;
521 self.chain_listeners.spawn_task(listener);
522
523 loop {
524 let (new_certificates, maybe_timeout) = {
525 let result = chain_client.process_inbox().await;
526 self.update_wallet_from_client(chain_client).await?;
527 result?
528 };
529 certificates.extend(new_certificates);
530 if let Some(timestamp) = maybe_timeout {
531 util::wait_for_next_round(&mut notification_stream, timestamp).await
532 } else {
533 return Ok(certificates);
534 }
535 }
536 }
537
538 pub async fn assign_new_chain_to_key(
539 &mut self,
540 chain_id: ChainId,
541 owner: AccountOwner,
542 ) -> Result<(), Error> {
543 self.client
544 .extend_chain_mode(chain_id, ListeningMode::FullChain);
545 let client = self.make_chain_client(chain_id).await?;
546 let info = client.prepare_for_owner(owner).await.map_err(|error| {
547 tracing::error!(%chain_id, %owner, %error, "Chain is not owned");
548 error::Inner::ChainOwnership
549 })?;
550
551 let modified = self
553 .wallet()
554 .modify(chain_id, |chain| chain.owner = Some(owner))
555 .await
556 .map_err(error::Inner::wallet)?;
557 if modified.is_none() {
559 self.wallet()
560 .insert(
561 chain_id,
562 wallet::Chain {
563 owner: Some(owner),
564 timestamp: info.timestamp,
565 epoch: Some(info.epoch),
566 ..Default::default()
567 },
568 )
569 .await
570 .map_err(error::Inner::wallet)
571 .context("assigning new chain")?;
572 }
573 Ok(())
574 }
575
576 pub async fn apply_client_command<E, F, Fut, T>(
581 &mut self,
582 client: &ChainClient<Env>,
583 mut f: F,
584 ) -> Result<T, Error>
585 where
586 F: FnMut(&ChainClient<Env>) -> Fut,
587 Fut: Future<Output = Result<ClientOutcome<T>, E>>,
588 Error: From<E>,
589 {
590 client.prepare_chain().await?;
591 let result = f(client).await;
593 self.update_wallet_from_client(client).await?;
594 match result? {
595 ClientOutcome::Committed(t) => return Ok(t),
596 ClientOutcome::Conflict(certificate) => {
597 return Err(chain_client::Error::Conflict(certificate.hash()).into());
598 }
599 ClientOutcome::WaitForTimeout(_) => {}
600 }
601
602 let (listener, _listen_handle, mut notification_stream) = client.listen().await?;
604 self.chain_listeners.spawn_task(listener);
605
606 loop {
607 let result = f(client).await;
609 self.update_wallet_from_client(client).await?;
610 let timeout = match result? {
611 ClientOutcome::Committed(t) => return Ok(t),
612 ClientOutcome::Conflict(certificate) => {
613 return Err(chain_client::Error::Conflict(certificate.hash()).into());
614 }
615 ClientOutcome::WaitForTimeout(timeout) => timeout,
616 };
617 util::wait_for_next_round(&mut notification_stream, timeout).await;
619 }
620 }
621
622 pub async fn ownership(&mut self, chain_id: Option<ChainId>) -> Result<ChainOwnership, Error> {
623 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
624 let client = self.make_chain_client(chain_id).await?;
625 let info = client.chain_info().await?;
626 Ok(info.manager.ownership)
627 }
628
629 pub async fn change_ownership(
630 &mut self,
631 chain_id: Option<ChainId>,
632 ownership_config: ChainOwnershipConfig,
633 ) -> Result<(), Error> {
634 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
635 let mut chain_client = self.make_chain_client(chain_id).await?;
636 info!(
637 ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
638 "Changing ownership of a chain"
639 );
640 let time_start = Instant::now();
641 let mut ownership = chain_client.query_chain_ownership().await?;
642 ownership_config.update(&mut ownership)?;
643
644 if ownership.super_owners.is_empty() && ownership.owners.is_empty() {
645 tracing::error!("At least one owner or super owner of the chain has to be set.");
646 return Err(error::Inner::ChainOwnership.into());
647 }
648
649 let certificate = self
650 .apply_client_command(&chain_client, |chain_client| {
651 let ownership = ownership.clone();
652 let chain_client = chain_client.clone();
653 async move {
654 chain_client
655 .change_ownership(ownership)
656 .await
657 .map_err(Error::from)
658 .context("Failed to change ownership")
659 }
660 })
661 .await?;
662 let time_total = time_start.elapsed();
663 info!("Operation confirmed after {} ms", time_total.as_millis());
664 debug!("{:?}", certificate);
665 self.maybe_auto_assign_preferred_owner(&mut chain_client, &ownership)
666 .await?;
667 Ok(())
668 }
669
670 pub async fn set_preferred_owner(
671 &mut self,
672 chain_id: Option<ChainId>,
673 preferred_owner: AccountOwner,
674 ) -> Result<(), Error> {
675 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
676 let mut chain_client = self.make_chain_client(chain_id).await?;
677 let old_owner = chain_client.preferred_owner();
678 info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
679 chain_client.set_preferred_owner(preferred_owner);
680 self.update_wallet_from_client(&chain_client).await?;
681 info!("New preferred owner set");
682 Ok(())
683 }
684
685 pub async fn check_compatible_version_info(
686 &self,
687 address: &str,
688 node: &impl ValidatorNode,
689 ) -> Result<VersionInfo, Error> {
690 match node.get_version_info().await {
691 Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
692 debug!(
693 "Version information for validator {address}: {}",
694 version_info
695 );
696 Ok(version_info)
697 }
698 Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
699 remote: Box::new(version_info),
700 local: Box::new(linera_version::VERSION_INFO.clone()),
701 }
702 .into()),
703 Err(error) => Err(error::Inner::UnavailableVersionInfo {
704 address: address.to_string(),
705 error: Box::new(error),
706 }
707 .into()),
708 }
709 }
710
711 pub async fn check_matching_network_description(
712 &self,
713 address: &str,
714 node: &impl ValidatorNode,
715 ) -> Result<CryptoHash, Error> {
716 let network_description = self.genesis_config.network_description();
717 match node.get_network_description().await {
718 Ok(description) => {
719 if description == network_description {
720 Ok(description.genesis_config_hash)
721 } else {
722 Err(error::Inner::UnexpectedNetworkDescription {
723 remote: Box::new(description),
724 local: Box::new(network_description),
725 }
726 .into())
727 }
728 }
729 Err(error) => Err(error::Inner::UnavailableNetworkDescription {
730 address: address.to_string(),
731 error: Box::new(error),
732 }
733 .into()),
734 }
735 }
736
737 pub async fn check_validator_chain_info_response(
738 &self,
739 public_key: Option<&ValidatorPublicKey>,
740 address: &str,
741 node: &impl ValidatorNode,
742 chain_id: ChainId,
743 ) -> Result<ChainInfo, Error> {
744 let query = ChainInfoQuery::new(chain_id).with_manager_values();
745 match node.handle_chain_info_query(query).await {
746 Ok(response) => {
747 debug!(
748 "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
749 response.info.next_block_height, response.info.epoch,
750 );
751 if let Some(public_key) = public_key {
752 if response.check(*public_key).is_ok() {
753 debug!("Signature for public key {public_key} is OK.");
754 } else {
755 return Err(error::Inner::InvalidSignature {
756 public_key: *public_key,
757 }
758 .into());
759 }
760 } else {
761 warn!("Not checking signature as public key was not given");
762 }
763 Ok(*response.info)
764 }
765 Err(error) => Err(error::Inner::UnavailableChainInfo {
766 address: address.to_string(),
767 chain_id,
768 error: Box::new(error),
769 }
770 .into()),
771 }
772 }
773
774 pub async fn query_validator(
778 &self,
779 address: &str,
780 node: &impl ValidatorNode,
781 chain_id: ChainId,
782 public_key: Option<&ValidatorPublicKey>,
783 ) -> ValidatorQueryResults {
784 let version_info = self.check_compatible_version_info(address, node).await;
785 let genesis_config_hash = self.check_matching_network_description(address, node).await;
786 let chain_info = self
787 .check_validator_chain_info_response(public_key, address, node, chain_id)
788 .await;
789
790 ValidatorQueryResults {
791 version_info,
792 genesis_config_hash,
793 chain_info,
794 }
795 }
796
797 pub async fn query_local_node(
801 &self,
802 chain_id: ChainId,
803 ) -> Result<ValidatorQueryResults, Error> {
804 let version_info = Ok(linera_version::VERSION_INFO.clone());
805 let genesis_config_hash = Ok(self
806 .genesis_config
807 .network_description()
808 .genesis_config_hash);
809 let chain_info = self
810 .make_chain_client(chain_id)
811 .await?
812 .chain_info_with_manager_values()
813 .await
814 .map(|info| *info)
815 .map_err(|e| e.into());
816
817 Ok(ValidatorQueryResults {
818 version_info,
819 genesis_config_hash,
820 chain_info,
821 })
822 }
823}
824
825#[cfg(feature = "fs")]
826impl<Env: Environment> ClientContext<Env> {
827 pub async fn publish_module(
828 &mut self,
829 chain_client: &ChainClient<Env>,
830 contract: PathBuf,
831 service: PathBuf,
832 vm_runtime: VmRuntime,
833 formats: Option<PathBuf>,
834 ) -> Result<ModuleId, Error> {
835 info!("Loading bytecode files");
836 let contract_bytecode = Bytecode::load_from_file(&contract).await.map_err(|e| {
837 std::io::Error::new(
838 e.kind(),
839 format!("failed to load contract bytecode from {contract:?}: {e}"),
840 )
841 })?;
842 let service_bytecode = Bytecode::load_from_file(&service).await.map_err(|e| {
843 std::io::Error::new(
844 e.kind(),
845 format!("failed to load service bytecode from {service:?}: {e}"),
846 )
847 })?;
848
849 let formats_bytes = match formats {
850 Some(path) => Some(load_formats_from_snap(&path)?),
851 None => None,
852 };
853
854 info!("Publishing module");
855 let (blobs, module_id) = create_bytecode_blobs(
856 contract_bytecode,
857 service_bytecode,
858 vm_runtime,
859 formats_bytes,
860 )
861 .await;
862 let (module_id, _) = self
863 .apply_client_command(chain_client, |chain_client| {
864 let blobs = blobs.clone();
865 let chain_client = chain_client.clone();
866 async move {
867 chain_client
868 .publish_module_blobs(blobs, module_id)
869 .await
870 .context("Failed to publish module")
871 }
872 })
873 .await?;
874
875 info!("{}", "Module published successfully!");
876
877 info!("Synchronizing client and processing inbox");
878 self.process_inbox(chain_client).await?;
879 Ok(module_id)
880 }
881
882 pub async fn publish_data_blob(
883 &mut self,
884 chain_client: &ChainClient<Env>,
885 blob_path: PathBuf,
886 ) -> Result<CryptoHash, Error> {
887 info!("Loading data blob file");
888 let blob_bytes = fs::read(&blob_path).map_err(|e| {
889 std::io::Error::new(
890 e.kind(),
891 format!("failed to load data blob bytes from {blob_path:?}: {e}"),
892 )
893 })?;
894
895 info!("Publishing data blob");
896 self.apply_client_command(chain_client, |chain_client| {
897 let blob_bytes = blob_bytes.clone();
898 let chain_client = chain_client.clone();
899 async move {
900 chain_client
901 .publish_data_blob(blob_bytes)
902 .await
903 .context("Failed to publish data blob")
904 }
905 })
906 .await?;
907
908 info!("{}", "Data blob published successfully!");
909 Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
910 }
911
912 pub async fn read_data_blob(
914 &mut self,
915 chain_client: &ChainClient<Env>,
916 hash: CryptoHash,
917 ) -> Result<(), Error> {
918 info!("Verifying data blob");
919 self.apply_client_command(chain_client, |chain_client| {
920 let chain_client = chain_client.clone();
921 async move {
922 chain_client
923 .read_data_blob(hash)
924 .await
925 .context("Failed to verify data blob")
926 }
927 })
928 .await?;
929
930 info!("{}", "Data blob verified successfully!");
931 Ok(())
932 }
933}
934
935#[cfg(feature = "fs")]
939fn load_formats_from_snap(path: &std::path::Path) -> Result<Vec<u8>, Error> {
940 let content = fs::read_to_string(path).map_err(|e| {
941 std::io::Error::new(e.kind(), format!("failed to read SNAP file {path:?}: {e}"))
942 })?;
943 let body = strip_snap_frontmatter(&content).ok_or_else(|| {
944 std::io::Error::new(
945 std::io::ErrorKind::InvalidData,
946 format!("SNAP file {path:?} is missing the `---` frontmatter delimiters"),
947 )
948 })?;
949 let formats: linera_sdk::formats::Formats = serde_yaml_08::from_str(body).map_err(|e| {
950 std::io::Error::new(
951 std::io::ErrorKind::InvalidData,
952 format!("failed to parse SNAP body in {path:?} as Formats: {e}"),
953 )
954 })?;
955 serde_json::to_vec(&formats).map_err(|e| {
956 std::io::Error::new(
957 std::io::ErrorKind::InvalidData,
958 format!("failed to serialize Formats as JSON: {e}"),
959 )
960 .into()
961 })
962}
963
964#[cfg(feature = "fs")]
965fn strip_snap_frontmatter(content: &str) -> Option<&str> {
966 let rest = content.strip_prefix("---\n")?;
967 let end = rest.find("\n---\n")?;
968 Some(&rest[end + "\n---\n".len()..])
969}
970
971#[cfg(not(web))]
972impl<Env: Environment> ClientContext<Env> {
973 pub async fn prepare_for_benchmark(
974 &mut self,
975 num_chains: usize,
976 tokens_per_chain: Amount,
977 fungible_application_id: Option<ApplicationId>,
978 pub_keys: Vec<AccountPublicKey>,
979 chains_config_path: Option<&Path>,
980 close_chains: bool,
981 ) -> Result<Vec<ChainClient<Env>>, Error> {
982 let start = Instant::now();
983 self.process_inboxes_and_force_validator_updates().await;
987 info!(
988 "Processed inboxes and forced validator updates in {} ms",
989 start.elapsed().as_millis()
990 );
991
992 let start = Instant::now();
993 let (benchmark_chains, chain_clients) = self
994 .make_benchmark_chains(
995 num_chains,
996 tokens_per_chain,
997 pub_keys,
998 chains_config_path.is_some(),
999 close_chains,
1000 )
1001 .await?;
1002 info!(
1003 "Got {} chains in {} ms",
1004 num_chains,
1005 start.elapsed().as_millis()
1006 );
1007
1008 if let Some(id) = fungible_application_id {
1009 let start = Instant::now();
1010 self.supply_fungible_tokens(&benchmark_chains, id).await?;
1011 info!(
1012 "Supplied fungible tokens in {} ms",
1013 start.elapsed().as_millis()
1014 );
1015 let start = Instant::now();
1017 for chain_client in &chain_clients {
1018 chain_client.process_inbox().await?;
1019 }
1020 info!(
1021 "Processed inboxes after supplying fungible tokens in {} ms",
1022 start.elapsed().as_millis()
1023 );
1024 }
1025
1026 let all_chains = Benchmark::<Env>::get_all_chains(chains_config_path, &benchmark_chains)?;
1027 let known_chain_ids: HashSet<_> = benchmark_chains.iter().map(|(id, _)| *id).collect();
1028 let unknown_chain_ids: Vec<_> = all_chains
1029 .iter()
1030 .filter(|id| !known_chain_ids.contains(id))
1031 .copied()
1032 .collect();
1033 if !unknown_chain_ids.is_empty() {
1034 for chain_id in &unknown_chain_ids {
1038 self.client.get_chain_description(*chain_id).await?;
1039 }
1040 }
1041
1042 Ok(chain_clients)
1043 }
1044
1045 pub async fn wrap_up_benchmark(
1046 &mut self,
1047 chain_clients: Vec<ChainClient<Env>>,
1048 close_chains: bool,
1049 wrap_up_max_in_flight: usize,
1050 ) -> Result<(), Error> {
1051 if close_chains {
1052 info!("Closing chains...");
1053 let stream = stream::iter(chain_clients)
1054 .map(|chain_client| async move {
1055 Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
1056 info!("Closed chain {:?}", chain_client.chain_id());
1057 Ok::<(), BenchmarkError>(())
1058 })
1059 .buffer_unordered(wrap_up_max_in_flight);
1060 stream.try_collect::<Vec<_>>().await?;
1061 } else {
1062 info!("Processing inbox for all chains...");
1063 let stream = stream::iter(chain_clients.clone())
1064 .map(|chain_client| async move {
1065 chain_client.process_inbox().await?;
1066 info!("Processed inbox for chain {:?}", chain_client.chain_id());
1067 Ok::<(), chain_client::Error>(())
1068 })
1069 .buffer_unordered(wrap_up_max_in_flight);
1070 stream.try_collect::<Vec<_>>().await?;
1071
1072 info!("Updating wallet from chain clients...");
1073 for chain_client in chain_clients {
1074 let info = chain_client.chain_info().await?;
1075 let client_owner = chain_client.preferred_owner();
1076 let pending_fast_proposal = chain_client
1077 .pending_proposal()
1078 .await
1079 .filter(|p| p.round.is_some_and(|r| r.is_fast()));
1080 self.wallet()
1081 .insert(
1082 info.chain_id,
1083 wallet::Chain {
1084 pending_fast_proposal,
1085 owner: client_owner,
1086 ..info.as_ref().into()
1087 },
1088 )
1089 .await
1090 .map_err(error::Inner::wallet)?;
1091 }
1092 }
1093
1094 Ok(())
1095 }
1096
1097 async fn process_inboxes_and_force_validator_updates(&mut self) {
1098 let mut join_set = task::JoinSet::new();
1099
1100 let chain_clients: Vec<_> = self
1101 .wallet()
1102 .owned_chain_ids()
1103 .map_err(|e| error::Inner::wallet(e).into())
1104 .and_then(|id| self.make_chain_client(id))
1105 .try_collect()
1106 .await
1107 .unwrap();
1108
1109 for chain_client in chain_clients {
1110 join_set.spawn(async move {
1111 Self::process_inbox_without_updating_wallet(&chain_client)
1112 .await
1113 .expect("Processing inbox should not fail!");
1114 chain_client
1115 });
1116 }
1117
1118 for chain_client in join_set.join_all().await {
1119 self.update_wallet_from_client(&chain_client).await.unwrap();
1120 }
1121 }
1122
1123 async fn process_inbox_without_updating_wallet(
1124 chain_client: &ChainClient<Env>,
1125 ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
1126 chain_client.synchronize_from_validators().await?;
1128 let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
1129 assert!(
1130 maybe_timeout.is_none(),
1131 "Should not timeout within benchmark!"
1132 );
1133
1134 Ok(certificates)
1135 }
1136
1137 async fn make_benchmark_chains(
1143 &mut self,
1144 num_chains: usize,
1145 balance: Amount,
1146 pub_keys: Vec<AccountPublicKey>,
1147 wallet_only: bool,
1148 close_chains: bool,
1149 ) -> Result<(Vec<(ChainId, AccountOwner)>, Vec<ChainClient<Env>>), Error> {
1150 let mut chains_found_in_wallet = 0;
1151 let mut benchmark_chains = Vec::with_capacity(num_chains);
1152 let mut chain_clients = Vec::with_capacity(num_chains);
1153 let start = Instant::now();
1154
1155 if !close_chains || wallet_only {
1160 let mut owned_chain_ids = std::pin::pin!(self.wallet().owned_chain_ids());
1161 while let Some(chain_id) = owned_chain_ids.next().await {
1162 let chain_id = chain_id.map_err(error::Inner::wallet)?;
1163 if chains_found_in_wallet == num_chains {
1164 break;
1165 }
1166 let chain_client = self.make_chain_client(chain_id).await?;
1167 let ownership = chain_client.chain_info().await?.manager.ownership;
1168 if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
1169 continue;
1170 }
1171 let owner = *ownership.super_owners.first().unwrap();
1172 chain_client.process_inbox().await?;
1173 benchmark_chains.push((chain_id, owner));
1174 chain_clients.push(chain_client);
1175 chains_found_in_wallet += 1;
1176 }
1177 info!(
1178 "Got {} chains from the wallet in {} ms",
1179 benchmark_chains.len(),
1180 start.elapsed().as_millis()
1181 );
1182 }
1183
1184 let num_chains_to_create = num_chains - chains_found_in_wallet;
1185
1186 let default_chain_client = self.make_chain_client(self.default_chain()).await?;
1187
1188 if num_chains_to_create > 0 {
1189 if wallet_only {
1190 return Err(
1191 error::Inner::Benchmark(BenchmarkError::NotEnoughChainsInWallet(
1192 num_chains,
1193 chains_found_in_wallet,
1194 ))
1195 .into(),
1196 );
1197 }
1198 let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
1199 let operations_per_block = 900; for i in (0..num_chains_to_create).step_by(operations_per_block) {
1201 let num_new_chains = operations_per_block.min(num_chains_to_create - i);
1202 let owners: Vec<AccountOwner> = (&mut pub_keys_iter)
1205 .take(num_new_chains)
1206 .map(|pk| pk.into())
1207 .collect();
1208
1209 let certificate = Self::execute_open_chains_operations(
1210 &default_chain_client,
1211 balance,
1212 owners.clone(),
1213 )
1214 .await?;
1215 info!("Block executed successfully");
1216
1217 let block = certificate.block();
1218 for (i, owner) in owners.into_iter().enumerate() {
1219 let chain_id = block.body.blobs[i]
1220 .iter()
1221 .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
1222 .map(|blob| ChainId(blob.id().hash))
1223 .expect("failed to create a new chain");
1224 self.client
1225 .extend_chain_mode(chain_id, ListeningMode::FullChain);
1226
1227 let mut chain_client = self.client.create_chain_client(
1228 chain_id,
1229 None,
1230 BlockHeight::ZERO,
1231 &None,
1232 Some(owner),
1233 self.timing_sender(),
1234 false,
1235 );
1236 chain_client.set_preferred_owner(owner);
1237 chain_client.process_inbox().await?;
1238 benchmark_chains.push((chain_id, owner));
1239 chain_clients.push(chain_client);
1240 }
1241 }
1242
1243 info!(
1244 "Created {} chains in {} ms",
1245 num_chains_to_create,
1246 start.elapsed().as_millis()
1247 );
1248 }
1249
1250 if !close_chains {
1252 info!("Updating wallet from client");
1253 self.update_wallet_from_client(&default_chain_client)
1254 .await?;
1255 }
1256 info!("Retrying pending outgoing messages");
1257 default_chain_client
1258 .retry_pending_outgoing_messages()
1259 .await
1260 .context("outgoing messages to create the new chains should be delivered")?;
1261 info!("Processing default chain inbox");
1262 default_chain_client.process_inbox().await?;
1263
1264 assert_eq!(
1265 benchmark_chains.len(),
1266 chain_clients.len(),
1267 "benchmark_chains and chain_clients must have the same size"
1268 );
1269
1270 Ok((benchmark_chains, chain_clients))
1271 }
1272
1273 async fn execute_open_chains_operations(
1274 chain_client: &ChainClient<Env>,
1275 balance: Amount,
1276 owners: Vec<AccountOwner>,
1277 ) -> Result<ConfirmedBlockCertificate, Error> {
1278 let operations: Vec<_> = owners
1279 .iter()
1280 .map(|owner| {
1281 let config = OpenChainConfig {
1282 ownership: ChainOwnership::single_super(*owner),
1283 balance,
1284 application_permissions: Default::default(),
1285 };
1286 Operation::system(SystemOperation::OpenChain(config))
1287 })
1288 .collect();
1289 info!("Executing {} OpenChain operations", operations.len());
1290 Ok(chain_client
1291 .execute_operations(operations, vec![])
1292 .await?
1293 .expect("should execute block with OpenChain operations"))
1294 }
1295
1296 async fn supply_fungible_tokens(
1298 &mut self,
1299 key_pairs: &[(ChainId, AccountOwner)],
1300 application_id: ApplicationId,
1301 ) -> Result<(), Error> {
1302 let default_chain_id = self.default_chain();
1303 let default_key = self
1304 .wallet()
1305 .get(default_chain_id)
1306 .await
1307 .unwrap()
1308 .unwrap()
1309 .owner
1310 .unwrap();
1311 let amount = Amount::from_nanos(4);
1313 let operations: Vec<Operation> = key_pairs
1314 .iter()
1315 .map(|(chain_id, owner)| {
1316 fungible_transfer(application_id, *chain_id, default_key, *owner, amount)
1317 })
1318 .collect();
1319 let chain_client = self.make_chain_client(default_chain_id).await?;
1320 for operation_chunk in operations.chunks(1000) {
1322 chain_client
1323 .execute_operations(operation_chunk.to_vec(), vec![])
1324 .await?
1325 .expect("should execute block with Transfer operations");
1326 }
1327 self.update_wallet_from_client(&chain_client).await?;
1328
1329 Ok(())
1330 }
1331}