1use std::{
5 borrow::Cow,
6 collections::BTreeMap,
7 env,
8 marker::PhantomData,
9 mem,
10 path::{Path, PathBuf},
11 process::Stdio,
12 str::FromStr,
13 sync,
14 time::Duration,
15};
16
17use anyhow::{bail, ensure, Context, Result};
18use async_graphql::InputType;
19use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
20use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
21use heck::ToKebabCase;
22use linera_base::{
23 abi::ContractAbi,
24 command::{resolve_binary, CommandExt},
25 crypto::{CryptoHash, InMemorySigner},
26 data_types::{Amount, Bytecode, Epoch},
27 identifiers::{
28 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
29 },
30 vm::VmRuntime,
31};
32use linera_client::{client_options::ResourceControlPolicyConfig, wallet::Wallet};
33use linera_core::worker::Notification;
34use linera_execution::committee::Committee;
35use linera_faucet_client::Faucet;
36use serde::{de::DeserializeOwned, ser::Serialize};
37use serde_command_opts::to_args;
38use serde_json::{json, Value};
39use tempfile::TempDir;
40use tokio::{
41 io::{AsyncBufReadExt, BufReader},
42 process::{Child, Command},
43 sync::oneshot,
44 task::JoinHandle,
45};
46use tracing::{error, info, warn};
47
48use crate::{
49 cli::command::BenchmarkCommand,
50 cli_wrappers::{
51 local_net::{PathProvider, ProcessInbox},
52 Network,
53 },
54 util::{self, ChildExt},
55};
56
57const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
60
61fn reqwest_client() -> reqwest::Client {
62 reqwest::ClientBuilder::new()
63 .timeout(Duration::from_secs(30))
64 .build()
65 .unwrap()
66}
67
68pub struct ClientWrapper {
70 binary_path: sync::Mutex<Option<PathBuf>>,
71 testing_prng_seed: Option<u64>,
72 storage: String,
73 wallet: String,
74 keystore: String,
75 max_pending_message_bundles: usize,
76 network: Network,
77 pub path_provider: PathProvider,
78 on_drop: OnClientDrop,
79 extra_args: Vec<String>,
80}
81
82#[derive(Clone, Copy, Debug, Eq, PartialEq)]
84pub enum OnClientDrop {
85 CloseChains,
87 LeakChains,
89}
90
91impl ClientWrapper {
92 pub fn new(
93 path_provider: PathProvider,
94 network: Network,
95 testing_prng_seed: Option<u64>,
96 id: usize,
97 on_drop: OnClientDrop,
98 ) -> Self {
99 Self::new_with_extra_args(
100 path_provider,
101 network,
102 testing_prng_seed,
103 id,
104 on_drop,
105 vec!["--wait-for-outgoing-messages".to_string()],
106 )
107 }
108
109 pub fn new_with_extra_args(
110 path_provider: PathProvider,
111 network: Network,
112 testing_prng_seed: Option<u64>,
113 id: usize,
114 on_drop: OnClientDrop,
115 extra_args: Vec<String>,
116 ) -> Self {
117 let storage = format!(
118 "rocksdb:{}/client_{}.db",
119 path_provider.path().display(),
120 id
121 );
122 let wallet = format!("wallet_{}.json", id);
123 let keystore = format!("keystore_{}.json", id);
124 Self {
125 binary_path: sync::Mutex::new(None),
126 testing_prng_seed,
127 storage,
128 wallet,
129 keystore,
130 max_pending_message_bundles: 10_000,
131 network,
132 path_provider,
133 on_drop,
134 extra_args,
135 }
136 }
137
138 pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
140 let tmp = TempDir::new()?;
141 let mut command = self.command().await?;
142 command
143 .current_dir(tmp.path())
144 .arg("project")
145 .arg("new")
146 .arg(project_name)
147 .arg("--linera-root")
148 .arg(linera_root)
149 .spawn_and_wait_for_stdout()
150 .await?;
151 Ok(tmp)
152 }
153
154 pub async fn project_publish<T: Serialize>(
156 &self,
157 path: PathBuf,
158 required_application_ids: Vec<String>,
159 publisher: impl Into<Option<ChainId>>,
160 argument: &T,
161 ) -> Result<String> {
162 let json_parameters = serde_json::to_string(&())?;
163 let json_argument = serde_json::to_string(argument)?;
164 let mut command = self.command().await?;
165 command
166 .arg("project")
167 .arg("publish-and-create")
168 .arg(path)
169 .args(publisher.into().iter().map(ChainId::to_string))
170 .args(["--json-parameters", &json_parameters])
171 .args(["--json-argument", &json_argument]);
172 if !required_application_ids.is_empty() {
173 command.arg("--required-application-ids");
174 command.args(required_application_ids);
175 }
176 let stdout = command.spawn_and_wait_for_stdout().await?;
177 Ok(stdout.trim().to_string())
178 }
179
180 pub async fn project_test(&self, path: &Path) -> Result<()> {
182 self.command()
183 .await
184 .context("failed to create project test command")?
185 .current_dir(path)
186 .arg("project")
187 .arg("test")
188 .spawn_and_wait_for_stdout()
189 .await?;
190 Ok(())
191 }
192
193 async fn command_with_envs_and_arguments(
194 &self,
195 envs: &[(&str, &str)],
196 arguments: impl IntoIterator<Item = Cow<'_, str>>,
197 ) -> Result<Command> {
198 let mut command = self.command_binary().await?;
199 command.current_dir(self.path_provider.path());
200 for (key, value) in envs {
201 command.env(key, value);
202 }
203 for argument in arguments {
204 command.arg(&*argument);
205 }
206 Ok(command)
207 }
208
209 async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
210 self.command_with_envs_and_arguments(envs, self.command_arguments())
211 .await
212 }
213
214 async fn command_with_arguments(
215 &self,
216 arguments: impl IntoIterator<Item = Cow<'_, str>>,
217 ) -> Result<Command> {
218 self.command_with_envs_and_arguments(
219 &[(
220 "RUST_LOG",
221 &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
222 )],
223 arguments,
224 )
225 .await
226 }
227
228 async fn command(&self) -> Result<Command> {
229 self.command_with_envs(&[(
230 "RUST_LOG",
231 &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
232 )])
233 .await
234 }
235
236 fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
237 [
238 "--wallet".into(),
239 self.wallet.as_str().into(),
240 "--keystore".into(),
241 self.keystore.as_str().into(),
242 "--storage".into(),
243 self.storage.as_str().into(),
244 "--send-timeout-ms".into(),
245 "500000".into(),
246 "--recv-timeout-ms".into(),
247 "500000".into(),
248 ]
249 .into_iter()
250 .chain(self.extra_args.iter().map(|s| s.as_str().into()))
251 }
252
253 fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
255 self.required_command_arguments().chain([
256 "--max-pending-message-bundles".into(),
257 self.max_pending_message_bundles.to_string().into(),
258 ])
259 }
260
261 async fn command_binary(&self) -> Result<Command> {
265 match self.command_with_cached_binary_path() {
266 Some(command) => Ok(command),
267 None => {
268 let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
269 let command = Command::new(&resolved_path);
270
271 self.set_cached_binary_path(resolved_path);
272
273 Ok(command)
274 }
275 }
276 }
277
278 fn command_with_cached_binary_path(&self) -> Option<Command> {
280 let binary_path = self.binary_path.lock().unwrap();
281
282 binary_path.as_ref().map(Command::new)
283 }
284
285 fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
293 let mut binary_path = self.binary_path.lock().unwrap();
294
295 if binary_path.is_none() {
296 *binary_path = Some(new_binary_path);
297 } else {
298 assert_eq!(*binary_path, Some(new_binary_path));
299 }
300 }
301
302 pub async fn create_genesis_config(
304 &self,
305 num_other_initial_chains: u32,
306 initial_funding: Amount,
307 policy_config: ResourceControlPolicyConfig,
308 http_allow_list: Option<Vec<String>>,
309 ) -> Result<()> {
310 let mut command = self.command().await?;
311 command
312 .args([
313 "create-genesis-config",
314 &num_other_initial_chains.to_string(),
315 ])
316 .args(["--initial-funding", &initial_funding.to_string()])
317 .args(["--committee", "committee.json"])
318 .args(["--genesis", "genesis.json"])
319 .args([
320 "--policy-config",
321 &policy_config.to_string().to_kebab_case(),
322 ]);
323 if let Some(allow_list) = http_allow_list {
324 command
325 .arg("--http-request-allow-list")
326 .arg(allow_list.join(","));
327 }
328 if let Some(seed) = self.testing_prng_seed {
329 command.arg("--testing-prng-seed").arg(seed.to_string());
330 }
331 command.spawn_and_wait_for_stdout().await?;
332 Ok(())
333 }
334
335 pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
338 let mut command = self.command().await?;
339 command.args(["wallet", "init"]);
340 match faucet {
341 None => command.args(["--genesis", "genesis.json"]),
342 Some(faucet) => command.args(["--faucet", faucet.url()]),
343 };
344 if let Some(seed) = self.testing_prng_seed {
345 command.arg("--testing-prng-seed").arg(seed.to_string());
346 }
347 command.spawn_and_wait_for_stdout().await?;
348 Ok(())
349 }
350
351 pub async fn request_chain(
353 &self,
354 faucet: &Faucet,
355 set_default: bool,
356 ) -> Result<(ChainId, AccountOwner)> {
357 let mut command = self.command().await?;
358 command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
359 if set_default {
360 command.arg("--set-default");
361 }
362 let stdout = command.spawn_and_wait_for_stdout().await?;
363 let mut lines = stdout.split_whitespace();
364 let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
365 let owner = lines.next().context("missing chain owner")?.parse()?;
366 Ok((chain_id, owner))
367 }
368
369 #[expect(clippy::too_many_arguments)]
371 pub async fn publish_and_create<
372 A: ContractAbi,
373 Parameters: Serialize,
374 InstantiationArgument: Serialize,
375 >(
376 &self,
377 contract: PathBuf,
378 service: PathBuf,
379 vm_runtime: VmRuntime,
380 parameters: &Parameters,
381 argument: &InstantiationArgument,
382 required_application_ids: &[ApplicationId],
383 publisher: impl Into<Option<ChainId>>,
384 ) -> Result<ApplicationId<A>> {
385 let json_parameters = serde_json::to_string(parameters)?;
386 let json_argument = serde_json::to_string(argument)?;
387 let mut command = self.command().await?;
388 let vm_runtime = format!("{}", vm_runtime);
389 command
390 .arg("publish-and-create")
391 .args([contract, service])
392 .args(["--vm-runtime", &vm_runtime.to_lowercase()])
393 .args(publisher.into().iter().map(ChainId::to_string))
394 .args(["--json-parameters", &json_parameters])
395 .args(["--json-argument", &json_argument]);
396 if !required_application_ids.is_empty() {
397 command.arg("--required-application-ids");
398 command.args(
399 required_application_ids
400 .iter()
401 .map(ApplicationId::to_string),
402 );
403 }
404 let stdout = command.spawn_and_wait_for_stdout().await?;
405 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
406 }
407
408 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
410 &self,
411 contract: PathBuf,
412 service: PathBuf,
413 vm_runtime: VmRuntime,
414 publisher: impl Into<Option<ChainId>>,
415 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
416 let stdout = self
417 .command()
418 .await?
419 .arg("publish-module")
420 .args([contract, service])
421 .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
422 .args(publisher.into().iter().map(ChainId::to_string))
423 .spawn_and_wait_for_stdout()
424 .await?;
425 let module_id: ModuleId = stdout.trim().parse()?;
426 Ok(module_id.with_abi())
427 }
428
429 pub async fn create_application<
431 Abi: ContractAbi,
432 Parameters: Serialize,
433 InstantiationArgument: Serialize,
434 >(
435 &self,
436 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
437 parameters: &Parameters,
438 argument: &InstantiationArgument,
439 required_application_ids: &[ApplicationId],
440 creator: impl Into<Option<ChainId>>,
441 ) -> Result<ApplicationId<Abi>> {
442 let json_parameters = serde_json::to_string(parameters)?;
443 let json_argument = serde_json::to_string(argument)?;
444 let mut command = self.command().await?;
445 command
446 .arg("create-application")
447 .arg(module_id.forget_abi().to_string())
448 .args(["--json-parameters", &json_parameters])
449 .args(["--json-argument", &json_argument])
450 .args(creator.into().iter().map(ChainId::to_string));
451 if !required_application_ids.is_empty() {
452 command.arg("--required-application-ids");
453 command.args(
454 required_application_ids
455 .iter()
456 .map(ApplicationId::to_string),
457 );
458 }
459 let stdout = command.spawn_and_wait_for_stdout().await?;
460 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
461 }
462
463 pub async fn run_node_service(
465 &self,
466 port: impl Into<Option<u16>>,
467 process_inbox: ProcessInbox,
468 ) -> Result<NodeService> {
469 let port = port.into().unwrap_or(8080);
470 let mut command = self.command().await?;
471 command.arg("service");
472 if let ProcessInbox::Skip = process_inbox {
473 command.arg("--listener-skip-process-inbox");
474 }
475 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
476 command.args(var.split_whitespace());
477 }
478 let child = command
479 .args(["--port".to_string(), port.to_string()])
480 .spawn_into()?;
481 let client = reqwest_client();
482 for i in 0..10 {
483 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
484 let request = client
485 .get(format!("http://localhost:{}/", port))
486 .send()
487 .await;
488 if request.is_ok() {
489 info!("Node service has started");
490 return Ok(NodeService::new(port, child));
491 } else {
492 warn!("Waiting for node service to start");
493 }
494 }
495 bail!("Failed to start node service");
496 }
497
498 pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
500 let mut command = self.command().await?;
501 command.arg("query-validator").arg(address);
502 let stdout = command.spawn_and_wait_for_stdout().await?;
503 let hash = stdout
504 .trim()
505 .parse()
506 .context("error while parsing the result of `linera query-validator`")?;
507 Ok(hash)
508 }
509
510 pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
512 let mut command = self.command().await?;
513 command.arg("query-validators");
514 if let Some(chain_id) = chain_id {
515 command.arg(chain_id.to_string());
516 }
517 command.spawn_and_wait_for_stdout().await?;
518 Ok(())
519 }
520
521 pub async fn sync_validator(
523 &self,
524 chain_ids: impl IntoIterator<Item = &ChainId>,
525 validator_address: impl Into<String>,
526 ) -> Result<()> {
527 let mut command = self.command().await?;
528 command.arg("sync-validator").arg(validator_address.into());
529 let mut chain_ids = chain_ids.into_iter().peekable();
530 if chain_ids.peek().is_some() {
531 command
532 .arg("--chains")
533 .args(chain_ids.map(ChainId::to_string));
534 }
535 command.spawn_and_wait_for_stdout().await?;
536 Ok(())
537 }
538
539 pub async fn run_faucet(
541 &self,
542 port: impl Into<Option<u16>>,
543 chain_id: ChainId,
544 amount: Amount,
545 ) -> Result<FaucetService> {
546 let port = port.into().unwrap_or(8080);
547 let mut command = self.command().await?;
548 let child = command
549 .arg("faucet")
550 .arg(chain_id.to_string())
551 .args(["--port".to_string(), port.to_string()])
552 .args(["--amount".to_string(), amount.to_string()])
553 .spawn_into()?;
554 let client = reqwest_client();
555 for i in 0..10 {
556 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
557 let request = client
558 .get(format!("http://localhost:{}/", port))
559 .send()
560 .await;
561 if request.is_ok() {
562 info!("Faucet has started");
563 return Ok(FaucetService::new(port, child));
564 } else {
565 warn!("Waiting for faucet to start");
566 }
567 }
568 bail!("Failed to start faucet");
569 }
570
571 pub async fn local_balance(&self, account: Account) -> Result<Amount> {
573 let stdout = self
574 .command()
575 .await?
576 .arg("local-balance")
577 .arg(account.to_string())
578 .spawn_and_wait_for_stdout()
579 .await?;
580 let amount = stdout
581 .trim()
582 .parse()
583 .context("error while parsing the result of `linera local-balance`")?;
584 Ok(amount)
585 }
586
587 pub async fn query_balance(&self, account: Account) -> Result<Amount> {
589 let stdout = self
590 .command()
591 .await?
592 .arg("query-balance")
593 .arg(account.to_string())
594 .spawn_and_wait_for_stdout()
595 .await?;
596 let amount = stdout
597 .trim()
598 .parse()
599 .context("error while parsing the result of `linera query-balance`")?;
600 Ok(amount)
601 }
602
603 pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
605 self.command()
606 .await?
607 .arg("sync")
608 .arg(chain_id.to_string())
609 .spawn_and_wait_for_stdout()
610 .await?;
611 Ok(())
612 }
613
614 pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
616 self.command()
617 .await?
618 .arg("process-inbox")
619 .arg(chain_id.to_string())
620 .spawn_and_wait_for_stdout()
621 .await?;
622 Ok(())
623 }
624
625 pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
627 self.command()
628 .await?
629 .arg("transfer")
630 .arg(amount.to_string())
631 .args(["--from", &from.to_string()])
632 .args(["--to", &to.to_string()])
633 .spawn_and_wait_for_stdout()
634 .await?;
635 Ok(())
636 }
637
638 pub async fn transfer_with_silent_logs(
640 &self,
641 amount: Amount,
642 from: ChainId,
643 to: ChainId,
644 ) -> Result<()> {
645 self.command()
646 .await?
647 .env("RUST_LOG", "off")
648 .arg("transfer")
649 .arg(amount.to_string())
650 .args(["--from", &from.to_string()])
651 .args(["--to", &to.to_string()])
652 .spawn_and_wait_for_stdout()
653 .await?;
654 Ok(())
655 }
656
657 pub async fn transfer_with_accounts(
659 &self,
660 amount: Amount,
661 from: Account,
662 to: Account,
663 ) -> Result<()> {
664 self.command()
665 .await?
666 .arg("transfer")
667 .arg(amount.to_string())
668 .args(["--from", &from.to_string()])
669 .args(["--to", &to.to_string()])
670 .spawn_and_wait_for_stdout()
671 .await?;
672 Ok(())
673 }
674
675 fn benchmark_command_internal(command: &mut Command, args: BenchmarkCommand) -> Result<()> {
676 let mut formatted_args = to_args(&args)?;
677 let subcommand = formatted_args.remove(0);
678 formatted_args.remove(0);
681 let options = formatted_args
682 .chunks_exact(2)
683 .flat_map(|pair| {
684 let option = format!("--{}", pair[0]);
685 match pair[1].as_str() {
686 "true" => vec![option],
687 "false" => vec![],
688 _ => vec![option, pair[1].clone()],
689 }
690 })
691 .collect::<Vec<_>>();
692 command
693 .args([
694 "--max-pending-message-bundles",
695 &args.transactions_per_block().to_string(),
696 ])
697 .arg("benchmark")
698 .arg(subcommand)
699 .args(options);
700 Ok(())
701 }
702
703 async fn benchmark_command_with_envs(
704 &self,
705 args: BenchmarkCommand,
706 envs: &[(&str, &str)],
707 ) -> Result<Command> {
708 let mut command = self
709 .command_with_envs_and_arguments(envs, self.required_command_arguments())
710 .await?;
711 Self::benchmark_command_internal(&mut command, args)?;
712 Ok(command)
713 }
714
715 async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
716 let mut command = self
717 .command_with_arguments(self.required_command_arguments())
718 .await?;
719 Self::benchmark_command_internal(&mut command, args)?;
720 Ok(command)
721 }
722
723 pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
725 let mut command = self.benchmark_command(args).await?;
726 command.spawn_and_wait_for_stdout().await?;
727 Ok(())
728 }
729
730 pub async fn benchmark_detached(
733 &self,
734 args: BenchmarkCommand,
735 tx: oneshot::Sender<()>,
736 ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
737 let mut child = self
738 .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
739 .await?
740 .kill_on_drop(true)
741 .stdin(Stdio::piped())
742 .stdout(Stdio::piped())
743 .stderr(Stdio::piped())
744 .spawn()?;
745
746 let pid = child.id().expect("failed to get pid");
747 let stdout = child.stdout.take().expect("stdout not open");
748 let stdout_handle = tokio::spawn(async move {
749 let mut lines = BufReader::new(stdout).lines();
750 while let Ok(Some(line)) = lines.next_line().await {
751 println!("benchmark{{pid={pid}}} {line}");
752 }
753 });
754
755 let stderr = child.stderr.take().expect("stderr not open");
756 let stderr_handle = tokio::spawn(async move {
757 let mut lines = BufReader::new(stderr).lines();
758 let mut tx = Some(tx);
759 while let Ok(Some(line)) = lines.next_line().await {
760 if line.contains("Ready to start benchmark") {
761 tx.take()
762 .expect("Should only send signal once")
763 .send(())
764 .expect("failed to send ready signal to main thread");
765 } else {
766 println!("benchmark{{pid={pid}}} {line}");
767 }
768 }
769 });
770 Ok((child, stdout_handle, stderr_handle))
771 }
772
773 async fn open_chain_internal(
774 &self,
775 from: ChainId,
776 owner: Option<AccountOwner>,
777 initial_balance: Amount,
778 super_owner: bool,
779 ) -> Result<(ChainId, AccountOwner)> {
780 let mut command = self.command().await?;
781 command
782 .arg("open-chain")
783 .args(["--from", &from.to_string()])
784 .args(["--initial-balance", &initial_balance.to_string()]);
785
786 if let Some(owner) = owner {
787 command.args(["--owner", &owner.to_string()]);
788 }
789
790 if super_owner {
791 command.arg("--super-owner");
792 }
793
794 let stdout = command.spawn_and_wait_for_stdout().await?;
795 let mut split = stdout.split('\n');
796 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
797 let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
798 if let Some(owner) = owner {
799 assert_eq!(owner, new_owner);
800 }
801 Ok((chain_id, new_owner))
802 }
803
804 pub async fn open_chain_super_owner(
806 &self,
807 from: ChainId,
808 owner: Option<AccountOwner>,
809 initial_balance: Amount,
810 ) -> Result<(ChainId, AccountOwner)> {
811 self.open_chain_internal(from, owner, initial_balance, true)
812 .await
813 }
814
815 pub async fn open_chain(
817 &self,
818 from: ChainId,
819 owner: Option<AccountOwner>,
820 initial_balance: Amount,
821 ) -> Result<(ChainId, AccountOwner)> {
822 self.open_chain_internal(from, owner, initial_balance, false)
823 .await
824 }
825
826 pub async fn open_and_assign(
828 &self,
829 client: &ClientWrapper,
830 initial_balance: Amount,
831 ) -> Result<ChainId> {
832 let our_chain = self
833 .load_wallet()?
834 .default_chain()
835 .context("no default chain found")?;
836 let owner = client.keygen().await?;
837 let (new_chain, _) = self
838 .open_chain(our_chain, Some(owner), initial_balance)
839 .await?;
840 client.assign(owner, new_chain).await?;
841 Ok(new_chain)
842 }
843
844 pub async fn open_multi_owner_chain(
845 &self,
846 from: ChainId,
847 owners: Vec<AccountOwner>,
848 weights: Vec<u64>,
849 multi_leader_rounds: u32,
850 balance: Amount,
851 base_timeout_ms: u64,
852 ) -> Result<ChainId> {
853 let mut command = self.command().await?;
854 command
855 .arg("open-multi-owner-chain")
856 .args(["--from", &from.to_string()])
857 .arg("--owners")
858 .args(owners.iter().map(AccountOwner::to_string))
859 .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
860 if !weights.is_empty() {
861 command
862 .arg("--owner-weights")
863 .args(weights.iter().map(u64::to_string));
864 };
865 command
866 .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
867 .args(["--initial-balance", &balance.to_string()]);
868
869 let stdout = command.spawn_and_wait_for_stdout().await?;
870 let mut split = stdout.split('\n');
871 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
872
873 Ok(chain_id)
874 }
875
876 pub async fn change_ownership(
877 &self,
878 chain_id: ChainId,
879 super_owners: Vec<AccountOwner>,
880 owners: Vec<AccountOwner>,
881 ) -> Result<()> {
882 let mut command = self.command().await?;
883 command
884 .arg("change-ownership")
885 .args(["--chain-id", &chain_id.to_string()]);
886 if !super_owners.is_empty() {
887 command
888 .arg("--super-owners")
889 .args(super_owners.iter().map(AccountOwner::to_string));
890 }
891 if !owners.is_empty() {
892 command
893 .arg("--owners")
894 .args(owners.iter().map(AccountOwner::to_string));
895 }
896 command.spawn_and_wait_for_stdout().await?;
897 Ok(())
898 }
899
900 pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
902 let mut command = self.command().await?;
903 command
904 .args(["wallet", "follow-chain"])
905 .arg(chain_id.to_string());
906 if sync {
907 command.arg("--sync");
908 }
909 command.spawn_and_wait_for_stdout().await?;
910 Ok(())
911 }
912
913 pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
915 let mut command = self.command().await?;
916 command
917 .args(["wallet", "forget-chain"])
918 .arg(chain_id.to_string());
919 command.spawn_and_wait_for_stdout().await?;
920 Ok(())
921 }
922
923 pub async fn retry_pending_block(
924 &self,
925 chain_id: Option<ChainId>,
926 ) -> Result<Option<CryptoHash>> {
927 let mut command = self.command().await?;
928 command.arg("retry-pending-block");
929 if let Some(chain_id) = chain_id {
930 command.arg(chain_id.to_string());
931 }
932 let stdout = command.spawn_and_wait_for_stdout().await?;
933 let stdout = stdout.trim();
934 if stdout.is_empty() {
935 Ok(None)
936 } else {
937 Ok(Some(CryptoHash::from_str(stdout)?))
938 }
939 }
940
941 pub async fn publish_data_blob(
943 &self,
944 path: &Path,
945 chain_id: Option<ChainId>,
946 ) -> Result<CryptoHash> {
947 let mut command = self.command().await?;
948 command.arg("publish-data-blob").arg(path);
949 if let Some(chain_id) = chain_id {
950 command.arg(chain_id.to_string());
951 }
952 let stdout = command.spawn_and_wait_for_stdout().await?;
953 let stdout = stdout.trim();
954 Ok(CryptoHash::from_str(stdout)?)
955 }
956
957 pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
959 let mut command = self.command().await?;
960 command.arg("read-data-blob").arg(hash.to_string());
961 if let Some(chain_id) = chain_id {
962 command.arg(chain_id.to_string());
963 }
964 command.spawn_and_wait_for_stdout().await?;
965 Ok(())
966 }
967
968 pub fn load_wallet(&self) -> Result<Wallet> {
969 util::read_json(self.wallet_path())
970 }
971
972 pub fn load_keystore(&self) -> Result<InMemorySigner> {
973 util::read_json(self.keystore_path())
974 }
975
976 pub fn wallet_path(&self) -> PathBuf {
977 self.path_provider.path().join(&self.wallet)
978 }
979
980 pub fn keystore_path(&self) -> PathBuf {
981 self.path_provider.path().join(&self.keystore)
982 }
983
984 pub fn storage_path(&self) -> &str {
985 &self.storage
986 }
987
988 pub fn get_owner(&self) -> Option<AccountOwner> {
989 let wallet = self.load_wallet().ok()?;
990 let chain_id = wallet.default_chain()?;
991 wallet.get(chain_id)?.owner
992 }
993
994 pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
995 self.load_wallet()
996 .ok()
997 .is_some_and(|wallet| wallet.get(chain).is_some())
998 }
999
1000 pub async fn set_validator(
1001 &self,
1002 validator_key: &(String, String),
1003 port: usize,
1004 votes: usize,
1005 ) -> Result<()> {
1006 let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1007 self.command()
1008 .await?
1009 .arg("set-validator")
1010 .args(["--public-key", &validator_key.0])
1011 .args(["--account-key", &validator_key.1])
1012 .args(["--address", &address])
1013 .args(["--votes", &votes.to_string()])
1014 .spawn_and_wait_for_stdout()
1015 .await?;
1016 Ok(())
1017 }
1018
1019 pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1020 self.command()
1021 .await?
1022 .arg("remove-validator")
1023 .args(["--public-key", validator_key])
1024 .spawn_and_wait_for_stdout()
1025 .await?;
1026 Ok(())
1027 }
1028
1029 pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1030 self.command()
1031 .await?
1032 .arg("revoke-epochs")
1033 .arg(epoch.to_string())
1034 .spawn_and_wait_for_stdout()
1035 .await?;
1036 Ok(())
1037 }
1038
1039 pub async fn keygen(&self) -> Result<AccountOwner> {
1041 let stdout = self
1042 .command()
1043 .await?
1044 .arg("keygen")
1045 .spawn_and_wait_for_stdout()
1046 .await?;
1047 AccountOwner::from_str(stdout.as_str().trim())
1048 }
1049
1050 pub fn default_chain(&self) -> Option<ChainId> {
1052 self.load_wallet().ok()?.default_chain()
1053 }
1054
1055 pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1057 let _stdout = self
1058 .command()
1059 .await?
1060 .arg("assign")
1061 .args(["--owner", &owner.to_string()])
1062 .args(["--chain-id", &chain_id.to_string()])
1063 .spawn_and_wait_for_stdout()
1064 .await?;
1065 Ok(())
1066 }
1067
1068 pub async fn set_preferred_owner(
1070 &self,
1071 chain_id: ChainId,
1072 owner: Option<AccountOwner>,
1073 ) -> Result<()> {
1074 let mut owner_arg = vec!["--owner".to_string()];
1075 if let Some(owner) = owner {
1076 owner_arg.push(owner.to_string());
1077 };
1078 self.command()
1079 .await?
1080 .arg("set-preferred-owner")
1081 .args(["--chain-id", &chain_id.to_string()])
1082 .args(owner_arg)
1083 .spawn_and_wait_for_stdout()
1084 .await?;
1085 Ok(())
1086 }
1087
1088 pub async fn build_application(
1089 &self,
1090 path: &Path,
1091 name: &str,
1092 is_workspace: bool,
1093 ) -> Result<(PathBuf, PathBuf)> {
1094 Command::new("cargo")
1095 .current_dir(self.path_provider.path())
1096 .arg("build")
1097 .arg("--release")
1098 .args(["--target", "wasm32-unknown-unknown"])
1099 .arg("--manifest-path")
1100 .arg(path.join("Cargo.toml"))
1101 .spawn_and_wait_for_stdout()
1102 .await?;
1103
1104 let release_dir = match is_workspace {
1105 true => path.join("../target/wasm32-unknown-unknown/release"),
1106 false => path.join("target/wasm32-unknown-unknown/release"),
1107 };
1108
1109 let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1110 let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1111
1112 let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1113 let service_size = fs_err::tokio::metadata(&service).await?.len();
1114 info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1115
1116 Ok((contract, service))
1117 }
1118}
1119
1120impl Drop for ClientWrapper {
1121 fn drop(&mut self) {
1122 use std::process::Command as SyncCommand;
1123
1124 if self.on_drop != OnClientDrop::CloseChains {
1125 return;
1126 }
1127
1128 let Ok(binary_path) = self.binary_path.lock() else {
1129 error!("Failed to close chains because a thread panicked with a lock to `binary_path`");
1130 return;
1131 };
1132
1133 let Some(binary_path) = binary_path.as_ref() else {
1134 warn!(
1135 "Assuming no chains need to be closed, because the command binary was never \
1136 resolved and therefore presumably never called"
1137 );
1138 return;
1139 };
1140
1141 let working_directory = self.path_provider.path();
1142 let mut wallet_show_command = SyncCommand::new(binary_path);
1143
1144 for argument in self.command_arguments() {
1145 wallet_show_command.arg(&*argument);
1146 }
1147
1148 let Ok(wallet_show_output) = wallet_show_command
1149 .current_dir(working_directory)
1150 .args(["wallet", "show", "--short", "--owned"])
1151 .output()
1152 else {
1153 warn!("Failed to execute `wallet show --short` to list chains to close");
1154 return;
1155 };
1156
1157 if !wallet_show_output.status.success() {
1158 warn!("Failed to list chains in the wallet to close them");
1159 return;
1160 }
1161
1162 let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1163 warn!(
1164 "Failed to close chains because `linera wallet show --short` \
1165 returned a non-UTF-8 output"
1166 );
1167 return;
1168 };
1169
1170 let chain_ids = chain_list_string
1171 .split('\n')
1172 .map(|line| line.trim())
1173 .filter(|line| !line.is_empty());
1174
1175 for chain_id in chain_ids {
1176 let mut close_chain_command = SyncCommand::new(binary_path);
1177
1178 for argument in self.command_arguments() {
1179 close_chain_command.arg(&*argument);
1180 }
1181
1182 close_chain_command.current_dir(working_directory);
1183
1184 match close_chain_command.args(["close-chain", chain_id]).status() {
1185 Ok(status) if status.success() => (),
1186 Ok(failure) => warn!("Failed to close chain {chain_id}: {failure}"),
1187 Err(error) => warn!("Failed to close chain {chain_id}: {error}"),
1188 }
1189 }
1190 }
1191}
1192
1193#[cfg(with_testing)]
1194impl ClientWrapper {
1195 pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1196 self.build_application(Self::example_path(name)?.as_path(), name, true)
1197 .await
1198 }
1199
1200 pub fn example_path(name: &str) -> Result<PathBuf> {
1201 Ok(env::current_dir()?.join("../examples/").join(name))
1202 }
1203}
1204
1205fn truncate_query_output(input: &str) -> String {
1206 let max_len = 1000;
1207 if input.len() < max_len {
1208 input.to_string()
1209 } else {
1210 format!("{} ...", input.get(..max_len).unwrap())
1211 }
1212}
1213
1214fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1215 let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1216 let max_len = 200;
1217 if query.len() < max_len {
1218 query
1219 } else {
1220 format!("{} ...", query.get(..max_len).unwrap())
1221 }
1222}
1223
1224pub struct NodeService {
1226 port: u16,
1227 child: Child,
1228}
1229
1230impl NodeService {
1231 fn new(port: u16, child: Child) -> Self {
1232 Self { port, child }
1233 }
1234
1235 pub async fn terminate(mut self) -> Result<()> {
1236 self.child.kill().await.context("terminating node service")
1237 }
1238
1239 pub fn port(&self) -> u16 {
1240 self.port
1241 }
1242
1243 pub fn ensure_is_running(&mut self) -> Result<()> {
1244 self.child.ensure_is_running()
1245 }
1246
1247 pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1248 let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1249 let mut data = self.query_node(query).await?;
1250 Ok(serde_json::from_value(data["processInbox"].take())?)
1251 }
1252
1253 pub async fn transfer(
1254 &self,
1255 chain_id: ChainId,
1256 owner: AccountOwner,
1257 recipient: Account,
1258 amount: Amount,
1259 ) -> Result<CryptoHash> {
1260 let json_owner = owner.to_value();
1261 let json_recipient = recipient.to_value();
1262 let query = format!(
1263 "mutation {{ transfer(\
1264 chainId: \"{chain_id}\", \
1265 owner: {json_owner}, \
1266 recipient: {json_recipient}, \
1267 amount: \"{amount}\") \
1268 }}"
1269 );
1270 let data = self.query_node(query).await?;
1271 serde_json::from_value(data["transfer"].clone())
1272 .context("missing transfer field in response")
1273 }
1274
1275 pub async fn balance(&self, account: &Account) -> Result<Amount> {
1276 let chain = account.chain_id;
1277 let owner = account.owner;
1278 if matches!(owner, AccountOwner::CHAIN) {
1279 let query = format!(
1280 "query {{ chain(chainId:\"{chain}\") {{
1281 executionState {{ system {{ balance }} }}
1282 }} }}"
1283 );
1284 let response = self.query_node(query).await?;
1285 let balance = &response["chain"]["executionState"]["system"]["balance"]
1286 .as_str()
1287 .unwrap();
1288 return Ok(Amount::from_str(balance)?);
1289 }
1290 let query = format!(
1291 "query {{ chain(chainId:\"{chain}\") {{
1292 executionState {{ system {{ balances {{
1293 entry(key:\"{owner}\") {{ value }}
1294 }} }} }}
1295 }} }}"
1296 );
1297 let response = self.query_node(query).await?;
1298 let balances = &response["chain"]["executionState"]["system"]["balances"];
1299 let balance = balances["entry"]["value"].as_str();
1300 match balance {
1301 None => Ok(Amount::ZERO),
1302 Some(amount) => Ok(Amount::from_str(amount)?),
1303 }
1304 }
1305
1306 pub fn make_application<A: ContractAbi>(
1307 &self,
1308 chain_id: &ChainId,
1309 application_id: &ApplicationId<A>,
1310 ) -> Result<ApplicationWrapper<A>> {
1311 let application_id = application_id.forget_abi().to_string();
1312 let link = format!(
1313 "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1314 self.port
1315 );
1316 Ok(ApplicationWrapper::from(link))
1317 }
1318
1319 pub async fn publish_data_blob(
1320 &self,
1321 chain_id: &ChainId,
1322 bytes: Vec<u8>,
1323 ) -> Result<CryptoHash> {
1324 let query = format!(
1325 "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1326 chain_id.to_value(),
1327 bytes.to_value(),
1328 );
1329 let data = self.query_node(query).await?;
1330 serde_json::from_value(data["publishDataBlob"].clone())
1331 .context("missing publishDataBlob field in response")
1332 }
1333
1334 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1335 &self,
1336 chain_id: &ChainId,
1337 contract: PathBuf,
1338 service: PathBuf,
1339 vm_runtime: VmRuntime,
1340 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1341 let contract_code = Bytecode::load_from_file(&contract)?;
1342 let service_code = Bytecode::load_from_file(&service)?;
1343 let query = format!(
1344 "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1345 chain_id.to_value(),
1346 contract_code.to_value(),
1347 service_code.to_value(),
1348 vm_runtime.to_value(),
1349 );
1350 let data = self.query_node(query).await?;
1351 let module_str = data["publishModule"]
1352 .as_str()
1353 .context("module ID not found")?;
1354 let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1355 Ok(module_id.with_abi())
1356 }
1357
1358 pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1359 let query = format!(
1360 "query {{ chain(chainId:\"{chain_id}\") {{
1361 executionState {{ system {{ committees }} }}
1362 }} }}"
1363 );
1364 let mut response = self.query_node(query).await?;
1365 let committees = response["chain"]["executionState"]["system"]["committees"].take();
1366 Ok(serde_json::from_value(committees)?)
1367 }
1368
1369 pub async fn events_from_index(
1370 &self,
1371 chain_id: &ChainId,
1372 stream_id: &StreamId,
1373 start_index: u32,
1374 ) -> Result<Vec<IndexAndEvent>> {
1375 let query = format!(
1376 "query {{
1377 eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1378 {{ index event }}
1379 }}",
1380 stream_id.to_value()
1381 );
1382 let mut response = self.query_node(query).await?;
1383 let response = response["eventsFromIndex"].take();
1384 Ok(serde_json::from_value(response)?)
1385 }
1386
1387 pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1388 let n_try = 5;
1389 let query = query.as_ref();
1390 for i in 0..n_try {
1391 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1392 let url = format!("http://localhost:{}/", self.port);
1393 let client = reqwest_client();
1394 let result = client
1395 .post(url)
1396 .json(&json!({ "query": query }))
1397 .send()
1398 .await;
1399 if matches!(result, Err(ref error) if error.is_timeout()) {
1400 warn!(
1401 "Timeout when sending query {} to the node service",
1402 truncate_query_output(query)
1403 );
1404 continue;
1405 }
1406 let response = result.with_context(|| {
1407 format!(
1408 "query_node: failed to post query={}",
1409 truncate_query_output(query)
1410 )
1411 })?;
1412 ensure!(
1413 response.status().is_success(),
1414 "Query \"{}\" failed: {}",
1415 truncate_query_output(query),
1416 response
1417 .text()
1418 .await
1419 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1420 );
1421 let value: Value = response.json().await.context("invalid JSON")?;
1422 if let Some(errors) = value.get("errors") {
1423 warn!(
1424 "Query \"{}\" failed: {}",
1425 truncate_query_output(query),
1426 errors
1427 );
1428 } else {
1429 return Ok(value["data"].clone());
1430 }
1431 }
1432 bail!(
1433 "Query \"{}\" failed after {} retries.",
1434 truncate_query_output(query),
1435 n_try
1436 );
1437 }
1438
1439 pub async fn create_application<
1440 Abi: ContractAbi,
1441 Parameters: Serialize,
1442 InstantiationArgument: Serialize,
1443 >(
1444 &self,
1445 chain_id: &ChainId,
1446 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1447 parameters: &Parameters,
1448 argument: &InstantiationArgument,
1449 required_application_ids: &[ApplicationId],
1450 ) -> Result<ApplicationId<Abi>> {
1451 let module_id = module_id.forget_abi();
1452 let json_required_applications_ids = required_application_ids
1453 .iter()
1454 .map(ApplicationId::to_string)
1455 .collect::<Vec<_>>()
1456 .to_value();
1457 let new_parameters = serde_json::to_value(parameters)
1459 .context("could not create parameters JSON")?
1460 .to_value();
1461 let new_argument = serde_json::to_value(argument)
1462 .context("could not create argument JSON")?
1463 .to_value();
1464 let query = format!(
1465 "mutation {{ createApplication(\
1466 chainId: \"{chain_id}\",
1467 moduleId: \"{module_id}\", \
1468 parameters: {new_parameters}, \
1469 instantiationArgument: {new_argument}, \
1470 requiredApplicationIds: {json_required_applications_ids}) \
1471 }}"
1472 );
1473 let data = self.query_node(query).await?;
1474 let app_id_str = data["createApplication"]
1475 .as_str()
1476 .context("missing createApplication string in response")?
1477 .trim();
1478 Ok(app_id_str
1479 .parse::<ApplicationId>()
1480 .context("invalid application ID")?
1481 .with_abi())
1482 }
1483
1484 pub async fn chain_tip_hash(&self, chain: ChainId) -> Result<Option<CryptoHash>> {
1486 let query = format!(r#"query {{ block(chainId: "{chain}") {{ hash }} }}"#);
1487
1488 let mut response = self.query_node(&query).await?;
1489
1490 match mem::take(&mut response["block"]["hash"]) {
1491 Value::Null => Ok(None),
1492 Value::String(hash) => Ok(Some(
1493 hash.parse()
1494 .context("Received an invalid hash {hash:?} for chain tip")?,
1495 )),
1496 invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1497 }
1498 }
1499
1500 pub async fn notifications(
1502 &self,
1503 chain_id: ChainId,
1504 ) -> Result<impl Stream<Item = Result<Notification>>> {
1505 let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1506 let url = format!("ws://localhost:{}/ws", self.port);
1507 let mut request = url.into_client_request()?;
1508 request.headers_mut().insert(
1509 "Sec-WebSocket-Protocol",
1510 HeaderValue::from_str("graphql-transport-ws")?,
1511 );
1512 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1513 let init_json = json!({
1514 "type": "connection_init",
1515 "payload": {}
1516 });
1517 websocket.send(init_json.to_string().into()).await?;
1518 let text = websocket
1519 .next()
1520 .await
1521 .context("Failed to establish connection")??
1522 .into_text()?;
1523 ensure!(
1524 text == "{\"type\":\"connection_ack\"}",
1525 "Unexpected response: {text}"
1526 );
1527 let query_json = json!({
1528 "id": "1",
1529 "type": "start",
1530 "payload": {
1531 "query": query,
1532 "variables": {},
1533 "operationName": null
1534 }
1535 });
1536 websocket.send(query_json.to_string().into()).await?;
1537 Ok(websocket
1538 .map_err(anyhow::Error::from)
1539 .and_then(|message| async {
1540 let text = message.into_text()?;
1541 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1542 if let Some(errors) = value["payload"].get("errors") {
1543 bail!("Notification subscription failed: {errors:?}");
1544 }
1545 serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1546 .context("Failed to deserialize notification")
1547 }))
1548 }
1549}
1550
1551pub struct FaucetService {
1553 port: u16,
1554 child: Child,
1555}
1556
1557impl FaucetService {
1558 fn new(port: u16, child: Child) -> Self {
1559 Self { port, child }
1560 }
1561
1562 pub async fn terminate(mut self) -> Result<()> {
1563 self.child
1564 .kill()
1565 .await
1566 .context("terminating faucet service")
1567 }
1568
1569 pub fn ensure_is_running(&mut self) -> Result<()> {
1570 self.child.ensure_is_running()
1571 }
1572
1573 pub fn instance(&self) -> Faucet {
1574 Faucet::new(format!("http://localhost:{}/", self.port))
1575 }
1576}
1577
1578pub struct ApplicationWrapper<A> {
1580 uri: String,
1581 _phantom: PhantomData<A>,
1582}
1583
1584impl<A> ApplicationWrapper<A> {
1585 pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1586 let query = query.as_ref();
1587 let value = self.run_json_query(json!({ "query": query })).await?;
1588 Ok(value["data"].clone())
1589 }
1590
1591 pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1592 const MAX_RETRIES: usize = 5;
1593
1594 for i in 0.. {
1595 let client = reqwest_client();
1596 let result = client.post(&self.uri).json(&query).send().await;
1597 let response = match result {
1598 Ok(response) => response,
1599 Err(error) if i < MAX_RETRIES => {
1600 warn!(
1601 "Failed to post query \"{}\": {error}; retrying",
1602 truncate_query_output_serialize(&query),
1603 );
1604 continue;
1605 }
1606 Err(error) => {
1607 let query = truncate_query_output_serialize(&query);
1608 return Err(error)
1609 .with_context(|| format!("run_json_query: failed to post query={query}"));
1610 }
1611 };
1612 ensure!(
1613 response.status().is_success(),
1614 "Query \"{}\" failed: {}",
1615 truncate_query_output_serialize(&query),
1616 response
1617 .text()
1618 .await
1619 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1620 );
1621 let value: Value = response.json().await.context("invalid JSON")?;
1622 if let Some(errors) = value.get("errors") {
1623 bail!(
1624 "Query \"{}\" failed: {}",
1625 truncate_query_output_serialize(&query),
1626 errors
1627 );
1628 }
1629 return Ok(value);
1630 }
1631 unreachable!()
1632 }
1633
1634 pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
1635 let query = query.as_ref();
1636 self.run_graphql_query(&format!("query {{ {query} }}"))
1637 .await
1638 }
1639
1640 pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
1641 let query = query.as_ref().trim();
1642 let name = query
1643 .split_once(|ch: char| !ch.is_alphanumeric())
1644 .map_or(query, |(name, _)| name);
1645 let data = self.query(query).await?;
1646 serde_json::from_value(data[name].clone())
1647 .with_context(|| format!("{name} field missing in response"))
1648 }
1649
1650 pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
1651 let mutation = mutation.as_ref();
1652 self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
1653 .await
1654 }
1655}
1656
1657impl<A> From<String> for ApplicationWrapper<A> {
1658 fn from(uri: String) -> ApplicationWrapper<A> {
1659 ApplicationWrapper {
1660 uri,
1661 _phantom: PhantomData,
1662 }
1663 }
1664}