1use std::{
5 borrow::Cow,
6 collections::BTreeMap,
7 env,
8 marker::PhantomData,
9 mem,
10 path::{Path, PathBuf},
11 str::FromStr,
12 sync,
13 time::Duration,
14};
15
16use anyhow::{bail, ensure, Context, Result};
17use async_graphql::InputType;
18use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
19use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
20use heck::ToKebabCase;
21use linera_base::{
22 abi::ContractAbi,
23 command::{resolve_binary, CommandExt},
24 crypto::{CryptoHash, InMemorySigner},
25 data_types::{Amount, Bytecode, Epoch},
26 identifiers::{
27 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
28 },
29 vm::VmRuntime,
30};
31use linera_client::{client_options::ResourceControlPolicyConfig, wallet::Wallet};
32use linera_core::worker::Notification;
33use linera_execution::committee::Committee;
34use linera_faucet_client::Faucet;
35use serde::{de::DeserializeOwned, ser::Serialize};
36use serde_json::{json, Value};
37use tempfile::TempDir;
38use tokio::process::{Child, Command};
39use tracing::{error, info, warn};
40#[cfg(feature = "benchmark")]
41use {
42 crate::cli::command::BenchmarkCommand,
43 serde_command_opts::to_args,
44 std::process::Stdio,
45 tokio::{
46 io::{AsyncBufReadExt, BufReader},
47 sync::oneshot,
48 task::JoinHandle,
49 },
50};
51
52use crate::{
53 cli_wrappers::{
54 local_net::{PathProvider, ProcessInbox},
55 Network,
56 },
57 util::{self, ChildExt},
58};
59
60const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
63
64fn reqwest_client() -> reqwest::Client {
65 reqwest::ClientBuilder::new()
66 .timeout(Duration::from_secs(30))
67 .build()
68 .unwrap()
69}
70
71pub struct ClientWrapper {
73 binary_path: sync::Mutex<Option<PathBuf>>,
74 testing_prng_seed: Option<u64>,
75 storage: String,
76 wallet: String,
77 keystore: String,
78 max_pending_message_bundles: usize,
79 network: Network,
80 pub path_provider: PathProvider,
81 on_drop: OnClientDrop,
82 extra_args: Vec<String>,
83}
84
85#[derive(Clone, Copy, Debug, Eq, PartialEq)]
87pub enum OnClientDrop {
88 CloseChains,
90 LeakChains,
92}
93
94impl ClientWrapper {
95 pub fn new(
96 path_provider: PathProvider,
97 network: Network,
98 testing_prng_seed: Option<u64>,
99 id: usize,
100 on_drop: OnClientDrop,
101 ) -> Self {
102 Self::new_with_extra_args(
103 path_provider,
104 network,
105 testing_prng_seed,
106 id,
107 on_drop,
108 vec!["--wait-for-outgoing-messages".to_string()],
109 )
110 }
111
112 pub fn new_with_extra_args(
113 path_provider: PathProvider,
114 network: Network,
115 testing_prng_seed: Option<u64>,
116 id: usize,
117 on_drop: OnClientDrop,
118 extra_args: Vec<String>,
119 ) -> Self {
120 let storage = format!(
121 "rocksdb:{}/client_{}.db",
122 path_provider.path().display(),
123 id
124 );
125 let wallet = format!("wallet_{}.json", id);
126 let keystore = format!("keystore_{}.json", id);
127 Self {
128 binary_path: sync::Mutex::new(None),
129 testing_prng_seed,
130 storage,
131 wallet,
132 keystore,
133 max_pending_message_bundles: 10_000,
134 network,
135 path_provider,
136 on_drop,
137 extra_args,
138 }
139 }
140
141 pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
143 let tmp = TempDir::new()?;
144 let mut command = self.command().await?;
145 command
146 .current_dir(tmp.path())
147 .arg("project")
148 .arg("new")
149 .arg(project_name)
150 .arg("--linera-root")
151 .arg(linera_root)
152 .spawn_and_wait_for_stdout()
153 .await?;
154 Ok(tmp)
155 }
156
157 pub async fn project_publish<T: Serialize>(
159 &self,
160 path: PathBuf,
161 required_application_ids: Vec<String>,
162 publisher: impl Into<Option<ChainId>>,
163 argument: &T,
164 ) -> Result<String> {
165 let json_parameters = serde_json::to_string(&())?;
166 let json_argument = serde_json::to_string(argument)?;
167 let mut command = self.command().await?;
168 command
169 .arg("project")
170 .arg("publish-and-create")
171 .arg(path)
172 .args(publisher.into().iter().map(ChainId::to_string))
173 .args(["--json-parameters", &json_parameters])
174 .args(["--json-argument", &json_argument]);
175 if !required_application_ids.is_empty() {
176 command.arg("--required-application-ids");
177 command.args(required_application_ids);
178 }
179 let stdout = command.spawn_and_wait_for_stdout().await?;
180 Ok(stdout.trim().to_string())
181 }
182
183 pub async fn project_test(&self, path: &Path) -> Result<()> {
185 self.command()
186 .await
187 .context("failed to create project test command")?
188 .current_dir(path)
189 .arg("project")
190 .arg("test")
191 .spawn_and_wait_for_stdout()
192 .await?;
193 Ok(())
194 }
195
196 async fn command_with_envs_and_arguments(
197 &self,
198 envs: &[(&str, &str)],
199 arguments: impl IntoIterator<Item = Cow<'_, str>>,
200 ) -> Result<Command> {
201 let mut command = self.command_binary().await?;
202 command.current_dir(self.path_provider.path());
203 for (key, value) in envs {
204 command.env(key, value);
205 }
206 for argument in arguments {
207 command.arg(&*argument);
208 }
209 Ok(command)
210 }
211
212 async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
213 self.command_with_envs_and_arguments(envs, self.command_arguments())
214 .await
215 }
216
217 #[cfg(feature = "benchmark")]
218 async fn command_with_arguments(
219 &self,
220 arguments: impl IntoIterator<Item = Cow<'_, str>>,
221 ) -> Result<Command> {
222 self.command_with_envs_and_arguments(
223 &[(
224 "RUST_LOG",
225 &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
226 )],
227 arguments,
228 )
229 .await
230 }
231
232 async fn command(&self) -> Result<Command> {
233 self.command_with_envs(&[(
234 "RUST_LOG",
235 &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
236 )])
237 .await
238 }
239
240 fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
241 [
242 "--wallet".into(),
243 self.wallet.as_str().into(),
244 "--keystore".into(),
245 self.keystore.as_str().into(),
246 "--storage".into(),
247 self.storage.as_str().into(),
248 "--send-timeout-ms".into(),
249 "500000".into(),
250 "--recv-timeout-ms".into(),
251 "500000".into(),
252 ]
253 .into_iter()
254 .chain(self.extra_args.iter().map(|s| s.as_str().into()))
255 }
256
257 fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
259 self.required_command_arguments().chain([
260 "--max-pending-message-bundles".into(),
261 self.max_pending_message_bundles.to_string().into(),
262 ])
263 }
264
265 async fn command_binary(&self) -> Result<Command> {
269 match self.command_with_cached_binary_path() {
270 Some(command) => Ok(command),
271 None => {
272 let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
273 let command = Command::new(&resolved_path);
274
275 self.set_cached_binary_path(resolved_path);
276
277 Ok(command)
278 }
279 }
280 }
281
282 fn command_with_cached_binary_path(&self) -> Option<Command> {
284 let binary_path = self.binary_path.lock().unwrap();
285
286 binary_path.as_ref().map(Command::new)
287 }
288
289 fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
297 let mut binary_path = self.binary_path.lock().unwrap();
298
299 if binary_path.is_none() {
300 *binary_path = Some(new_binary_path);
301 } else {
302 assert_eq!(*binary_path, Some(new_binary_path));
303 }
304 }
305
306 pub async fn create_genesis_config(
308 &self,
309 num_other_initial_chains: u32,
310 initial_funding: Amount,
311 policy_config: ResourceControlPolicyConfig,
312 http_allow_list: Option<Vec<String>>,
313 ) -> Result<()> {
314 let mut command = self.command().await?;
315 command
316 .args([
317 "create-genesis-config",
318 &num_other_initial_chains.to_string(),
319 ])
320 .args(["--initial-funding", &initial_funding.to_string()])
321 .args(["--committee", "committee.json"])
322 .args(["--genesis", "genesis.json"])
323 .args([
324 "--policy-config",
325 &policy_config.to_string().to_kebab_case(),
326 ]);
327 if let Some(allow_list) = http_allow_list {
328 command
329 .arg("--http-request-allow-list")
330 .arg(allow_list.join(","));
331 }
332 if let Some(seed) = self.testing_prng_seed {
333 command.arg("--testing-prng-seed").arg(seed.to_string());
334 }
335 command.spawn_and_wait_for_stdout().await?;
336 Ok(())
337 }
338
339 pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
342 let mut command = self.command().await?;
343 command.args(["wallet", "init"]);
344 match faucet {
345 None => command.args(["--genesis", "genesis.json"]),
346 Some(faucet) => command.args(["--faucet", faucet.url()]),
347 };
348 if let Some(seed) = self.testing_prng_seed {
349 command.arg("--testing-prng-seed").arg(seed.to_string());
350 }
351 command.spawn_and_wait_for_stdout().await?;
352 Ok(())
353 }
354
355 pub async fn request_chain(
357 &self,
358 faucet: &Faucet,
359 set_default: bool,
360 ) -> Result<(ChainId, AccountOwner)> {
361 let mut command = self.command().await?;
362 command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
363 if set_default {
364 command.arg("--set-default");
365 }
366 let stdout = command.spawn_and_wait_for_stdout().await?;
367 let mut lines = stdout.split_whitespace();
368 let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
369 let owner = lines.next().context("missing chain owner")?.parse()?;
370 Ok((chain_id, owner))
371 }
372
373 #[expect(clippy::too_many_arguments)]
375 pub async fn publish_and_create<
376 A: ContractAbi,
377 Parameters: Serialize,
378 InstantiationArgument: Serialize,
379 >(
380 &self,
381 contract: PathBuf,
382 service: PathBuf,
383 vm_runtime: VmRuntime,
384 parameters: &Parameters,
385 argument: &InstantiationArgument,
386 required_application_ids: &[ApplicationId],
387 publisher: impl Into<Option<ChainId>>,
388 ) -> Result<ApplicationId<A>> {
389 let json_parameters = serde_json::to_string(parameters)?;
390 let json_argument = serde_json::to_string(argument)?;
391 let mut command = self.command().await?;
392 let vm_runtime = format!("{}", vm_runtime);
393 command
394 .arg("publish-and-create")
395 .args([contract, service])
396 .args(["--vm-runtime", &vm_runtime.to_lowercase()])
397 .args(publisher.into().iter().map(ChainId::to_string))
398 .args(["--json-parameters", &json_parameters])
399 .args(["--json-argument", &json_argument]);
400 if !required_application_ids.is_empty() {
401 command.arg("--required-application-ids");
402 command.args(
403 required_application_ids
404 .iter()
405 .map(ApplicationId::to_string),
406 );
407 }
408 let stdout = command.spawn_and_wait_for_stdout().await?;
409 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
410 }
411
412 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
414 &self,
415 contract: PathBuf,
416 service: PathBuf,
417 vm_runtime: VmRuntime,
418 publisher: impl Into<Option<ChainId>>,
419 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
420 let stdout = self
421 .command()
422 .await?
423 .arg("publish-module")
424 .args([contract, service])
425 .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
426 .args(publisher.into().iter().map(ChainId::to_string))
427 .spawn_and_wait_for_stdout()
428 .await?;
429 let module_id: ModuleId = stdout.trim().parse()?;
430 Ok(module_id.with_abi())
431 }
432
433 pub async fn create_application<
435 Abi: ContractAbi,
436 Parameters: Serialize,
437 InstantiationArgument: Serialize,
438 >(
439 &self,
440 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
441 parameters: &Parameters,
442 argument: &InstantiationArgument,
443 required_application_ids: &[ApplicationId],
444 creator: impl Into<Option<ChainId>>,
445 ) -> Result<ApplicationId<Abi>> {
446 let json_parameters = serde_json::to_string(parameters)?;
447 let json_argument = serde_json::to_string(argument)?;
448 let mut command = self.command().await?;
449 command
450 .arg("create-application")
451 .arg(module_id.forget_abi().to_string())
452 .args(["--json-parameters", &json_parameters])
453 .args(["--json-argument", &json_argument])
454 .args(creator.into().iter().map(ChainId::to_string));
455 if !required_application_ids.is_empty() {
456 command.arg("--required-application-ids");
457 command.args(
458 required_application_ids
459 .iter()
460 .map(ApplicationId::to_string),
461 );
462 }
463 let stdout = command.spawn_and_wait_for_stdout().await?;
464 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
465 }
466
467 pub async fn run_node_service(
469 &self,
470 port: impl Into<Option<u16>>,
471 process_inbox: ProcessInbox,
472 ) -> Result<NodeService> {
473 let port = port.into().unwrap_or(8080);
474 let mut command = self.command().await?;
475 command.arg("service");
476 if let ProcessInbox::Skip = process_inbox {
477 command.arg("--listener-skip-process-inbox");
478 }
479 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
480 command.args(var.split_whitespace());
481 }
482 let child = command
483 .args(["--port".to_string(), port.to_string()])
484 .spawn_into()?;
485 let client = reqwest_client();
486 for i in 0..10 {
487 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
488 let request = client
489 .get(format!("http://localhost:{}/", port))
490 .send()
491 .await;
492 if request.is_ok() {
493 info!("Node service has started");
494 return Ok(NodeService::new(port, child));
495 } else {
496 warn!("Waiting for node service to start");
497 }
498 }
499 bail!("Failed to start node service");
500 }
501
502 pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
504 let mut command = self.command().await?;
505 command.arg("query-validator").arg(address);
506 let stdout = command.spawn_and_wait_for_stdout().await?;
507 let hash = stdout
508 .trim()
509 .parse()
510 .context("error while parsing the result of `linera query-validator`")?;
511 Ok(hash)
512 }
513
514 pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
516 let mut command = self.command().await?;
517 command.arg("query-validators");
518 if let Some(chain_id) = chain_id {
519 command.arg(chain_id.to_string());
520 }
521 command.spawn_and_wait_for_stdout().await?;
522 Ok(())
523 }
524
525 pub async fn sync_validator(
527 &self,
528 chain_ids: impl IntoIterator<Item = &ChainId>,
529 validator_address: impl Into<String>,
530 ) -> Result<()> {
531 let mut command = self.command().await?;
532 command.arg("sync-validator").arg(validator_address.into());
533 let mut chain_ids = chain_ids.into_iter().peekable();
534 if chain_ids.peek().is_some() {
535 command
536 .arg("--chains")
537 .args(chain_ids.map(ChainId::to_string));
538 }
539 command.spawn_and_wait_for_stdout().await?;
540 Ok(())
541 }
542
543 pub async fn run_faucet(
545 &self,
546 port: impl Into<Option<u16>>,
547 chain_id: ChainId,
548 amount: Amount,
549 ) -> Result<FaucetService> {
550 let port = port.into().unwrap_or(8080);
551 let mut command = self.command().await?;
552 let child = command
553 .arg("faucet")
554 .arg(chain_id.to_string())
555 .args(["--port".to_string(), port.to_string()])
556 .args(["--amount".to_string(), amount.to_string()])
557 .spawn_into()?;
558 let client = reqwest_client();
559 for i in 0..10 {
560 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
561 let request = client
562 .get(format!("http://localhost:{}/", port))
563 .send()
564 .await;
565 if request.is_ok() {
566 info!("Faucet has started");
567 return Ok(FaucetService::new(port, child));
568 } else {
569 warn!("Waiting for faucet to start");
570 }
571 }
572 bail!("Failed to start faucet");
573 }
574
575 pub async fn local_balance(&self, account: Account) -> Result<Amount> {
577 let stdout = self
578 .command()
579 .await?
580 .arg("local-balance")
581 .arg(account.to_string())
582 .spawn_and_wait_for_stdout()
583 .await?;
584 let amount = stdout
585 .trim()
586 .parse()
587 .context("error while parsing the result of `linera local-balance`")?;
588 Ok(amount)
589 }
590
591 pub async fn query_balance(&self, account: Account) -> Result<Amount> {
593 let stdout = self
594 .command()
595 .await?
596 .arg("query-balance")
597 .arg(account.to_string())
598 .spawn_and_wait_for_stdout()
599 .await?;
600 let amount = stdout
601 .trim()
602 .parse()
603 .context("error while parsing the result of `linera query-balance`")?;
604 Ok(amount)
605 }
606
607 pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
609 self.command()
610 .await?
611 .arg("sync")
612 .arg(chain_id.to_string())
613 .spawn_and_wait_for_stdout()
614 .await?;
615 Ok(())
616 }
617
618 pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
620 self.command()
621 .await?
622 .arg("process-inbox")
623 .arg(chain_id.to_string())
624 .spawn_and_wait_for_stdout()
625 .await?;
626 Ok(())
627 }
628
629 pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
631 self.command()
632 .await?
633 .arg("transfer")
634 .arg(amount.to_string())
635 .args(["--from", &from.to_string()])
636 .args(["--to", &to.to_string()])
637 .spawn_and_wait_for_stdout()
638 .await?;
639 Ok(())
640 }
641
642 pub async fn transfer_with_silent_logs(
644 &self,
645 amount: Amount,
646 from: ChainId,
647 to: ChainId,
648 ) -> Result<()> {
649 self.command()
650 .await?
651 .env("RUST_LOG", "off")
652 .arg("transfer")
653 .arg(amount.to_string())
654 .args(["--from", &from.to_string()])
655 .args(["--to", &to.to_string()])
656 .spawn_and_wait_for_stdout()
657 .await?;
658 Ok(())
659 }
660
661 pub async fn transfer_with_accounts(
663 &self,
664 amount: Amount,
665 from: Account,
666 to: Account,
667 ) -> Result<()> {
668 self.command()
669 .await?
670 .arg("transfer")
671 .arg(amount.to_string())
672 .args(["--from", &from.to_string()])
673 .args(["--to", &to.to_string()])
674 .spawn_and_wait_for_stdout()
675 .await?;
676 Ok(())
677 }
678
679 #[cfg(feature = "benchmark")]
680 fn benchmark_command_internal(command: &mut Command, args: BenchmarkCommand) -> Result<()> {
681 let formatted_args = to_args(&args)?
682 .chunks_exact(2)
683 .flat_map(|pair| {
684 let option = format!("--{}", pair[0]);
685 match pair[1].as_str() {
686 "true" => vec![option],
687 "false" => vec![],
688 _ => vec![option, pair[1].clone()],
689 }
690 })
691 .collect::<Vec<_>>();
692 command
693 .args([
695 "--max-pending-message-bundles",
696 &args.transactions_per_block.to_string(),
697 ])
698 .arg("benchmark")
699 .args(formatted_args);
700 Ok(())
701 }
702
703 #[cfg(feature = "benchmark")]
704 async fn benchmark_command_with_envs(
705 &self,
706 args: BenchmarkCommand,
707 envs: &[(&str, &str)],
708 ) -> Result<Command> {
709 let mut command = self
710 .command_with_envs_and_arguments(envs, self.required_command_arguments())
711 .await?;
712 Self::benchmark_command_internal(&mut command, args)?;
713 Ok(command)
714 }
715
716 #[cfg(feature = "benchmark")]
717 async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
718 let mut command = self
719 .command_with_arguments(self.required_command_arguments())
720 .await?;
721 Self::benchmark_command_internal(&mut command, args)?;
722 Ok(command)
723 }
724
725 #[cfg(feature = "benchmark")]
727 pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
728 let mut command = self.benchmark_command(args).await?;
729 command.spawn_and_wait_for_stdout().await?;
730 Ok(())
731 }
732
733 #[cfg(feature = "benchmark")]
736 pub async fn benchmark_detached(
737 &self,
738 args: BenchmarkCommand,
739 tx: oneshot::Sender<()>,
740 ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
741 let mut child = self
742 .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
743 .await?
744 .kill_on_drop(true)
745 .stdin(Stdio::piped())
746 .stdout(Stdio::piped())
747 .stderr(Stdio::piped())
748 .spawn()?;
749
750 let pid = child.id().expect("failed to get pid");
751 let stdout = child.stdout.take().expect("stdout not open");
752 let stdout_handle = tokio::spawn(async move {
753 let mut lines = BufReader::new(stdout).lines();
754 while let Ok(Some(line)) = lines.next_line().await {
755 println!("benchmark{{pid={pid}}} {line}");
756 }
757 });
758
759 let stderr = child.stderr.take().expect("stderr not open");
760 let stderr_handle = tokio::spawn(async move {
761 let mut lines = BufReader::new(stderr).lines();
762 let mut tx = Some(tx);
763 while let Ok(Some(line)) = lines.next_line().await {
764 if line.contains("Ready to start benchmark") {
765 tx.take()
766 .expect("Should only send signal once")
767 .send(())
768 .expect("failed to send ready signal to main thread");
769 } else {
770 println!("benchmark{{pid={pid}}} {line}");
771 }
772 }
773 });
774 Ok((child, stdout_handle, stderr_handle))
775 }
776
777 pub async fn open_chain(
779 &self,
780 from: ChainId,
781 owner: Option<AccountOwner>,
782 initial_balance: Amount,
783 ) -> Result<(ChainId, AccountOwner)> {
784 let mut command = self.command().await?;
785 command
786 .arg("open-chain")
787 .args(["--from", &from.to_string()])
788 .args(["--initial-balance", &initial_balance.to_string()]);
789
790 if let Some(owner) = owner {
791 command.args(["--owner", &owner.to_string()]);
792 }
793
794 let stdout = command.spawn_and_wait_for_stdout().await?;
795 let mut split = stdout.split('\n');
796 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
797 let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
798 if let Some(owner) = owner {
799 assert_eq!(owner, new_owner);
800 }
801 Ok((chain_id, new_owner))
802 }
803
804 pub async fn open_and_assign(
806 &self,
807 client: &ClientWrapper,
808 initial_balance: Amount,
809 ) -> Result<ChainId> {
810 let our_chain = self
811 .load_wallet()?
812 .default_chain()
813 .context("no default chain found")?;
814 let owner = client.keygen().await?;
815 let (new_chain, _) = self
816 .open_chain(our_chain, Some(owner), initial_balance)
817 .await?;
818 client.assign(owner, new_chain).await?;
819 Ok(new_chain)
820 }
821
822 pub async fn open_multi_owner_chain(
823 &self,
824 from: ChainId,
825 owners: Vec<AccountOwner>,
826 weights: Vec<u64>,
827 multi_leader_rounds: u32,
828 balance: Amount,
829 base_timeout_ms: u64,
830 ) -> Result<ChainId> {
831 let mut command = self.command().await?;
832 command
833 .arg("open-multi-owner-chain")
834 .args(["--from", &from.to_string()])
835 .arg("--owners")
836 .args(owners.iter().map(AccountOwner::to_string))
837 .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
838 if !weights.is_empty() {
839 command
840 .arg("--owner-weights")
841 .args(weights.iter().map(u64::to_string));
842 };
843 command
844 .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
845 .args(["--initial-balance", &balance.to_string()]);
846
847 let stdout = command.spawn_and_wait_for_stdout().await?;
848 let mut split = stdout.split('\n');
849 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
850
851 Ok(chain_id)
852 }
853
854 pub async fn change_ownership(
855 &self,
856 chain_id: ChainId,
857 super_owners: Vec<AccountOwner>,
858 owners: Vec<AccountOwner>,
859 ) -> Result<()> {
860 let mut command = self.command().await?;
861 command
862 .arg("change-ownership")
863 .args(["--chain-id", &chain_id.to_string()]);
864 if !super_owners.is_empty() {
865 command
866 .arg("--super-owners")
867 .args(super_owners.iter().map(AccountOwner::to_string));
868 }
869 if !owners.is_empty() {
870 command
871 .arg("--owners")
872 .args(owners.iter().map(AccountOwner::to_string));
873 }
874 command.spawn_and_wait_for_stdout().await?;
875 Ok(())
876 }
877
878 pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
880 let mut command = self.command().await?;
881 command
882 .args(["wallet", "follow-chain"])
883 .arg(chain_id.to_string());
884 if sync {
885 command.arg("--sync");
886 }
887 command.spawn_and_wait_for_stdout().await?;
888 Ok(())
889 }
890
891 pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
893 let mut command = self.command().await?;
894 command
895 .args(["wallet", "forget-chain"])
896 .arg(chain_id.to_string());
897 command.spawn_and_wait_for_stdout().await?;
898 Ok(())
899 }
900
901 pub async fn retry_pending_block(
902 &self,
903 chain_id: Option<ChainId>,
904 ) -> Result<Option<CryptoHash>> {
905 let mut command = self.command().await?;
906 command.arg("retry-pending-block");
907 if let Some(chain_id) = chain_id {
908 command.arg(chain_id.to_string());
909 }
910 let stdout = command.spawn_and_wait_for_stdout().await?;
911 let stdout = stdout.trim();
912 if stdout.is_empty() {
913 Ok(None)
914 } else {
915 Ok(Some(CryptoHash::from_str(stdout)?))
916 }
917 }
918
919 pub async fn publish_data_blob(
921 &self,
922 path: &Path,
923 chain_id: Option<ChainId>,
924 ) -> Result<CryptoHash> {
925 let mut command = self.command().await?;
926 command.arg("publish-data-blob").arg(path);
927 if let Some(chain_id) = chain_id {
928 command.arg(chain_id.to_string());
929 }
930 let stdout = command.spawn_and_wait_for_stdout().await?;
931 let stdout = stdout.trim();
932 Ok(CryptoHash::from_str(stdout)?)
933 }
934
935 pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
937 let mut command = self.command().await?;
938 command.arg("read-data-blob").arg(hash.to_string());
939 if let Some(chain_id) = chain_id {
940 command.arg(chain_id.to_string());
941 }
942 command.spawn_and_wait_for_stdout().await?;
943 Ok(())
944 }
945
946 pub fn load_wallet(&self) -> Result<Wallet> {
947 util::read_json(self.wallet_path())
948 }
949
950 pub fn load_keystore(&self) -> Result<InMemorySigner> {
951 util::read_json(self.keystore_path())
952 }
953
954 pub fn wallet_path(&self) -> PathBuf {
955 self.path_provider.path().join(&self.wallet)
956 }
957
958 pub fn keystore_path(&self) -> PathBuf {
959 self.path_provider.path().join(&self.keystore)
960 }
961
962 pub fn storage_path(&self) -> &str {
963 &self.storage
964 }
965
966 pub fn get_owner(&self) -> Option<AccountOwner> {
967 let wallet = self.load_wallet().ok()?;
968 let chain_id = wallet.default_chain()?;
969 wallet.get(chain_id)?.owner
970 }
971
972 pub async fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
973 self.load_wallet()
974 .ok()
975 .is_some_and(|wallet| wallet.get(chain).is_some())
976 }
977
978 pub async fn set_validator(
979 &self,
980 validator_key: &(String, String),
981 port: usize,
982 votes: usize,
983 ) -> Result<()> {
984 let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
985 self.command()
986 .await?
987 .arg("set-validator")
988 .args(["--public-key", &validator_key.0])
989 .args(["--account-key", &validator_key.1])
990 .args(["--address", &address])
991 .args(["--votes", &votes.to_string()])
992 .spawn_and_wait_for_stdout()
993 .await?;
994 Ok(())
995 }
996
997 pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
998 self.command()
999 .await?
1000 .arg("remove-validator")
1001 .args(["--public-key", validator_key])
1002 .spawn_and_wait_for_stdout()
1003 .await?;
1004 Ok(())
1005 }
1006
1007 pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1008 self.command()
1009 .await?
1010 .arg("revoke-epochs")
1011 .arg(epoch.to_string())
1012 .spawn_and_wait_for_stdout()
1013 .await?;
1014 Ok(())
1015 }
1016
1017 pub async fn keygen(&self) -> Result<AccountOwner> {
1019 let stdout = self
1020 .command()
1021 .await?
1022 .arg("keygen")
1023 .spawn_and_wait_for_stdout()
1024 .await?;
1025 AccountOwner::from_str(stdout.as_str().trim())
1026 }
1027
1028 pub fn default_chain(&self) -> Option<ChainId> {
1030 self.load_wallet().ok()?.default_chain()
1031 }
1032
1033 pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1035 let _stdout = self
1036 .command()
1037 .await?
1038 .arg("assign")
1039 .args(["--owner", &owner.to_string()])
1040 .args(["--chain-id", &chain_id.to_string()])
1041 .spawn_and_wait_for_stdout()
1042 .await?;
1043 Ok(())
1044 }
1045
1046 pub async fn set_preferred_owner(
1048 &self,
1049 chain_id: ChainId,
1050 owner: Option<AccountOwner>,
1051 ) -> Result<()> {
1052 let mut owner_arg = vec!["--owner".to_string()];
1053 if let Some(owner) = owner {
1054 owner_arg.push(owner.to_string());
1055 };
1056 self.command()
1057 .await?
1058 .arg("set-preferred-owner")
1059 .args(["--chain-id", &chain_id.to_string()])
1060 .args(owner_arg)
1061 .spawn_and_wait_for_stdout()
1062 .await?;
1063 Ok(())
1064 }
1065
1066 pub async fn build_application(
1067 &self,
1068 path: &Path,
1069 name: &str,
1070 is_workspace: bool,
1071 ) -> Result<(PathBuf, PathBuf)> {
1072 Command::new("cargo")
1073 .current_dir(self.path_provider.path())
1074 .arg("build")
1075 .arg("--release")
1076 .args(["--target", "wasm32-unknown-unknown"])
1077 .arg("--manifest-path")
1078 .arg(path.join("Cargo.toml"))
1079 .spawn_and_wait_for_stdout()
1080 .await?;
1081
1082 let release_dir = match is_workspace {
1083 true => path.join("../target/wasm32-unknown-unknown/release"),
1084 false => path.join("target/wasm32-unknown-unknown/release"),
1085 };
1086
1087 let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1088 let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1089
1090 let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1091 let service_size = fs_err::tokio::metadata(&service).await?.len();
1092 info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1093
1094 Ok((contract, service))
1095 }
1096}
1097
1098impl Drop for ClientWrapper {
1099 fn drop(&mut self) {
1100 use std::process::Command as SyncCommand;
1101
1102 if self.on_drop != OnClientDrop::CloseChains {
1103 return;
1104 }
1105
1106 let Ok(binary_path) = self.binary_path.lock() else {
1107 error!("Failed to close chains because a thread panicked with a lock to `binary_path`");
1108 return;
1109 };
1110
1111 let Some(binary_path) = binary_path.as_ref() else {
1112 warn!(
1113 "Assuming no chains need to be closed, because the command binary was never \
1114 resolved and therefore presumably never called"
1115 );
1116 return;
1117 };
1118
1119 let working_directory = self.path_provider.path();
1120 let mut wallet_show_command = SyncCommand::new(binary_path);
1121
1122 for argument in self.command_arguments() {
1123 wallet_show_command.arg(&*argument);
1124 }
1125
1126 let Ok(wallet_show_output) = wallet_show_command
1127 .current_dir(working_directory)
1128 .args(["wallet", "show", "--short", "--owned"])
1129 .output()
1130 else {
1131 warn!("Failed to execute `wallet show --short` to list chains to close");
1132 return;
1133 };
1134
1135 if !wallet_show_output.status.success() {
1136 warn!("Failed to list chains in the wallet to close them");
1137 return;
1138 }
1139
1140 let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1141 warn!(
1142 "Failed to close chains because `linera wallet show --short` \
1143 returned a non-UTF-8 output"
1144 );
1145 return;
1146 };
1147
1148 let chain_ids = chain_list_string
1149 .split('\n')
1150 .map(|line| line.trim())
1151 .filter(|line| !line.is_empty());
1152
1153 for chain_id in chain_ids {
1154 let mut close_chain_command = SyncCommand::new(binary_path);
1155
1156 for argument in self.command_arguments() {
1157 close_chain_command.arg(&*argument);
1158 }
1159
1160 close_chain_command.current_dir(working_directory);
1161
1162 match close_chain_command.args(["close-chain", chain_id]).status() {
1163 Ok(status) if status.success() => (),
1164 Ok(failure) => warn!("Failed to close chain {chain_id}: {failure}"),
1165 Err(error) => warn!("Failed to close chain {chain_id}: {error}"),
1166 }
1167 }
1168 }
1169}
1170
1171#[cfg(with_testing)]
1172impl ClientWrapper {
1173 pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1174 self.build_application(Self::example_path(name)?.as_path(), name, true)
1175 .await
1176 }
1177
1178 pub fn example_path(name: &str) -> Result<PathBuf> {
1179 Ok(env::current_dir()?.join("../examples/").join(name))
1180 }
1181}
1182
1183fn truncate_query_output(input: &str) -> String {
1184 let max_len = 1000;
1185 if input.len() < max_len {
1186 input.to_string()
1187 } else {
1188 format!("{} ...", input.get(..max_len).unwrap())
1189 }
1190}
1191
1192fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1193 let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1194 let max_len = 200;
1195 if query.len() < max_len {
1196 query
1197 } else {
1198 format!("{} ...", query.get(..max_len).unwrap())
1199 }
1200}
1201
1202pub struct NodeService {
1204 port: u16,
1205 child: Child,
1206}
1207
1208impl NodeService {
1209 fn new(port: u16, child: Child) -> Self {
1210 Self { port, child }
1211 }
1212
1213 pub async fn terminate(mut self) -> Result<()> {
1214 self.child.kill().await.context("terminating node service")
1215 }
1216
1217 pub fn port(&self) -> u16 {
1218 self.port
1219 }
1220
1221 pub fn ensure_is_running(&mut self) -> Result<()> {
1222 self.child.ensure_is_running()
1223 }
1224
1225 pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1226 let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1227 let mut data = self.query_node(query).await?;
1228 Ok(serde_json::from_value(data["processInbox"].take())?)
1229 }
1230
1231 pub async fn balance(&self, account: &Account) -> Result<Amount> {
1232 let chain = account.chain_id;
1233 let owner = account.owner;
1234 if matches!(owner, AccountOwner::CHAIN) {
1235 let query = format!(
1236 "query {{ chain(chainId:\"{chain}\") {{
1237 executionState {{ system {{ balance }} }}
1238 }} }}"
1239 );
1240 let response = self.query_node(query).await?;
1241 let balance = &response["chain"]["executionState"]["system"]["balance"]
1242 .as_str()
1243 .unwrap();
1244 return Ok(Amount::from_str(balance)?);
1245 }
1246 let query = format!(
1247 "query {{ chain(chainId:\"{chain}\") {{
1248 executionState {{ system {{ balances {{
1249 entry(key:\"{owner}\") {{ value }}
1250 }} }} }}
1251 }} }}"
1252 );
1253 let response = self.query_node(query).await?;
1254 let balances = &response["chain"]["executionState"]["system"]["balances"];
1255 let balance = balances["entry"]["value"].as_str();
1256 match balance {
1257 None => Ok(Amount::ZERO),
1258 Some(amount) => Ok(Amount::from_str(amount)?),
1259 }
1260 }
1261
1262 pub async fn make_application<A: ContractAbi>(
1263 &self,
1264 chain_id: &ChainId,
1265 application_id: &ApplicationId<A>,
1266 ) -> Result<ApplicationWrapper<A>> {
1267 let application_id = application_id.forget_abi().to_string();
1268 let link = format!(
1269 "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1270 self.port
1271 );
1272 Ok(ApplicationWrapper::from(link))
1273 }
1274
1275 pub async fn publish_data_blob(
1276 &self,
1277 chain_id: &ChainId,
1278 bytes: Vec<u8>,
1279 ) -> Result<CryptoHash> {
1280 let query = format!(
1281 "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1282 chain_id.to_value(),
1283 bytes.to_value(),
1284 );
1285 let data = self.query_node(query).await?;
1286 serde_json::from_value(data["publishDataBlob"].clone())
1287 .context("missing publishDataBlob field in response")
1288 }
1289
1290 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1291 &self,
1292 chain_id: &ChainId,
1293 contract: PathBuf,
1294 service: PathBuf,
1295 vm_runtime: VmRuntime,
1296 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1297 let contract_code = Bytecode::load_from_file(&contract).await?;
1298 let service_code = Bytecode::load_from_file(&service).await?;
1299 let query = format!(
1300 "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1301 chain_id.to_value(),
1302 contract_code.to_value(),
1303 service_code.to_value(),
1304 vm_runtime.to_value(),
1305 );
1306 let data = self.query_node(query).await?;
1307 let module_str = data["publishModule"]
1308 .as_str()
1309 .context("module ID not found")?;
1310 let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1311 Ok(module_id.with_abi())
1312 }
1313
1314 pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1315 let query = format!(
1316 "query {{ chain(chainId:\"{chain_id}\") {{
1317 executionState {{ system {{ committees }} }}
1318 }} }}"
1319 );
1320 let mut response = self.query_node(query).await?;
1321 let committees = response["chain"]["executionState"]["system"]["committees"].take();
1322 Ok(serde_json::from_value(committees)?)
1323 }
1324
1325 pub async fn events_from_index(
1326 &self,
1327 chain_id: &ChainId,
1328 stream_id: &StreamId,
1329 start_index: u32,
1330 ) -> Result<Vec<IndexAndEvent>> {
1331 let query = format!(
1332 "query {{
1333 eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1334 {{ index event }}
1335 }}",
1336 stream_id.to_value()
1337 );
1338 let mut response = self.query_node(query).await?;
1339 let response = response["eventsFromIndex"].take();
1340 Ok(serde_json::from_value(response)?)
1341 }
1342
1343 pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1344 let n_try = 5;
1345 let query = query.as_ref();
1346 for i in 0..n_try {
1347 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1348 let url = format!("http://localhost:{}/", self.port);
1349 let client = reqwest_client();
1350 let result = client
1351 .post(url)
1352 .json(&json!({ "query": query }))
1353 .send()
1354 .await;
1355 if matches!(result, Err(ref error) if error.is_timeout()) {
1356 warn!(
1357 "Timeout when sending query {} to the node service",
1358 truncate_query_output(query)
1359 );
1360 continue;
1361 }
1362 let response = result.with_context(|| {
1363 format!(
1364 "query_node: failed to post query={}",
1365 truncate_query_output(query)
1366 )
1367 })?;
1368 anyhow::ensure!(
1369 response.status().is_success(),
1370 "Query \"{}\" failed: {}",
1371 truncate_query_output(query),
1372 response
1373 .text()
1374 .await
1375 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1376 );
1377 let value: Value = response.json().await.context("invalid JSON")?;
1378 if let Some(errors) = value.get("errors") {
1379 warn!(
1380 "Query \"{}\" failed: {}",
1381 truncate_query_output(query),
1382 errors
1383 );
1384 } else {
1385 return Ok(value["data"].clone());
1386 }
1387 }
1388 bail!(
1389 "Query \"{}\" failed after {} retries.",
1390 truncate_query_output(query),
1391 n_try
1392 );
1393 }
1394
1395 pub async fn create_application<
1396 Abi: ContractAbi,
1397 Parameters: Serialize,
1398 InstantiationArgument: Serialize,
1399 >(
1400 &self,
1401 chain_id: &ChainId,
1402 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1403 parameters: &Parameters,
1404 argument: &InstantiationArgument,
1405 required_application_ids: &[ApplicationId],
1406 ) -> Result<ApplicationId<Abi>> {
1407 let module_id = module_id.forget_abi();
1408 let json_required_applications_ids = required_application_ids
1409 .iter()
1410 .map(ApplicationId::to_string)
1411 .collect::<Vec<_>>()
1412 .to_value();
1413 let new_parameters = serde_json::to_value(parameters)
1415 .context("could not create parameters JSON")?
1416 .to_value();
1417 let new_argument = serde_json::to_value(argument)
1418 .context("could not create argument JSON")?
1419 .to_value();
1420 let query = format!(
1421 "mutation {{ createApplication(\
1422 chainId: \"{chain_id}\",
1423 moduleId: \"{module_id}\", \
1424 parameters: {new_parameters}, \
1425 instantiationArgument: {new_argument}, \
1426 requiredApplicationIds: {json_required_applications_ids}) \
1427 }}"
1428 );
1429 let data = self.query_node(query).await?;
1430 let app_id_str = data["createApplication"]
1431 .as_str()
1432 .context("missing createApplication string in response")?
1433 .trim();
1434 Ok(app_id_str
1435 .parse::<ApplicationId>()
1436 .context("invalid application ID")?
1437 .with_abi())
1438 }
1439
1440 pub async fn chain_tip_hash(&self, chain: ChainId) -> Result<Option<CryptoHash>> {
1442 let query = format!(r#"query {{ block(chainId: "{chain}") {{ hash }} }}"#);
1443
1444 let mut response = self.query_node(&query).await?;
1445
1446 match mem::take(&mut response["block"]["hash"]) {
1447 Value::Null => Ok(None),
1448 Value::String(hash) => Ok(Some(
1449 hash.parse()
1450 .context("Received an invalid hash {hash:?} for chain tip")?,
1451 )),
1452 invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1453 }
1454 }
1455
1456 pub async fn notifications(
1458 &self,
1459 chain_id: ChainId,
1460 ) -> Result<impl Stream<Item = Result<Notification>>> {
1461 let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1462 let url = format!("ws://localhost:{}/ws", self.port);
1463 let mut request = url.into_client_request()?;
1464 request.headers_mut().insert(
1465 "Sec-WebSocket-Protocol",
1466 HeaderValue::from_str("graphql-transport-ws")?,
1467 );
1468 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1469 let init_json = json!({
1470 "type": "connection_init",
1471 "payload": {}
1472 });
1473 websocket.send(init_json.to_string().into()).await?;
1474 let text = websocket
1475 .next()
1476 .await
1477 .context("Failed to establish connection")??
1478 .into_text()?;
1479 ensure!(
1480 text == "{\"type\":\"connection_ack\"}",
1481 "Unexpected response: {text}"
1482 );
1483 let query_json = json!({
1484 "id": "1",
1485 "type": "start",
1486 "payload": {
1487 "query": query,
1488 "variables": {},
1489 "operationName": null
1490 }
1491 });
1492 websocket.send(query_json.to_string().into()).await?;
1493 Ok(websocket
1494 .map_err(anyhow::Error::from)
1495 .and_then(|message| async {
1496 let text = message.into_text()?;
1497 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1498 if let Some(errors) = value["payload"].get("errors") {
1499 bail!("Notification subscription failed: {errors:?}");
1500 }
1501 serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1502 .context("Failed to deserialize notification")
1503 }))
1504 }
1505}
1506
1507pub struct FaucetService {
1509 port: u16,
1510 child: Child,
1511}
1512
1513impl FaucetService {
1514 fn new(port: u16, child: Child) -> Self {
1515 Self { port, child }
1516 }
1517
1518 pub async fn terminate(mut self) -> Result<()> {
1519 self.child
1520 .kill()
1521 .await
1522 .context("terminating faucet service")
1523 }
1524
1525 pub fn ensure_is_running(&mut self) -> Result<()> {
1526 self.child.ensure_is_running()
1527 }
1528
1529 pub fn instance(&self) -> Faucet {
1530 Faucet::new(format!("http://localhost:{}/", self.port))
1531 }
1532}
1533
1534pub struct ApplicationWrapper<A> {
1536 uri: String,
1537 _phantom: PhantomData<A>,
1538}
1539
1540impl<A> ApplicationWrapper<A> {
1541 pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1542 let query = query.as_ref();
1543 let value = self.run_json_query(json!({ "query": query })).await?;
1544 Ok(value["data"].clone())
1545 }
1546
1547 pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1548 const MAX_RETRIES: usize = 5;
1549
1550 for i in 0.. {
1551 let client = reqwest_client();
1552 let result = client.post(&self.uri).json(&query).send().await;
1553 let response = match result {
1554 Ok(response) => response,
1555 Err(error) if i < MAX_RETRIES => {
1556 warn!(
1557 "Failed to post query \"{}\": {error}; retrying",
1558 truncate_query_output_serialize(&query),
1559 );
1560 continue;
1561 }
1562 Err(error) => {
1563 let query = truncate_query_output_serialize(&query);
1564 return Err(error)
1565 .with_context(|| format!("run_json_query: failed to post query={query}"));
1566 }
1567 };
1568 anyhow::ensure!(
1569 response.status().is_success(),
1570 "Query \"{}\" failed: {}",
1571 truncate_query_output_serialize(&query),
1572 response
1573 .text()
1574 .await
1575 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1576 );
1577 let value: Value = response.json().await.context("invalid JSON")?;
1578 if let Some(errors) = value.get("errors") {
1579 bail!(
1580 "Query \"{}\" failed: {}",
1581 truncate_query_output_serialize(&query),
1582 errors
1583 );
1584 }
1585 return Ok(value);
1586 }
1587 unreachable!()
1588 }
1589
1590 pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
1591 let query = query.as_ref();
1592 self.run_graphql_query(&format!("query {{ {query} }}"))
1593 .await
1594 }
1595
1596 pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
1597 let query = query.as_ref().trim();
1598 let name = query
1599 .split_once(|ch: char| !ch.is_alphanumeric())
1600 .map_or(query, |(name, _)| name);
1601 let data = self.query(query).await?;
1602 serde_json::from_value(data[name].clone())
1603 .with_context(|| format!("{name} field missing in response"))
1604 }
1605
1606 pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
1607 let mutation = mutation.as_ref();
1608 self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
1609 .await
1610 }
1611}
1612
1613impl<A> From<String> for ApplicationWrapper<A> {
1614 fn from(uri: String) -> ApplicationWrapper<A> {
1615 ApplicationWrapper {
1616 uri,
1617 _phantom: PhantomData,
1618 }
1619 }
1620}