1use std::{
5 borrow::Cow,
6 collections::BTreeMap,
7 env,
8 marker::PhantomData,
9 mem,
10 path::{Path, PathBuf},
11 str::FromStr,
12 sync,
13 time::Duration,
14};
15
16use anyhow::{bail, ensure, Context, Result};
17use async_graphql::InputType;
18use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
19use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
20use heck::ToKebabCase;
21use linera_base::{
22 abi::ContractAbi,
23 command::{resolve_binary, CommandExt},
24 crypto::{CryptoHash, InMemorySigner},
25 data_types::{Amount, Bytecode, Epoch},
26 identifiers::{
27 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
28 },
29 vm::VmRuntime,
30};
31use linera_client::{client_options::ResourceControlPolicyConfig, wallet::Wallet};
32use linera_core::worker::Notification;
33use linera_execution::committee::Committee;
34use linera_faucet_client::Faucet;
35use serde::{de::DeserializeOwned, ser::Serialize};
36use serde_json::{json, Value};
37use tempfile::TempDir;
38use tokio::process::{Child, Command};
39use tracing::{error, info, warn};
40#[cfg(feature = "benchmark")]
41use {
42 crate::cli::command::BenchmarkCommand,
43 serde_command_opts::to_args,
44 std::process::Stdio,
45 tokio::{
46 io::{AsyncBufReadExt, BufReader},
47 sync::oneshot,
48 task::JoinHandle,
49 },
50};
51
52use crate::{
53 cli_wrappers::{
54 local_net::{PathProvider, ProcessInbox},
55 Network,
56 },
57 util::{self, ChildExt},
58};
59
/// Name of the environment variable consulted by `ClientWrapper::run_node_service`: when it
/// is set, its whitespace-separated contents are appended as extra arguments to the
/// `service` command.
const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
63
64fn reqwest_client() -> reqwest::Client {
65 reqwest::ClientBuilder::new()
66 .timeout(Duration::from_secs(30))
67 .build()
68 .unwrap()
69}
70
/// Wrapper used to build and run command lines of the `linera` client binary.
pub struct ClientWrapper {
    /// Path of the `linera` binary; resolved on first use and cached afterwards.
    binary_path: sync::Mutex<Option<PathBuf>>,
    /// Seed forwarded as `--testing-prng-seed` by the commands that accept it.
    testing_prng_seed: Option<u64>,
    /// Value passed to `--storage` on every command.
    storage: String,
    /// Wallet file name passed to `--wallet`; located inside the `path_provider` directory.
    wallet: String,
    /// Keystore file name passed to `--keystore`; located inside the `path_provider` directory.
    keystore: String,
    /// Value passed to `--max-pending-message-bundles` on every command.
    max_pending_message_bundles: usize,
    /// Network configuration; its short name prefixes validator addresses in `set_validator`.
    network: Network,
    /// Provides the working directory in which commands are run.
    pub path_provider: PathProvider,
    /// What to do with the wallet's chains when this wrapper is dropped.
    on_drop: OnClientDrop,
    /// Extra arguments appended after the standard ones on every command.
    extra_args: Vec<String>,
}
84
/// Action performed when a [`ClientWrapper`] is dropped.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OnClientDrop {
    /// Run `close-chain` for every chain listed by `wallet show --short --owned`.
    CloseChains,
    /// Do nothing on drop, leaving the chains as they are.
    LeakChains,
}
93
94impl ClientWrapper {
95 pub fn new(
96 path_provider: PathProvider,
97 network: Network,
98 testing_prng_seed: Option<u64>,
99 id: usize,
100 on_drop: OnClientDrop,
101 ) -> Self {
102 Self::new_with_extra_args(
103 path_provider,
104 network,
105 testing_prng_seed,
106 id,
107 on_drop,
108 vec!["--wait-for-outgoing-messages".to_string()],
109 )
110 }
111
112 pub fn new_with_extra_args(
113 path_provider: PathProvider,
114 network: Network,
115 testing_prng_seed: Option<u64>,
116 id: usize,
117 on_drop: OnClientDrop,
118 extra_args: Vec<String>,
119 ) -> Self {
120 let storage = format!(
121 "rocksdb:{}/client_{}.db",
122 path_provider.path().display(),
123 id
124 );
125 let wallet = format!("wallet_{}.json", id);
126 let keystore = format!("keystore_{}.json", id);
127 Self {
128 binary_path: sync::Mutex::new(None),
129 testing_prng_seed,
130 storage,
131 wallet,
132 keystore,
133 max_pending_message_bundles: 10_000,
134 network,
135 path_provider,
136 on_drop,
137 extra_args,
138 }
139 }
140
141 pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
143 let tmp = TempDir::new()?;
144 let mut command = self.command().await?;
145 command
146 .current_dir(tmp.path())
147 .arg("project")
148 .arg("new")
149 .arg(project_name)
150 .arg("--linera-root")
151 .arg(linera_root)
152 .spawn_and_wait_for_stdout()
153 .await?;
154 Ok(tmp)
155 }
156
157 pub async fn project_publish<T: Serialize>(
159 &self,
160 path: PathBuf,
161 required_application_ids: Vec<String>,
162 publisher: impl Into<Option<ChainId>>,
163 argument: &T,
164 ) -> Result<String> {
165 let json_parameters = serde_json::to_string(&())?;
166 let json_argument = serde_json::to_string(argument)?;
167 let mut command = self.command().await?;
168 command
169 .arg("project")
170 .arg("publish-and-create")
171 .arg(path)
172 .args(publisher.into().iter().map(ChainId::to_string))
173 .args(["--json-parameters", &json_parameters])
174 .args(["--json-argument", &json_argument]);
175 if !required_application_ids.is_empty() {
176 command.arg("--required-application-ids");
177 command.args(required_application_ids);
178 }
179 let stdout = command.spawn_and_wait_for_stdout().await?;
180 Ok(stdout.trim().to_string())
181 }
182
183 pub async fn project_test(&self, path: &Path) -> Result<()> {
185 self.command()
186 .await
187 .context("failed to create project test command")?
188 .current_dir(path)
189 .arg("project")
190 .arg("test")
191 .spawn_and_wait_for_stdout()
192 .await?;
193 Ok(())
194 }
195
196 async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
197 let mut command = self.command_binary().await?;
198 command.current_dir(self.path_provider.path());
199 for (key, value) in envs {
200 command.env(key, value);
201 }
202 for argument in self.command_arguments() {
203 command.arg(&*argument);
204 }
205 Ok(command)
206 }
207
208 async fn command(&self) -> Result<Command> {
209 self.command_with_envs(&[(
210 "RUST_LOG",
211 &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
212 )])
213 .await
214 }
215
216 fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
218 [
219 "--wallet".into(),
220 self.wallet.as_str().into(),
221 "--keystore".into(),
222 self.keystore.as_str().into(),
223 "--storage".into(),
224 self.storage.as_str().into(),
225 "--max-pending-message-bundles".into(),
226 self.max_pending_message_bundles.to_string().into(),
227 "--send-timeout-ms".into(),
228 "500000".into(),
229 "--recv-timeout-ms".into(),
230 "500000".into(),
231 ]
232 .into_iter()
233 .chain(self.extra_args.iter().map(|s| s.as_str().into()))
234 }
235
236 async fn command_binary(&self) -> Result<Command> {
240 match self.command_with_cached_binary_path() {
241 Some(command) => Ok(command),
242 None => {
243 let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
244 let command = Command::new(&resolved_path);
245
246 self.set_cached_binary_path(resolved_path);
247
248 Ok(command)
249 }
250 }
251 }
252
253 fn command_with_cached_binary_path(&self) -> Option<Command> {
255 let binary_path = self.binary_path.lock().unwrap();
256
257 binary_path.as_ref().map(Command::new)
258 }
259
260 fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
268 let mut binary_path = self.binary_path.lock().unwrap();
269
270 if binary_path.is_none() {
271 *binary_path = Some(new_binary_path);
272 } else {
273 assert_eq!(*binary_path, Some(new_binary_path));
274 }
275 }
276
277 pub async fn create_genesis_config(
279 &self,
280 num_other_initial_chains: u32,
281 initial_funding: Amount,
282 policy_config: ResourceControlPolicyConfig,
283 http_allow_list: Option<Vec<String>>,
284 ) -> Result<()> {
285 let mut command = self.command().await?;
286 command
287 .args([
288 "create-genesis-config",
289 &num_other_initial_chains.to_string(),
290 ])
291 .args(["--initial-funding", &initial_funding.to_string()])
292 .args(["--committee", "committee.json"])
293 .args(["--genesis", "genesis.json"])
294 .args([
295 "--policy-config",
296 &policy_config.to_string().to_kebab_case(),
297 ]);
298 if let Some(allow_list) = http_allow_list {
299 command
300 .arg("--http-request-allow-list")
301 .arg(allow_list.join(","));
302 }
303 if let Some(seed) = self.testing_prng_seed {
304 command.arg("--testing-prng-seed").arg(seed.to_string());
305 }
306 command.spawn_and_wait_for_stdout().await?;
307 Ok(())
308 }
309
310 pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
313 let mut command = self.command().await?;
314 command.args(["wallet", "init"]);
315 match faucet {
316 None => command.args(["--genesis", "genesis.json"]),
317 Some(faucet) => command.args(["--faucet", faucet.url()]),
318 };
319 if let Some(seed) = self.testing_prng_seed {
320 command.arg("--testing-prng-seed").arg(seed.to_string());
321 }
322 command.spawn_and_wait_for_stdout().await?;
323 Ok(())
324 }
325
326 pub async fn request_chain(
328 &self,
329 faucet: &Faucet,
330 set_default: bool,
331 ) -> Result<(ChainId, AccountOwner)> {
332 let mut command = self.command().await?;
333 command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
334 if set_default {
335 command.arg("--set-default");
336 }
337 let stdout = command.spawn_and_wait_for_stdout().await?;
338 let mut lines = stdout.split_whitespace();
339 let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
340 let owner = lines.next().context("missing chain owner")?.parse()?;
341 Ok((chain_id, owner))
342 }
343
344 #[expect(clippy::too_many_arguments)]
346 pub async fn publish_and_create<
347 A: ContractAbi,
348 Parameters: Serialize,
349 InstantiationArgument: Serialize,
350 >(
351 &self,
352 contract: PathBuf,
353 service: PathBuf,
354 vm_runtime: VmRuntime,
355 parameters: &Parameters,
356 argument: &InstantiationArgument,
357 required_application_ids: &[ApplicationId],
358 publisher: impl Into<Option<ChainId>>,
359 ) -> Result<ApplicationId<A>> {
360 let json_parameters = serde_json::to_string(parameters)?;
361 let json_argument = serde_json::to_string(argument)?;
362 let mut command = self.command().await?;
363 let vm_runtime = format!("{}", vm_runtime);
364 command
365 .arg("publish-and-create")
366 .args([contract, service])
367 .args(["--vm-runtime", &vm_runtime.to_lowercase()])
368 .args(publisher.into().iter().map(ChainId::to_string))
369 .args(["--json-parameters", &json_parameters])
370 .args(["--json-argument", &json_argument]);
371 if !required_application_ids.is_empty() {
372 command.arg("--required-application-ids");
373 command.args(
374 required_application_ids
375 .iter()
376 .map(ApplicationId::to_string),
377 );
378 }
379 let stdout = command.spawn_and_wait_for_stdout().await?;
380 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
381 }
382
383 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
385 &self,
386 contract: PathBuf,
387 service: PathBuf,
388 vm_runtime: VmRuntime,
389 publisher: impl Into<Option<ChainId>>,
390 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
391 let stdout = self
392 .command()
393 .await?
394 .arg("publish-module")
395 .args([contract, service])
396 .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
397 .args(publisher.into().iter().map(ChainId::to_string))
398 .spawn_and_wait_for_stdout()
399 .await?;
400 let module_id: ModuleId = stdout.trim().parse()?;
401 Ok(module_id.with_abi())
402 }
403
404 pub async fn create_application<
406 Abi: ContractAbi,
407 Parameters: Serialize,
408 InstantiationArgument: Serialize,
409 >(
410 &self,
411 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
412 parameters: &Parameters,
413 argument: &InstantiationArgument,
414 required_application_ids: &[ApplicationId],
415 creator: impl Into<Option<ChainId>>,
416 ) -> Result<ApplicationId<Abi>> {
417 let json_parameters = serde_json::to_string(parameters)?;
418 let json_argument = serde_json::to_string(argument)?;
419 let mut command = self.command().await?;
420 command
421 .arg("create-application")
422 .arg(module_id.forget_abi().to_string())
423 .args(["--json-parameters", &json_parameters])
424 .args(["--json-argument", &json_argument])
425 .args(creator.into().iter().map(ChainId::to_string));
426 if !required_application_ids.is_empty() {
427 command.arg("--required-application-ids");
428 command.args(
429 required_application_ids
430 .iter()
431 .map(ApplicationId::to_string),
432 );
433 }
434 let stdout = command.spawn_and_wait_for_stdout().await?;
435 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
436 }
437
438 pub async fn run_node_service(
440 &self,
441 port: impl Into<Option<u16>>,
442 process_inbox: ProcessInbox,
443 ) -> Result<NodeService> {
444 let port = port.into().unwrap_or(8080);
445 let mut command = self.command().await?;
446 command.arg("service");
447 if let ProcessInbox::Skip = process_inbox {
448 command.arg("--listener-skip-process-inbox");
449 }
450 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
451 command.args(var.split_whitespace());
452 }
453 let child = command
454 .args(["--port".to_string(), port.to_string()])
455 .spawn_into()?;
456 let client = reqwest_client();
457 for i in 0..10 {
458 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
459 let request = client
460 .get(format!("http://localhost:{}/", port))
461 .send()
462 .await;
463 if request.is_ok() {
464 info!("Node service has started");
465 return Ok(NodeService::new(port, child));
466 } else {
467 warn!("Waiting for node service to start");
468 }
469 }
470 bail!("Failed to start node service");
471 }
472
473 pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
475 let mut command = self.command().await?;
476 command.arg("query-validator").arg(address);
477 let stdout = command.spawn_and_wait_for_stdout().await?;
478 let hash = stdout
479 .trim()
480 .parse()
481 .context("error while parsing the result of `linera query-validator`")?;
482 Ok(hash)
483 }
484
485 pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
487 let mut command = self.command().await?;
488 command.arg("query-validators");
489 if let Some(chain_id) = chain_id {
490 command.arg(chain_id.to_string());
491 }
492 command.spawn_and_wait_for_stdout().await?;
493 Ok(())
494 }
495
496 pub async fn sync_validator(
498 &self,
499 chain_ids: impl IntoIterator<Item = &ChainId>,
500 validator_address: impl Into<String>,
501 ) -> Result<()> {
502 let mut command = self.command().await?;
503 command.arg("sync-validator").arg(validator_address.into());
504 let mut chain_ids = chain_ids.into_iter().peekable();
505 if chain_ids.peek().is_some() {
506 command
507 .arg("--chains")
508 .args(chain_ids.map(ChainId::to_string));
509 }
510 command.spawn_and_wait_for_stdout().await?;
511 Ok(())
512 }
513
514 pub async fn run_faucet(
516 &self,
517 port: impl Into<Option<u16>>,
518 chain_id: ChainId,
519 amount: Amount,
520 ) -> Result<FaucetService> {
521 let port = port.into().unwrap_or(8080);
522 let mut command = self.command().await?;
523 let child = command
524 .arg("faucet")
525 .arg(chain_id.to_string())
526 .args(["--port".to_string(), port.to_string()])
527 .args(["--amount".to_string(), amount.to_string()])
528 .spawn_into()?;
529 let client = reqwest_client();
530 for i in 0..10 {
531 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
532 let request = client
533 .get(format!("http://localhost:{}/", port))
534 .send()
535 .await;
536 if request.is_ok() {
537 info!("Faucet has started");
538 return Ok(FaucetService::new(port, child));
539 } else {
540 warn!("Waiting for faucet to start");
541 }
542 }
543 bail!("Failed to start faucet");
544 }
545
546 pub async fn local_balance(&self, account: Account) -> Result<Amount> {
548 let stdout = self
549 .command()
550 .await?
551 .arg("local-balance")
552 .arg(account.to_string())
553 .spawn_and_wait_for_stdout()
554 .await?;
555 let amount = stdout
556 .trim()
557 .parse()
558 .context("error while parsing the result of `linera local-balance`")?;
559 Ok(amount)
560 }
561
562 pub async fn query_balance(&self, account: Account) -> Result<Amount> {
564 let stdout = self
565 .command()
566 .await?
567 .arg("query-balance")
568 .arg(account.to_string())
569 .spawn_and_wait_for_stdout()
570 .await?;
571 let amount = stdout
572 .trim()
573 .parse()
574 .context("error while parsing the result of `linera query-balance`")?;
575 Ok(amount)
576 }
577
578 pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
580 self.command()
581 .await?
582 .arg("sync")
583 .arg(chain_id.to_string())
584 .spawn_and_wait_for_stdout()
585 .await?;
586 Ok(())
587 }
588
589 pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
591 self.command()
592 .await?
593 .arg("process-inbox")
594 .arg(chain_id.to_string())
595 .spawn_and_wait_for_stdout()
596 .await?;
597 Ok(())
598 }
599
600 pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
602 self.command()
603 .await?
604 .arg("transfer")
605 .arg(amount.to_string())
606 .args(["--from", &from.to_string()])
607 .args(["--to", &to.to_string()])
608 .spawn_and_wait_for_stdout()
609 .await?;
610 Ok(())
611 }
612
613 pub async fn transfer_with_silent_logs(
615 &self,
616 amount: Amount,
617 from: ChainId,
618 to: ChainId,
619 ) -> Result<()> {
620 self.command()
621 .await?
622 .env("RUST_LOG", "off")
623 .arg("transfer")
624 .arg(amount.to_string())
625 .args(["--from", &from.to_string()])
626 .args(["--to", &to.to_string()])
627 .spawn_and_wait_for_stdout()
628 .await?;
629 Ok(())
630 }
631
632 pub async fn transfer_with_accounts(
634 &self,
635 amount: Amount,
636 from: Account,
637 to: Account,
638 ) -> Result<()> {
639 self.command()
640 .await?
641 .arg("transfer")
642 .arg(amount.to_string())
643 .args(["--from", &from.to_string()])
644 .args(["--to", &to.to_string()])
645 .spawn_and_wait_for_stdout()
646 .await?;
647 Ok(())
648 }
649
/// Appends `benchmark` and the options serialized from `args` to `command`.
///
/// The serialized arguments are consumed as (name, value) pairs: a `"true"` value becomes
/// a bare `--name` flag, a `"false"` value is omitted, and any other value becomes
/// `--name value`.
#[cfg(feature = "benchmark")]
fn benchmark_command_internal(command: &mut Command, args: BenchmarkCommand) -> Result<()> {
    let serialized = to_args(&args)?;
    let mut options = Vec::new();
    for pair in serialized.chunks_exact(2) {
        let flag = format!("--{}", pair[0]);
        match pair[1].as_str() {
            "true" => options.push(flag),
            "false" => {}
            _ => {
                options.push(flag);
                options.push(pair[1].clone());
            }
        }
    }
    command.arg("benchmark").args(options);
    Ok(())
}
666
/// Builds the `linera benchmark` command with the given environment variables.
#[cfg(feature = "benchmark")]
async fn benchmark_command_with_envs(
    &self,
    args: BenchmarkCommand,
    envs: &[(&str, &str)],
) -> Result<Command> {
    let mut benchmark = self.command_with_envs(envs).await?;
    Self::benchmark_command_internal(&mut benchmark, args).map(|()| benchmark)
}
677
/// Builds the `linera benchmark` command with the default environment of `command`.
#[cfg(feature = "benchmark")]
async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
    let mut benchmark = self.command().await?;
    Self::benchmark_command_internal(&mut benchmark, args).map(|()| benchmark)
}
684
/// Runs `linera benchmark` with the given options and discards the output.
#[cfg(feature = "benchmark")]
pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
    self.benchmark_command(args)
        .await?
        .spawn_and_wait_for_stdout()
        .await?;
    Ok(())
}
692
/// Spawns `linera benchmark` without waiting for it to finish.
///
/// The child runs with `RUST_LOG=linera=info`, is killed on drop, and has all three
/// standard streams piped. Two tasks forward its stdout and stderr lines to this process's
/// stdout, prefixed with the child's PID. The stderr line containing
/// `Ready to start benchmark` is not forwarded; it triggers the `tx` signal instead.
///
/// Returns the child together with the stdout and stderr forwarding task handles.
///
/// # Panics
///
/// Panics if the child's PID or pipes are unavailable. The stderr task panics if the ready
/// line is seen more than once or if the receiving end of `tx` has been dropped.
#[cfg(feature = "benchmark")]
pub async fn benchmark_detached(
    &self,
    args: BenchmarkCommand,
    tx: oneshot::Sender<()>,
) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
    let mut command = self
        .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
        .await?;
    command
        .kill_on_drop(true)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());
    let mut child = command.spawn()?;

    let pid = child.id().expect("failed to get pid");

    let child_stdout = child.stdout.take().expect("stdout not open");
    let stdout_handle = tokio::spawn(async move {
        let mut reader = BufReader::new(child_stdout).lines();
        while let Ok(Some(line)) = reader.next_line().await {
            println!("benchmark{{pid={pid}}} {line}");
        }
    });

    let child_stderr = child.stderr.take().expect("stderr not open");
    let stderr_handle = tokio::spawn(async move {
        let mut reader = BufReader::new(child_stderr).lines();
        let mut ready_signal = Some(tx);
        while let Ok(Some(line)) = reader.next_line().await {
            if !line.contains("Ready to start benchmark") {
                println!("benchmark{{pid={pid}}} {line}");
                continue;
            }
            ready_signal
                .take()
                .expect("Should only send signal once")
                .send(())
                .expect("failed to send ready signal to main thread");
        }
    });

    Ok((child, stdout_handle, stderr_handle))
}
736
737 pub async fn open_chain(
739 &self,
740 from: ChainId,
741 owner: Option<AccountOwner>,
742 initial_balance: Amount,
743 ) -> Result<(ChainId, AccountOwner)> {
744 let mut command = self.command().await?;
745 command
746 .arg("open-chain")
747 .args(["--from", &from.to_string()])
748 .args(["--initial-balance", &initial_balance.to_string()]);
749
750 if let Some(owner) = owner {
751 command.args(["--owner", &owner.to_string()]);
752 }
753
754 let stdout = command.spawn_and_wait_for_stdout().await?;
755 let mut split = stdout.split('\n');
756 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
757 let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
758 if let Some(owner) = owner {
759 assert_eq!(owner, new_owner);
760 }
761 Ok((chain_id, new_owner))
762 }
763
764 pub async fn open_and_assign(
766 &self,
767 client: &ClientWrapper,
768 initial_balance: Amount,
769 ) -> Result<ChainId> {
770 let our_chain = self
771 .load_wallet()?
772 .default_chain()
773 .context("no default chain found")?;
774 let owner = client.keygen().await?;
775 let (new_chain, _) = self
776 .open_chain(our_chain, Some(owner), initial_balance)
777 .await?;
778 client.assign(owner, new_chain).await?;
779 Ok(new_chain)
780 }
781
782 pub async fn open_multi_owner_chain(
783 &self,
784 from: ChainId,
785 owners: Vec<AccountOwner>,
786 weights: Vec<u64>,
787 multi_leader_rounds: u32,
788 balance: Amount,
789 base_timeout_ms: u64,
790 ) -> Result<ChainId> {
791 let mut command = self.command().await?;
792 command
793 .arg("open-multi-owner-chain")
794 .args(["--from", &from.to_string()])
795 .arg("--owners")
796 .args(owners.iter().map(AccountOwner::to_string))
797 .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
798 if !weights.is_empty() {
799 command
800 .arg("--owner-weights")
801 .args(weights.iter().map(u64::to_string));
802 };
803 command
804 .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
805 .args(["--initial-balance", &balance.to_string()]);
806
807 let stdout = command.spawn_and_wait_for_stdout().await?;
808 let mut split = stdout.split('\n');
809 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
810
811 Ok(chain_id)
812 }
813
814 pub async fn change_ownership(
815 &self,
816 chain_id: ChainId,
817 super_owners: Vec<AccountOwner>,
818 owners: Vec<AccountOwner>,
819 ) -> Result<()> {
820 let mut command = self.command().await?;
821 command
822 .arg("change-ownership")
823 .args(["--chain-id", &chain_id.to_string()]);
824 if !super_owners.is_empty() {
825 command
826 .arg("--super-owners")
827 .args(super_owners.iter().map(AccountOwner::to_string));
828 }
829 if !owners.is_empty() {
830 command
831 .arg("--owners")
832 .args(owners.iter().map(AccountOwner::to_string));
833 }
834 command.spawn_and_wait_for_stdout().await?;
835 Ok(())
836 }
837
838 pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
840 let mut command = self.command().await?;
841 command
842 .args(["wallet", "follow-chain"])
843 .arg(chain_id.to_string());
844 if sync {
845 command.arg("--sync");
846 }
847 command.spawn_and_wait_for_stdout().await?;
848 Ok(())
849 }
850
851 pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
853 let mut command = self.command().await?;
854 command
855 .args(["wallet", "forget-chain"])
856 .arg(chain_id.to_string());
857 command.spawn_and_wait_for_stdout().await?;
858 Ok(())
859 }
860
861 pub async fn retry_pending_block(
862 &self,
863 chain_id: Option<ChainId>,
864 ) -> Result<Option<CryptoHash>> {
865 let mut command = self.command().await?;
866 command.arg("retry-pending-block");
867 if let Some(chain_id) = chain_id {
868 command.arg(chain_id.to_string());
869 }
870 let stdout = command.spawn_and_wait_for_stdout().await?;
871 let stdout = stdout.trim();
872 if stdout.is_empty() {
873 Ok(None)
874 } else {
875 Ok(Some(CryptoHash::from_str(stdout)?))
876 }
877 }
878
879 pub async fn publish_data_blob(
881 &self,
882 path: &Path,
883 chain_id: Option<ChainId>,
884 ) -> Result<CryptoHash> {
885 let mut command = self.command().await?;
886 command.arg("publish-data-blob").arg(path);
887 if let Some(chain_id) = chain_id {
888 command.arg(chain_id.to_string());
889 }
890 let stdout = command.spawn_and_wait_for_stdout().await?;
891 let stdout = stdout.trim();
892 Ok(CryptoHash::from_str(stdout)?)
893 }
894
895 pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
897 let mut command = self.command().await?;
898 command.arg("read-data-blob").arg(hash.to_string());
899 if let Some(chain_id) = chain_id {
900 command.arg(chain_id.to_string());
901 }
902 command.spawn_and_wait_for_stdout().await?;
903 Ok(())
904 }
905
906 pub fn load_wallet(&self) -> Result<Wallet> {
907 util::read_json(self.wallet_path())
908 }
909
910 pub fn load_keystore(&self) -> Result<InMemorySigner> {
911 util::read_json(self.keystore_path())
912 }
913
914 pub fn wallet_path(&self) -> PathBuf {
915 self.path_provider.path().join(&self.wallet)
916 }
917
918 pub fn keystore_path(&self) -> PathBuf {
919 self.path_provider.path().join(&self.keystore)
920 }
921
922 pub fn storage_path(&self) -> &str {
923 &self.storage
924 }
925
926 pub fn get_owner(&self) -> Option<AccountOwner> {
927 let wallet = self.load_wallet().ok()?;
928 let chain_id = wallet.default_chain()?;
929 wallet.get(chain_id)?.owner
930 }
931
932 pub async fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
933 self.load_wallet()
934 .ok()
935 .is_some_and(|wallet| wallet.get(chain).is_some())
936 }
937
938 pub async fn set_validator(
939 &self,
940 validator_key: &(String, String),
941 port: usize,
942 votes: usize,
943 ) -> Result<()> {
944 let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
945 self.command()
946 .await?
947 .arg("set-validator")
948 .args(["--public-key", &validator_key.0])
949 .args(["--account-key", &validator_key.1])
950 .args(["--address", &address])
951 .args(["--votes", &votes.to_string()])
952 .spawn_and_wait_for_stdout()
953 .await?;
954 Ok(())
955 }
956
957 pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
958 self.command()
959 .await?
960 .arg("remove-validator")
961 .args(["--public-key", validator_key])
962 .spawn_and_wait_for_stdout()
963 .await?;
964 Ok(())
965 }
966
967 pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
968 self.command()
969 .await?
970 .arg("revoke-epochs")
971 .arg(epoch.to_string())
972 .spawn_and_wait_for_stdout()
973 .await?;
974 Ok(())
975 }
976
977 pub async fn keygen(&self) -> Result<AccountOwner> {
979 let stdout = self
980 .command()
981 .await?
982 .arg("keygen")
983 .spawn_and_wait_for_stdout()
984 .await?;
985 AccountOwner::from_str(stdout.as_str().trim())
986 }
987
988 pub fn default_chain(&self) -> Option<ChainId> {
990 self.load_wallet().ok()?.default_chain()
991 }
992
993 pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
995 let _stdout = self
996 .command()
997 .await?
998 .arg("assign")
999 .args(["--owner", &owner.to_string()])
1000 .args(["--chain-id", &chain_id.to_string()])
1001 .spawn_and_wait_for_stdout()
1002 .await?;
1003 Ok(())
1004 }
1005
1006 pub async fn set_preffered_owner(
1008 &self,
1009 chain_id: ChainId,
1010 owner: Option<AccountOwner>,
1011 ) -> Result<()> {
1012 let mut owner_arg = vec!["--owner".to_string()];
1013 if let Some(owner) = owner {
1014 owner_arg.push(owner.to_string());
1015 };
1016 self.command()
1017 .await?
1018 .arg("set-preferred-owner")
1019 .args(["--chain-id", &chain_id.to_string()])
1020 .args(owner_arg)
1021 .spawn_and_wait_for_stdout()
1022 .await?;
1023 Ok(())
1024 }
1025
1026 pub async fn build_application(
1027 &self,
1028 path: &Path,
1029 name: &str,
1030 is_workspace: bool,
1031 ) -> Result<(PathBuf, PathBuf)> {
1032 Command::new("cargo")
1033 .current_dir(self.path_provider.path())
1034 .arg("build")
1035 .arg("--release")
1036 .args(["--target", "wasm32-unknown-unknown"])
1037 .arg("--manifest-path")
1038 .arg(path.join("Cargo.toml"))
1039 .spawn_and_wait_for_stdout()
1040 .await?;
1041
1042 let release_dir = match is_workspace {
1043 true => path.join("../target/wasm32-unknown-unknown/release"),
1044 false => path.join("target/wasm32-unknown-unknown/release"),
1045 };
1046
1047 let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1048 let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1049
1050 let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1051 let service_size = fs_err::tokio::metadata(&service).await?.len();
1052 info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1053
1054 Ok((contract, service))
1055 }
1056}
1057
1058impl Drop for ClientWrapper {
1059 fn drop(&mut self) {
1060 use std::process::Command as SyncCommand;
1061
1062 if self.on_drop != OnClientDrop::CloseChains {
1063 return;
1064 }
1065
1066 let Ok(binary_path) = self.binary_path.lock() else {
1067 error!("Failed to close chains because a thread panicked with a lock to `binary_path`");
1068 return;
1069 };
1070
1071 let Some(binary_path) = binary_path.as_ref() else {
1072 warn!(
1073 "Assuming no chains need to be closed, because the command binary was never \
1074 resolved and therefore presumably never called"
1075 );
1076 return;
1077 };
1078
1079 let working_directory = self.path_provider.path();
1080 let mut wallet_show_command = SyncCommand::new(binary_path);
1081
1082 for argument in self.command_arguments() {
1083 wallet_show_command.arg(&*argument);
1084 }
1085
1086 let Ok(wallet_show_output) = wallet_show_command
1087 .current_dir(working_directory)
1088 .args(["wallet", "show", "--short", "--owned"])
1089 .output()
1090 else {
1091 warn!("Failed to execute `wallet show --short` to list chains to close");
1092 return;
1093 };
1094
1095 if !wallet_show_output.status.success() {
1096 warn!("Failed to list chains in the wallet to close them");
1097 return;
1098 }
1099
1100 let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1101 warn!(
1102 "Failed to close chains because `linera wallet show --short` \
1103 returned a non-UTF-8 output"
1104 );
1105 return;
1106 };
1107
1108 let chain_ids = chain_list_string
1109 .split('\n')
1110 .map(|line| line.trim())
1111 .filter(|line| !line.is_empty());
1112
1113 for chain_id in chain_ids {
1114 let mut close_chain_command = SyncCommand::new(binary_path);
1115
1116 for argument in self.command_arguments() {
1117 close_chain_command.arg(&*argument);
1118 }
1119
1120 close_chain_command.current_dir(working_directory);
1121
1122 match close_chain_command.args(["close-chain", chain_id]).status() {
1123 Ok(status) if status.success() => (),
1124 Ok(failure) => warn!("Failed to close chain {chain_id}: {failure}"),
1125 Err(error) => warn!("Failed to close chain {chain_id}: {error}"),
1126 }
1127 }
1128 }
1129}
1130
/// Helpers that are only available in testing builds.
#[cfg(with_testing)]
impl ClientWrapper {
    /// Builds the example application `name` as a member of the examples workspace.
    pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
        let example_dir = Self::example_path(name)?;
        self.build_application(&example_dir, name, true).await
    }

    /// Returns `../examples/<name>` resolved against the current working directory.
    pub fn example_path(name: &str) -> Result<PathBuf> {
        let current_dir = env::current_dir()?;
        Ok(current_dir.join("../examples/").join(name))
    }
}
1142
/// Shortens `input` for logging.
///
/// Inputs shorter than 1000 bytes are returned unchanged. Longer inputs are cut to at most
/// 1000 bytes and suffixed with `" ..."`. The cut is moved back to the nearest character
/// boundary, so multi-byte UTF-8 text never causes a panic.
fn truncate_query_output(input: &str) -> String {
    const MAX_LEN: usize = 1000;
    if input.len() < MAX_LEN {
        return input.to_string();
    }
    // Byte 1000 may fall inside a multi-byte character; index 0 is always a boundary, so
    // this loop terminates.
    let mut end = MAX_LEN;
    while !input.is_char_boundary(end) {
        end -= 1;
    }
    format!("{} ...", &input[..end])
}
1151
1152fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1153 let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1154 let max_len = 200;
1155 if query.len() < max_len {
1156 query
1157 } else {
1158 format!("{} ...", query.get(..max_len).unwrap())
1159 }
1160}
1161
/// Handle to a spawned node service process.
pub struct NodeService {
    /// Port on which the service is reached at `localhost`.
    port: u16,
    /// The child process running the service.
    child: Child,
}
1167
1168impl NodeService {
1169 fn new(port: u16, child: Child) -> Self {
1170 Self { port, child }
1171 }
1172
1173 pub async fn terminate(mut self) -> Result<()> {
1174 self.child.kill().await.context("terminating node service")
1175 }
1176
    /// Returns the port this handle was created with.
    pub fn port(&self) -> u16 {
        self.port
    }
1180
    /// Delegates to `ensure_is_running` on the wrapped child process handle.
    ///
    /// NOTE(review): presumably returns an error once the process has exited; confirm against
    /// the `ChildExt` implementation.
    pub fn ensure_is_running(&mut self) -> Result<()> {
        self.child.ensure_is_running()
    }
1184
1185 pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1186 let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1187 let mut data = self.query_node(query).await?;
1188 Ok(serde_json::from_value(data["processInbox"].take())?)
1189 }
1190
1191 pub async fn make_application<A: ContractAbi>(
1192 &self,
1193 chain_id: &ChainId,
1194 application_id: &ApplicationId<A>,
1195 ) -> Result<ApplicationWrapper<A>> {
1196 let application_id = application_id.forget_abi().to_string();
1197 let link = format!(
1198 "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1199 self.port
1200 );
1201 Ok(ApplicationWrapper::from(link))
1202 }
1203
1204 pub async fn publish_data_blob(
1205 &self,
1206 chain_id: &ChainId,
1207 bytes: Vec<u8>,
1208 ) -> Result<CryptoHash> {
1209 let query = format!(
1210 "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1211 chain_id.to_value(),
1212 bytes.to_value(),
1213 );
1214 let data = self.query_node(query).await?;
1215 serde_json::from_value(data["publishDataBlob"].clone())
1216 .context("missing publishDataBlob field in response")
1217 }
1218
1219 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1220 &self,
1221 chain_id: &ChainId,
1222 contract: PathBuf,
1223 service: PathBuf,
1224 vm_runtime: VmRuntime,
1225 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1226 let contract_code = Bytecode::load_from_file(&contract).await?;
1227 let service_code = Bytecode::load_from_file(&service).await?;
1228 let query = format!(
1229 "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1230 chain_id.to_value(),
1231 contract_code.to_value(),
1232 service_code.to_value(),
1233 vm_runtime.to_value(),
1234 );
1235 let data = self.query_node(query).await?;
1236 let module_str = data["publishModule"]
1237 .as_str()
1238 .context("module ID not found")?;
1239 let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1240 Ok(module_id.with_abi())
1241 }
1242
1243 pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1244 let query = format!(
1245 "query {{ chain(chainId:\"{chain_id}\") {{
1246 executionState {{ system {{ committees }} }}
1247 }} }}"
1248 );
1249 let mut response = self.query_node(query).await?;
1250 let committees = response["chain"]["executionState"]["system"]["committees"].take();
1251 Ok(serde_json::from_value(committees)?)
1252 }
1253
1254 pub async fn events_from_index(
1255 &self,
1256 chain_id: &ChainId,
1257 stream_id: &StreamId,
1258 start_index: u32,
1259 ) -> Result<Vec<IndexAndEvent>> {
1260 let query = format!(
1261 "query {{
1262 eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1263 {{ index event }}
1264 }}",
1265 stream_id.to_value()
1266 );
1267 let mut response = self.query_node(query).await?;
1268 let response = response["eventsFromIndex"].take();
1269 Ok(serde_json::from_value(response)?)
1270 }
1271
1272 pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1273 let n_try = 5;
1274 let query = query.as_ref();
1275 for i in 0..n_try {
1276 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1277 let url = format!("http://localhost:{}/", self.port);
1278 let client = reqwest_client();
1279 let result = client
1280 .post(url)
1281 .json(&json!({ "query": query }))
1282 .send()
1283 .await;
1284 if matches!(result, Err(ref error) if error.is_timeout()) {
1285 warn!(
1286 "Timeout when sending query {} to the node service",
1287 truncate_query_output(query)
1288 );
1289 continue;
1290 }
1291 let response = result.with_context(|| {
1292 format!(
1293 "query_node: failed to post query={}",
1294 truncate_query_output(query)
1295 )
1296 })?;
1297 anyhow::ensure!(
1298 response.status().is_success(),
1299 "Query \"{}\" failed: {}",
1300 truncate_query_output(query),
1301 response
1302 .text()
1303 .await
1304 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1305 );
1306 let value: Value = response.json().await.context("invalid JSON")?;
1307 if let Some(errors) = value.get("errors") {
1308 warn!(
1309 "Query \"{}\" failed: {}",
1310 truncate_query_output(query),
1311 errors
1312 );
1313 } else {
1314 return Ok(value["data"].clone());
1315 }
1316 }
1317 bail!(
1318 "Query \"{}\" failed after {} retries.",
1319 truncate_query_output(query),
1320 n_try
1321 );
1322 }
1323
1324 pub async fn create_application<
1325 Abi: ContractAbi,
1326 Parameters: Serialize,
1327 InstantiationArgument: Serialize,
1328 >(
1329 &self,
1330 chain_id: &ChainId,
1331 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1332 parameters: &Parameters,
1333 argument: &InstantiationArgument,
1334 required_application_ids: &[ApplicationId],
1335 ) -> Result<ApplicationId<Abi>> {
1336 let module_id = module_id.forget_abi();
1337 let json_required_applications_ids = required_application_ids
1338 .iter()
1339 .map(ApplicationId::to_string)
1340 .collect::<Vec<_>>()
1341 .to_value();
1342 let new_parameters = serde_json::to_value(parameters)
1344 .context("could not create parameters JSON")?
1345 .to_value();
1346 let new_argument = serde_json::to_value(argument)
1347 .context("could not create argument JSON")?
1348 .to_value();
1349 let query = format!(
1350 "mutation {{ createApplication(\
1351 chainId: \"{chain_id}\",
1352 moduleId: \"{module_id}\", \
1353 parameters: {new_parameters}, \
1354 instantiationArgument: {new_argument}, \
1355 requiredApplicationIds: {json_required_applications_ids}) \
1356 }}"
1357 );
1358 let data = self.query_node(query).await?;
1359 let app_id_str = data["createApplication"]
1360 .as_str()
1361 .context("missing createApplication string in response")?
1362 .trim();
1363 Ok(app_id_str
1364 .parse::<ApplicationId>()
1365 .context("invalid application ID")?
1366 .with_abi())
1367 }
1368
1369 pub async fn chain_tip_hash(&self, chain: ChainId) -> Result<Option<CryptoHash>> {
1371 let query = format!(r#"query {{ block(chainId: "{chain}") {{ hash }} }}"#);
1372
1373 let mut response = self.query_node(&query).await?;
1374
1375 match mem::take(&mut response["block"]["hash"]) {
1376 Value::Null => Ok(None),
1377 Value::String(hash) => Ok(Some(
1378 hash.parse()
1379 .context("Received an invalid hash {hash:?} for chain tip")?,
1380 )),
1381 invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1382 }
1383 }
1384
1385 pub async fn notifications(
1387 &self,
1388 chain_id: ChainId,
1389 ) -> Result<impl Stream<Item = Result<Notification>>> {
1390 let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1391 let url = format!("ws://localhost:{}/ws", self.port);
1392 let mut request = url.into_client_request()?;
1393 request.headers_mut().insert(
1394 "Sec-WebSocket-Protocol",
1395 HeaderValue::from_str("graphql-transport-ws")?,
1396 );
1397 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1398 let init_json = json!({
1399 "type": "connection_init",
1400 "payload": {}
1401 });
1402 websocket.send(init_json.to_string().into()).await?;
1403 let text = websocket
1404 .next()
1405 .await
1406 .context("Failed to establish connection")??
1407 .into_text()?;
1408 ensure!(
1409 text == "{\"type\":\"connection_ack\"}",
1410 "Unexpected response: {text}"
1411 );
1412 let query_json = json!({
1413 "id": "1",
1414 "type": "start",
1415 "payload": {
1416 "query": query,
1417 "variables": {},
1418 "operationName": null
1419 }
1420 });
1421 websocket.send(query_json.to_string().into()).await?;
1422 Ok(websocket
1423 .map_err(anyhow::Error::from)
1424 .and_then(|message| async {
1425 let text = message.into_text()?;
1426 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1427 if let Some(errors) = value["payload"].get("errors") {
1428 bail!("Notification subscription failed: {errors:?}");
1429 }
1430 serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1431 .context("Failed to deserialize notification")
1432 }))
1433 }
1434}
1435
/// Handle to a faucet service: the child process plus the local port used to reach it.
pub struct FaucetService {
    /// Port used to build the `http://localhost:{port}/` URL handed to the faucet client.
    port: u16,
    /// The child process; killed by `terminate` and checked by `ensure_is_running`.
    child: Child,
}
1441
1442impl FaucetService {
1443 fn new(port: u16, child: Child) -> Self {
1444 Self { port, child }
1445 }
1446
1447 pub async fn terminate(mut self) -> Result<()> {
1448 self.child
1449 .kill()
1450 .await
1451 .context("terminating faucet service")
1452 }
1453
1454 pub fn ensure_is_running(&mut self) -> Result<()> {
1455 self.child.ensure_is_running()
1456 }
1457
1458 pub fn instance(&self) -> Faucet {
1459 Faucet::new(format!("http://localhost:{}/", self.port))
1460 }
1461}
1462
/// Client for a single application endpoint, tagged with the type `A`.
pub struct ApplicationWrapper<A> {
    /// URI that queries are POSTed to.
    uri: String,
    /// Carries the type parameter `A` without storing a value of that type.
    _phantom: PhantomData<A>,
}
1468
1469impl<A> ApplicationWrapper<A> {
1470 pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1471 let query = query.as_ref();
1472 let value = self.run_json_query(json!({ "query": query })).await?;
1473 Ok(value["data"].clone())
1474 }
1475
1476 pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1477 const MAX_RETRIES: usize = 5;
1478
1479 for i in 0.. {
1480 let client = reqwest_client();
1481 let result = client.post(&self.uri).json(&query).send().await;
1482 let response = match result {
1483 Ok(response) => response,
1484 Err(error) if i < MAX_RETRIES => {
1485 warn!(
1486 "Failed to post query \"{}\": {error}; retrying",
1487 truncate_query_output_serialize(&query),
1488 );
1489 continue;
1490 }
1491 Err(error) => {
1492 let query = truncate_query_output_serialize(&query);
1493 return Err(error)
1494 .with_context(|| format!("run_json_query: failed to post query={query}"));
1495 }
1496 };
1497 anyhow::ensure!(
1498 response.status().is_success(),
1499 "Query \"{}\" failed: {}",
1500 truncate_query_output_serialize(&query),
1501 response
1502 .text()
1503 .await
1504 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1505 );
1506 let value: Value = response.json().await.context("invalid JSON")?;
1507 if let Some(errors) = value.get("errors") {
1508 bail!(
1509 "Query \"{}\" failed: {}",
1510 truncate_query_output_serialize(&query),
1511 errors
1512 );
1513 }
1514 return Ok(value);
1515 }
1516 unreachable!()
1517 }
1518
1519 pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
1520 let query = query.as_ref();
1521 self.run_graphql_query(&format!("query {{ {query} }}"))
1522 .await
1523 }
1524
1525 pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
1526 let query = query.as_ref().trim();
1527 let name = query
1528 .split_once(|ch: char| !ch.is_alphanumeric())
1529 .map_or(query, |(name, _)| name);
1530 let data = self.query(query).await?;
1531 serde_json::from_value(data[name].clone())
1532 .with_context(|| format!("{name} field missing in response"))
1533 }
1534
1535 pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
1536 let mutation = mutation.as_ref();
1537 self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
1538 .await
1539 }
1540}
1541
1542impl<A> From<String> for ApplicationWrapper<A> {
1543 fn from(uri: String) -> ApplicationWrapper<A> {
1544 ApplicationWrapper {
1545 uri,
1546 _phantom: PhantomData,
1547 }
1548 }
1549}