linera_service/cli_wrappers/
wallet.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Cow,
6    collections::BTreeMap,
7    env,
8    marker::PhantomData,
9    mem,
10    path::{Path, PathBuf},
11    str::FromStr,
12    sync,
13    time::Duration,
14};
15
16use anyhow::{bail, ensure, Context, Result};
17use async_graphql::InputType;
18use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
19use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
20use heck::ToKebabCase;
21use linera_base::{
22    abi::ContractAbi,
23    command::{resolve_binary, CommandExt},
24    crypto::{CryptoHash, InMemorySigner},
25    data_types::{Amount, Bytecode, Epoch},
26    identifiers::{
27        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
28    },
29    vm::VmRuntime,
30};
31use linera_client::{client_options::ResourceControlPolicyConfig, wallet::Wallet};
32use linera_core::worker::Notification;
33use linera_execution::committee::Committee;
34use linera_faucet_client::Faucet;
35use serde::{de::DeserializeOwned, ser::Serialize};
36use serde_json::{json, Value};
37use tempfile::TempDir;
38use tokio::process::{Child, Command};
39use tracing::{error, info, warn};
40#[cfg(feature = "benchmark")]
41use {
42    crate::cli::command::BenchmarkCommand,
43    serde_command_opts::to_args,
44    std::process::Stdio,
45    tokio::{
46        io::{AsyncBufReadExt, BufReader},
47        sync::oneshot,
48        task::JoinHandle,
49    },
50};
51
52use crate::{
53    cli_wrappers::{
54        local_net::{PathProvider, ProcessInbox},
55        Network,
56    },
57    util::{self, ChildExt},
58};
59
60/// The name of the environment variable that allows specifying additional arguments to be passed
61/// to the node-service command of the client.
62const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
63
64fn reqwest_client() -> reqwest::Client {
65    reqwest::ClientBuilder::new()
66        .timeout(Duration::from_secs(30))
67        .build()
68        .unwrap()
69}
70
71/// Wrapper to run a Linera client command.
72pub struct ClientWrapper {
73    binary_path: sync::Mutex<Option<PathBuf>>,
74    testing_prng_seed: Option<u64>,
75    storage: String,
76    wallet: String,
77    keystore: String,
78    max_pending_message_bundles: usize,
79    network: Network,
80    pub path_provider: PathProvider,
81    on_drop: OnClientDrop,
82    extra_args: Vec<String>,
83}
84
85/// Action to perform when the [`ClientWrapper`] is dropped.
86#[derive(Clone, Copy, Debug, Eq, PartialEq)]
87pub enum OnClientDrop {
88    /// Close all the chains on the wallet.
89    CloseChains,
90    /// Do not close any chains, leaving them active.
91    LeakChains,
92}
93
94impl ClientWrapper {
95    pub fn new(
96        path_provider: PathProvider,
97        network: Network,
98        testing_prng_seed: Option<u64>,
99        id: usize,
100        on_drop: OnClientDrop,
101    ) -> Self {
102        Self::new_with_extra_args(
103            path_provider,
104            network,
105            testing_prng_seed,
106            id,
107            on_drop,
108            vec!["--wait-for-outgoing-messages".to_string()],
109        )
110    }
111
112    pub fn new_with_extra_args(
113        path_provider: PathProvider,
114        network: Network,
115        testing_prng_seed: Option<u64>,
116        id: usize,
117        on_drop: OnClientDrop,
118        extra_args: Vec<String>,
119    ) -> Self {
120        let storage = format!(
121            "rocksdb:{}/client_{}.db",
122            path_provider.path().display(),
123            id
124        );
125        let wallet = format!("wallet_{}.json", id);
126        let keystore = format!("keystore_{}.json", id);
127        Self {
128            binary_path: sync::Mutex::new(None),
129            testing_prng_seed,
130            storage,
131            wallet,
132            keystore,
133            max_pending_message_bundles: 10_000,
134            network,
135            path_provider,
136            on_drop,
137            extra_args,
138        }
139    }
140
141    /// Runs `linera project new`.
142    pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
143        let tmp = TempDir::new()?;
144        let mut command = self.command().await?;
145        command
146            .current_dir(tmp.path())
147            .arg("project")
148            .arg("new")
149            .arg(project_name)
150            .arg("--linera-root")
151            .arg(linera_root)
152            .spawn_and_wait_for_stdout()
153            .await?;
154        Ok(tmp)
155    }
156
157    /// Runs `linera project publish`.
158    pub async fn project_publish<T: Serialize>(
159        &self,
160        path: PathBuf,
161        required_application_ids: Vec<String>,
162        publisher: impl Into<Option<ChainId>>,
163        argument: &T,
164    ) -> Result<String> {
165        let json_parameters = serde_json::to_string(&())?;
166        let json_argument = serde_json::to_string(argument)?;
167        let mut command = self.command().await?;
168        command
169            .arg("project")
170            .arg("publish-and-create")
171            .arg(path)
172            .args(publisher.into().iter().map(ChainId::to_string))
173            .args(["--json-parameters", &json_parameters])
174            .args(["--json-argument", &json_argument]);
175        if !required_application_ids.is_empty() {
176            command.arg("--required-application-ids");
177            command.args(required_application_ids);
178        }
179        let stdout = command.spawn_and_wait_for_stdout().await?;
180        Ok(stdout.trim().to_string())
181    }
182
183    /// Runs `linera project test`.
184    pub async fn project_test(&self, path: &Path) -> Result<()> {
185        self.command()
186            .await
187            .context("failed to create project test command")?
188            .current_dir(path)
189            .arg("project")
190            .arg("test")
191            .spawn_and_wait_for_stdout()
192            .await?;
193        Ok(())
194    }
195
196    async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
197        let mut command = self.command_binary().await?;
198        command.current_dir(self.path_provider.path());
199        for (key, value) in envs {
200            command.env(key, value);
201        }
202        for argument in self.command_arguments() {
203            command.arg(&*argument);
204        }
205        Ok(command)
206    }
207
208    async fn command(&self) -> Result<Command> {
209        self.command_with_envs(&[(
210            "RUST_LOG",
211            &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
212        )])
213        .await
214    }
215
216    /// Returns an iterator over the arguments that should be added to all command invocations.
217    fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
218        [
219            "--wallet".into(),
220            self.wallet.as_str().into(),
221            "--keystore".into(),
222            self.keystore.as_str().into(),
223            "--storage".into(),
224            self.storage.as_str().into(),
225            "--max-pending-message-bundles".into(),
226            self.max_pending_message_bundles.to_string().into(),
227            "--send-timeout-ms".into(),
228            "500000".into(),
229            "--recv-timeout-ms".into(),
230            "500000".into(),
231        ]
232        .into_iter()
233        .chain(self.extra_args.iter().map(|s| s.as_str().into()))
234    }
235
236    /// Returns the [`Command`] instance configured to run the appropriate binary.
237    ///
238    /// The path is resolved once and cached inside `self` for subsequent usages.
239    async fn command_binary(&self) -> Result<Command> {
240        match self.command_with_cached_binary_path() {
241            Some(command) => Ok(command),
242            None => {
243                let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
244                let command = Command::new(&resolved_path);
245
246                self.set_cached_binary_path(resolved_path);
247
248                Ok(command)
249            }
250        }
251    }
252
253    /// Returns a [`Command`] instance configured with the cached `binary_path`, if available.
254    fn command_with_cached_binary_path(&self) -> Option<Command> {
255        let binary_path = self.binary_path.lock().unwrap();
256
257        binary_path.as_ref().map(Command::new)
258    }
259
260    /// Sets the cached `binary_path` with the `new_binary_path`.
261    ///
262    /// # Panics
263    ///
264    /// If the cache is already set to a different value. In theory the two threads calling
265    /// `command_binary` can race and resolve the binary path twice, but they should always be the
266    /// same path.
267    fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
268        let mut binary_path = self.binary_path.lock().unwrap();
269
270        if binary_path.is_none() {
271            *binary_path = Some(new_binary_path);
272        } else {
273            assert_eq!(*binary_path, Some(new_binary_path));
274        }
275    }
276
277    /// Runs `linera create-genesis-config`.
278    pub async fn create_genesis_config(
279        &self,
280        num_other_initial_chains: u32,
281        initial_funding: Amount,
282        policy_config: ResourceControlPolicyConfig,
283        http_allow_list: Option<Vec<String>>,
284    ) -> Result<()> {
285        let mut command = self.command().await?;
286        command
287            .args([
288                "create-genesis-config",
289                &num_other_initial_chains.to_string(),
290            ])
291            .args(["--initial-funding", &initial_funding.to_string()])
292            .args(["--committee", "committee.json"])
293            .args(["--genesis", "genesis.json"])
294            .args([
295                "--policy-config",
296                &policy_config.to_string().to_kebab_case(),
297            ]);
298        if let Some(allow_list) = http_allow_list {
299            command
300                .arg("--http-request-allow-list")
301                .arg(allow_list.join(","));
302        }
303        if let Some(seed) = self.testing_prng_seed {
304            command.arg("--testing-prng-seed").arg(seed.to_string());
305        }
306        command.spawn_and_wait_for_stdout().await?;
307        Ok(())
308    }
309
310    /// Runs `linera wallet init`. The genesis config is read from `genesis.json`, or from the
311    /// faucet if provided.
312    pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
313        let mut command = self.command().await?;
314        command.args(["wallet", "init"]);
315        match faucet {
316            None => command.args(["--genesis", "genesis.json"]),
317            Some(faucet) => command.args(["--faucet", faucet.url()]),
318        };
319        if let Some(seed) = self.testing_prng_seed {
320            command.arg("--testing-prng-seed").arg(seed.to_string());
321        }
322        command.spawn_and_wait_for_stdout().await?;
323        Ok(())
324    }
325
326    /// Runs `linera wallet request-chain`.
327    pub async fn request_chain(
328        &self,
329        faucet: &Faucet,
330        set_default: bool,
331    ) -> Result<(ChainId, AccountOwner)> {
332        let mut command = self.command().await?;
333        command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
334        if set_default {
335            command.arg("--set-default");
336        }
337        let stdout = command.spawn_and_wait_for_stdout().await?;
338        let mut lines = stdout.split_whitespace();
339        let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
340        let owner = lines.next().context("missing chain owner")?.parse()?;
341        Ok((chain_id, owner))
342    }
343
344    /// Runs `linera wallet publish-and-create`.
345    #[expect(clippy::too_many_arguments)]
346    pub async fn publish_and_create<
347        A: ContractAbi,
348        Parameters: Serialize,
349        InstantiationArgument: Serialize,
350    >(
351        &self,
352        contract: PathBuf,
353        service: PathBuf,
354        vm_runtime: VmRuntime,
355        parameters: &Parameters,
356        argument: &InstantiationArgument,
357        required_application_ids: &[ApplicationId],
358        publisher: impl Into<Option<ChainId>>,
359    ) -> Result<ApplicationId<A>> {
360        let json_parameters = serde_json::to_string(parameters)?;
361        let json_argument = serde_json::to_string(argument)?;
362        let mut command = self.command().await?;
363        let vm_runtime = format!("{}", vm_runtime);
364        command
365            .arg("publish-and-create")
366            .args([contract, service])
367            .args(["--vm-runtime", &vm_runtime.to_lowercase()])
368            .args(publisher.into().iter().map(ChainId::to_string))
369            .args(["--json-parameters", &json_parameters])
370            .args(["--json-argument", &json_argument]);
371        if !required_application_ids.is_empty() {
372            command.arg("--required-application-ids");
373            command.args(
374                required_application_ids
375                    .iter()
376                    .map(ApplicationId::to_string),
377            );
378        }
379        let stdout = command.spawn_and_wait_for_stdout().await?;
380        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
381    }
382
383    /// Runs `linera publish-module`.
384    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
385        &self,
386        contract: PathBuf,
387        service: PathBuf,
388        vm_runtime: VmRuntime,
389        publisher: impl Into<Option<ChainId>>,
390    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
391        let stdout = self
392            .command()
393            .await?
394            .arg("publish-module")
395            .args([contract, service])
396            .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
397            .args(publisher.into().iter().map(ChainId::to_string))
398            .spawn_and_wait_for_stdout()
399            .await?;
400        let module_id: ModuleId = stdout.trim().parse()?;
401        Ok(module_id.with_abi())
402    }
403
404    /// Runs `linera create-application`.
405    pub async fn create_application<
406        Abi: ContractAbi,
407        Parameters: Serialize,
408        InstantiationArgument: Serialize,
409    >(
410        &self,
411        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
412        parameters: &Parameters,
413        argument: &InstantiationArgument,
414        required_application_ids: &[ApplicationId],
415        creator: impl Into<Option<ChainId>>,
416    ) -> Result<ApplicationId<Abi>> {
417        let json_parameters = serde_json::to_string(parameters)?;
418        let json_argument = serde_json::to_string(argument)?;
419        let mut command = self.command().await?;
420        command
421            .arg("create-application")
422            .arg(module_id.forget_abi().to_string())
423            .args(["--json-parameters", &json_parameters])
424            .args(["--json-argument", &json_argument])
425            .args(creator.into().iter().map(ChainId::to_string));
426        if !required_application_ids.is_empty() {
427            command.arg("--required-application-ids");
428            command.args(
429                required_application_ids
430                    .iter()
431                    .map(ApplicationId::to_string),
432            );
433        }
434        let stdout = command.spawn_and_wait_for_stdout().await?;
435        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
436    }
437
438    /// Runs `linera service`.
439    pub async fn run_node_service(
440        &self,
441        port: impl Into<Option<u16>>,
442        process_inbox: ProcessInbox,
443    ) -> Result<NodeService> {
444        let port = port.into().unwrap_or(8080);
445        let mut command = self.command().await?;
446        command.arg("service");
447        if let ProcessInbox::Skip = process_inbox {
448            command.arg("--listener-skip-process-inbox");
449        }
450        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
451            command.args(var.split_whitespace());
452        }
453        let child = command
454            .args(["--port".to_string(), port.to_string()])
455            .spawn_into()?;
456        let client = reqwest_client();
457        for i in 0..10 {
458            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
459            let request = client
460                .get(format!("http://localhost:{}/", port))
461                .send()
462                .await;
463            if request.is_ok() {
464                info!("Node service has started");
465                return Ok(NodeService::new(port, child));
466            } else {
467                warn!("Waiting for node service to start");
468            }
469        }
470        bail!("Failed to start node service");
471    }
472
473    /// Runs `linera query-validator`
474    pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
475        let mut command = self.command().await?;
476        command.arg("query-validator").arg(address);
477        let stdout = command.spawn_and_wait_for_stdout().await?;
478        let hash = stdout
479            .trim()
480            .parse()
481            .context("error while parsing the result of `linera query-validator`")?;
482        Ok(hash)
483    }
484
485    /// Runs `linera query-validators`.
486    pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
487        let mut command = self.command().await?;
488        command.arg("query-validators");
489        if let Some(chain_id) = chain_id {
490            command.arg(chain_id.to_string());
491        }
492        command.spawn_and_wait_for_stdout().await?;
493        Ok(())
494    }
495
496    /// Runs `linera sync-validator`.
497    pub async fn sync_validator(
498        &self,
499        chain_ids: impl IntoIterator<Item = &ChainId>,
500        validator_address: impl Into<String>,
501    ) -> Result<()> {
502        let mut command = self.command().await?;
503        command.arg("sync-validator").arg(validator_address.into());
504        let mut chain_ids = chain_ids.into_iter().peekable();
505        if chain_ids.peek().is_some() {
506            command
507                .arg("--chains")
508                .args(chain_ids.map(ChainId::to_string));
509        }
510        command.spawn_and_wait_for_stdout().await?;
511        Ok(())
512    }
513
514    /// Runs `linera faucet`.
515    pub async fn run_faucet(
516        &self,
517        port: impl Into<Option<u16>>,
518        chain_id: ChainId,
519        amount: Amount,
520    ) -> Result<FaucetService> {
521        let port = port.into().unwrap_or(8080);
522        let mut command = self.command().await?;
523        let child = command
524            .arg("faucet")
525            .arg(chain_id.to_string())
526            .args(["--port".to_string(), port.to_string()])
527            .args(["--amount".to_string(), amount.to_string()])
528            .spawn_into()?;
529        let client = reqwest_client();
530        for i in 0..10 {
531            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
532            let request = client
533                .get(format!("http://localhost:{}/", port))
534                .send()
535                .await;
536            if request.is_ok() {
537                info!("Faucet has started");
538                return Ok(FaucetService::new(port, child));
539            } else {
540                warn!("Waiting for faucet to start");
541            }
542        }
543        bail!("Failed to start faucet");
544    }
545
546    /// Runs `linera local-balance`.
547    pub async fn local_balance(&self, account: Account) -> Result<Amount> {
548        let stdout = self
549            .command()
550            .await?
551            .arg("local-balance")
552            .arg(account.to_string())
553            .spawn_and_wait_for_stdout()
554            .await?;
555        let amount = stdout
556            .trim()
557            .parse()
558            .context("error while parsing the result of `linera local-balance`")?;
559        Ok(amount)
560    }
561
562    /// Runs `linera query-balance`.
563    pub async fn query_balance(&self, account: Account) -> Result<Amount> {
564        let stdout = self
565            .command()
566            .await?
567            .arg("query-balance")
568            .arg(account.to_string())
569            .spawn_and_wait_for_stdout()
570            .await?;
571        let amount = stdout
572            .trim()
573            .parse()
574            .context("error while parsing the result of `linera query-balance`")?;
575        Ok(amount)
576    }
577
578    /// Runs `linera sync`.
579    pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
580        self.command()
581            .await?
582            .arg("sync")
583            .arg(chain_id.to_string())
584            .spawn_and_wait_for_stdout()
585            .await?;
586        Ok(())
587    }
588
589    /// Runs `linera process-inbox`.
590    pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
591        self.command()
592            .await?
593            .arg("process-inbox")
594            .arg(chain_id.to_string())
595            .spawn_and_wait_for_stdout()
596            .await?;
597        Ok(())
598    }
599
600    /// Runs `linera transfer`.
601    pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
602        self.command()
603            .await?
604            .arg("transfer")
605            .arg(amount.to_string())
606            .args(["--from", &from.to_string()])
607            .args(["--to", &to.to_string()])
608            .spawn_and_wait_for_stdout()
609            .await?;
610        Ok(())
611    }
612
613    /// Runs `linera transfer` with no logging.
614    pub async fn transfer_with_silent_logs(
615        &self,
616        amount: Amount,
617        from: ChainId,
618        to: ChainId,
619    ) -> Result<()> {
620        self.command()
621            .await?
622            .env("RUST_LOG", "off")
623            .arg("transfer")
624            .arg(amount.to_string())
625            .args(["--from", &from.to_string()])
626            .args(["--to", &to.to_string()])
627            .spawn_and_wait_for_stdout()
628            .await?;
629        Ok(())
630    }
631
632    /// Runs `linera transfer` with owner accounts.
633    pub async fn transfer_with_accounts(
634        &self,
635        amount: Amount,
636        from: Account,
637        to: Account,
638    ) -> Result<()> {
639        self.command()
640            .await?
641            .arg("transfer")
642            .arg(amount.to_string())
643            .args(["--from", &from.to_string()])
644            .args(["--to", &to.to_string()])
645            .spawn_and_wait_for_stdout()
646            .await?;
647        Ok(())
648    }
649
650    #[cfg(feature = "benchmark")]
651    fn benchmark_command_internal(command: &mut Command, args: BenchmarkCommand) -> Result<()> {
652        let args = to_args(&args)?
653            .chunks_exact(2)
654            .flat_map(|pair| {
655                let option = format!("--{}", pair[0]);
656                match pair[1].as_str() {
657                    "true" => vec![option],
658                    "false" => vec![],
659                    _ => vec![option, pair[1].clone()],
660                }
661            })
662            .collect::<Vec<_>>();
663        command.arg("benchmark").args(args);
664        Ok(())
665    }
666
667    #[cfg(feature = "benchmark")]
668    async fn benchmark_command_with_envs(
669        &self,
670        args: BenchmarkCommand,
671        envs: &[(&str, &str)],
672    ) -> Result<Command> {
673        let mut command = self.command_with_envs(envs).await?;
674        Self::benchmark_command_internal(&mut command, args)?;
675        Ok(command)
676    }
677
678    #[cfg(feature = "benchmark")]
679    async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
680        let mut command = self.command().await?;
681        Self::benchmark_command_internal(&mut command, args)?;
682        Ok(command)
683    }
684
685    /// Runs `linera benchmark`.
686    #[cfg(feature = "benchmark")]
687    pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
688        let mut command = self.benchmark_command(args).await?;
689        command.spawn_and_wait_for_stdout().await?;
690        Ok(())
691    }
692
693    /// Runs `linera benchmark`, but detached: don't wait for the command to finish, just spawn it
694    /// and return the child process, and the handles to the stdout and stderr.
695    #[cfg(feature = "benchmark")]
696    pub async fn benchmark_detached(
697        &self,
698        args: BenchmarkCommand,
699        tx: oneshot::Sender<()>,
700    ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
701        let mut child = self
702            .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
703            .await?
704            .kill_on_drop(true)
705            .stdin(Stdio::piped())
706            .stdout(Stdio::piped())
707            .stderr(Stdio::piped())
708            .spawn()?;
709
710        let pid = child.id().expect("failed to get pid");
711        let stdout = child.stdout.take().expect("stdout not open");
712        let stdout_handle = tokio::spawn(async move {
713            let mut lines = BufReader::new(stdout).lines();
714            while let Ok(Some(line)) = lines.next_line().await {
715                println!("benchmark{{pid={pid}}} {line}");
716            }
717        });
718
719        let stderr = child.stderr.take().expect("stderr not open");
720        let stderr_handle = tokio::spawn(async move {
721            let mut lines = BufReader::new(stderr).lines();
722            let mut tx = Some(tx);
723            while let Ok(Some(line)) = lines.next_line().await {
724                if line.contains("Ready to start benchmark") {
725                    tx.take()
726                        .expect("Should only send signal once")
727                        .send(())
728                        .expect("failed to send ready signal to main thread");
729                } else {
730                    println!("benchmark{{pid={pid}}} {line}");
731                }
732            }
733        });
734        Ok((child, stdout_handle, stderr_handle))
735    }
736
737    /// Runs `linera open-chain`.
738    pub async fn open_chain(
739        &self,
740        from: ChainId,
741        owner: Option<AccountOwner>,
742        initial_balance: Amount,
743    ) -> Result<(ChainId, AccountOwner)> {
744        let mut command = self.command().await?;
745        command
746            .arg("open-chain")
747            .args(["--from", &from.to_string()])
748            .args(["--initial-balance", &initial_balance.to_string()]);
749
750        if let Some(owner) = owner {
751            command.args(["--owner", &owner.to_string()]);
752        }
753
754        let stdout = command.spawn_and_wait_for_stdout().await?;
755        let mut split = stdout.split('\n');
756        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
757        let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
758        if let Some(owner) = owner {
759            assert_eq!(owner, new_owner);
760        }
761        Ok((chain_id, new_owner))
762    }
763
764    /// Runs `linera open-chain` then `linera assign`.
765    pub async fn open_and_assign(
766        &self,
767        client: &ClientWrapper,
768        initial_balance: Amount,
769    ) -> Result<ChainId> {
770        let our_chain = self
771            .load_wallet()?
772            .default_chain()
773            .context("no default chain found")?;
774        let owner = client.keygen().await?;
775        let (new_chain, _) = self
776            .open_chain(our_chain, Some(owner), initial_balance)
777            .await?;
778        client.assign(owner, new_chain).await?;
779        Ok(new_chain)
780    }
781
782    pub async fn open_multi_owner_chain(
783        &self,
784        from: ChainId,
785        owners: Vec<AccountOwner>,
786        weights: Vec<u64>,
787        multi_leader_rounds: u32,
788        balance: Amount,
789        base_timeout_ms: u64,
790    ) -> Result<ChainId> {
791        let mut command = self.command().await?;
792        command
793            .arg("open-multi-owner-chain")
794            .args(["--from", &from.to_string()])
795            .arg("--owners")
796            .args(owners.iter().map(AccountOwner::to_string))
797            .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
798        if !weights.is_empty() {
799            command
800                .arg("--owner-weights")
801                .args(weights.iter().map(u64::to_string));
802        };
803        command
804            .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
805            .args(["--initial-balance", &balance.to_string()]);
806
807        let stdout = command.spawn_and_wait_for_stdout().await?;
808        let mut split = stdout.split('\n');
809        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
810
811        Ok(chain_id)
812    }
813
814    pub async fn change_ownership(
815        &self,
816        chain_id: ChainId,
817        super_owners: Vec<AccountOwner>,
818        owners: Vec<AccountOwner>,
819    ) -> Result<()> {
820        let mut command = self.command().await?;
821        command
822            .arg("change-ownership")
823            .args(["--chain-id", &chain_id.to_string()]);
824        if !super_owners.is_empty() {
825            command
826                .arg("--super-owners")
827                .args(super_owners.iter().map(AccountOwner::to_string));
828        }
829        if !owners.is_empty() {
830            command
831                .arg("--owners")
832                .args(owners.iter().map(AccountOwner::to_string));
833        }
834        command.spawn_and_wait_for_stdout().await?;
835        Ok(())
836    }
837
838    /// Runs `linera wallet follow-chain CHAIN_ID`.
839    pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
840        let mut command = self.command().await?;
841        command
842            .args(["wallet", "follow-chain"])
843            .arg(chain_id.to_string());
844        if sync {
845            command.arg("--sync");
846        }
847        command.spawn_and_wait_for_stdout().await?;
848        Ok(())
849    }
850
851    /// Runs `linera wallet forget-chain CHAIN_ID`.
852    pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
853        let mut command = self.command().await?;
854        command
855            .args(["wallet", "forget-chain"])
856            .arg(chain_id.to_string());
857        command.spawn_and_wait_for_stdout().await?;
858        Ok(())
859    }
860
861    pub async fn retry_pending_block(
862        &self,
863        chain_id: Option<ChainId>,
864    ) -> Result<Option<CryptoHash>> {
865        let mut command = self.command().await?;
866        command.arg("retry-pending-block");
867        if let Some(chain_id) = chain_id {
868            command.arg(chain_id.to_string());
869        }
870        let stdout = command.spawn_and_wait_for_stdout().await?;
871        let stdout = stdout.trim();
872        if stdout.is_empty() {
873            Ok(None)
874        } else {
875            Ok(Some(CryptoHash::from_str(stdout)?))
876        }
877    }
878
879    /// Runs `linera publish-data-blob`.
880    pub async fn publish_data_blob(
881        &self,
882        path: &Path,
883        chain_id: Option<ChainId>,
884    ) -> Result<CryptoHash> {
885        let mut command = self.command().await?;
886        command.arg("publish-data-blob").arg(path);
887        if let Some(chain_id) = chain_id {
888            command.arg(chain_id.to_string());
889        }
890        let stdout = command.spawn_and_wait_for_stdout().await?;
891        let stdout = stdout.trim();
892        Ok(CryptoHash::from_str(stdout)?)
893    }
894
895    /// Runs `linera read-data-blob`.
896    pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
897        let mut command = self.command().await?;
898        command.arg("read-data-blob").arg(hash.to_string());
899        if let Some(chain_id) = chain_id {
900            command.arg(chain_id.to_string());
901        }
902        command.spawn_and_wait_for_stdout().await?;
903        Ok(())
904    }
905
906    pub fn load_wallet(&self) -> Result<Wallet> {
907        util::read_json(self.wallet_path())
908    }
909
910    pub fn load_keystore(&self) -> Result<InMemorySigner> {
911        util::read_json(self.keystore_path())
912    }
913
914    pub fn wallet_path(&self) -> PathBuf {
915        self.path_provider.path().join(&self.wallet)
916    }
917
918    pub fn keystore_path(&self) -> PathBuf {
919        self.path_provider.path().join(&self.keystore)
920    }
921
922    pub fn storage_path(&self) -> &str {
923        &self.storage
924    }
925
926    pub fn get_owner(&self) -> Option<AccountOwner> {
927        let wallet = self.load_wallet().ok()?;
928        let chain_id = wallet.default_chain()?;
929        wallet.get(chain_id)?.owner
930    }
931
932    pub async fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
933        self.load_wallet()
934            .ok()
935            .is_some_and(|wallet| wallet.get(chain).is_some())
936    }
937
938    pub async fn set_validator(
939        &self,
940        validator_key: &(String, String),
941        port: usize,
942        votes: usize,
943    ) -> Result<()> {
944        let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
945        self.command()
946            .await?
947            .arg("set-validator")
948            .args(["--public-key", &validator_key.0])
949            .args(["--account-key", &validator_key.1])
950            .args(["--address", &address])
951            .args(["--votes", &votes.to_string()])
952            .spawn_and_wait_for_stdout()
953            .await?;
954        Ok(())
955    }
956
957    pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
958        self.command()
959            .await?
960            .arg("remove-validator")
961            .args(["--public-key", validator_key])
962            .spawn_and_wait_for_stdout()
963            .await?;
964        Ok(())
965    }
966
967    pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
968        self.command()
969            .await?
970            .arg("revoke-epochs")
971            .arg(epoch.to_string())
972            .spawn_and_wait_for_stdout()
973            .await?;
974        Ok(())
975    }
976
977    /// Runs `linera keygen`.
978    pub async fn keygen(&self) -> Result<AccountOwner> {
979        let stdout = self
980            .command()
981            .await?
982            .arg("keygen")
983            .spawn_and_wait_for_stdout()
984            .await?;
985        AccountOwner::from_str(stdout.as_str().trim())
986    }
987
988    /// Returns the default chain.
989    pub fn default_chain(&self) -> Option<ChainId> {
990        self.load_wallet().ok()?.default_chain()
991    }
992
993    /// Runs `linera assign`.
994    pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
995        let _stdout = self
996            .command()
997            .await?
998            .arg("assign")
999            .args(["--owner", &owner.to_string()])
1000            .args(["--chain-id", &chain_id.to_string()])
1001            .spawn_and_wait_for_stdout()
1002            .await?;
1003        Ok(())
1004    }
1005
1006    /// Runs `linera set-preferred-owner` for `chain_id`.
1007    pub async fn set_preffered_owner(
1008        &self,
1009        chain_id: ChainId,
1010        owner: Option<AccountOwner>,
1011    ) -> Result<()> {
1012        let mut owner_arg = vec!["--owner".to_string()];
1013        if let Some(owner) = owner {
1014            owner_arg.push(owner.to_string());
1015        };
1016        self.command()
1017            .await?
1018            .arg("set-preferred-owner")
1019            .args(["--chain-id", &chain_id.to_string()])
1020            .args(owner_arg)
1021            .spawn_and_wait_for_stdout()
1022            .await?;
1023        Ok(())
1024    }
1025
1026    pub async fn build_application(
1027        &self,
1028        path: &Path,
1029        name: &str,
1030        is_workspace: bool,
1031    ) -> Result<(PathBuf, PathBuf)> {
1032        Command::new("cargo")
1033            .current_dir(self.path_provider.path())
1034            .arg("build")
1035            .arg("--release")
1036            .args(["--target", "wasm32-unknown-unknown"])
1037            .arg("--manifest-path")
1038            .arg(path.join("Cargo.toml"))
1039            .spawn_and_wait_for_stdout()
1040            .await?;
1041
1042        let release_dir = match is_workspace {
1043            true => path.join("../target/wasm32-unknown-unknown/release"),
1044            false => path.join("target/wasm32-unknown-unknown/release"),
1045        };
1046
1047        let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1048        let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1049
1050        let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1051        let service_size = fs_err::tokio::metadata(&service).await?.len();
1052        info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1053
1054        Ok((contract, service))
1055    }
1056}
1057
1058impl Drop for ClientWrapper {
1059    fn drop(&mut self) {
1060        use std::process::Command as SyncCommand;
1061
1062        if self.on_drop != OnClientDrop::CloseChains {
1063            return;
1064        }
1065
1066        let Ok(binary_path) = self.binary_path.lock() else {
1067            error!("Failed to close chains because a thread panicked with a lock to `binary_path`");
1068            return;
1069        };
1070
1071        let Some(binary_path) = binary_path.as_ref() else {
1072            warn!(
1073                "Assuming no chains need to be closed, because the command binary was never \
1074                resolved and therefore presumably never called"
1075            );
1076            return;
1077        };
1078
1079        let working_directory = self.path_provider.path();
1080        let mut wallet_show_command = SyncCommand::new(binary_path);
1081
1082        for argument in self.command_arguments() {
1083            wallet_show_command.arg(&*argument);
1084        }
1085
1086        let Ok(wallet_show_output) = wallet_show_command
1087            .current_dir(working_directory)
1088            .args(["wallet", "show", "--short", "--owned"])
1089            .output()
1090        else {
1091            warn!("Failed to execute `wallet show --short` to list chains to close");
1092            return;
1093        };
1094
1095        if !wallet_show_output.status.success() {
1096            warn!("Failed to list chains in the wallet to close them");
1097            return;
1098        }
1099
1100        let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1101            warn!(
1102                "Failed to close chains because `linera wallet show --short` \
1103                returned a non-UTF-8 output"
1104            );
1105            return;
1106        };
1107
1108        let chain_ids = chain_list_string
1109            .split('\n')
1110            .map(|line| line.trim())
1111            .filter(|line| !line.is_empty());
1112
1113        for chain_id in chain_ids {
1114            let mut close_chain_command = SyncCommand::new(binary_path);
1115
1116            for argument in self.command_arguments() {
1117                close_chain_command.arg(&*argument);
1118            }
1119
1120            close_chain_command.current_dir(working_directory);
1121
1122            match close_chain_command.args(["close-chain", chain_id]).status() {
1123                Ok(status) if status.success() => (),
1124                Ok(failure) => warn!("Failed to close chain {chain_id}: {failure}"),
1125                Err(error) => warn!("Failed to close chain {chain_id}: {error}"),
1126            }
1127        }
1128    }
1129}
1130
1131#[cfg(with_testing)]
1132impl ClientWrapper {
1133    pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1134        self.build_application(Self::example_path(name)?.as_path(), name, true)
1135            .await
1136    }
1137
1138    pub fn example_path(name: &str) -> Result<PathBuf> {
1139        Ok(env::current_dir()?.join("../examples/").join(name))
1140    }
1141}
1142
1143fn truncate_query_output(input: &str) -> String {
1144    let max_len = 1000;
1145    if input.len() < max_len {
1146        input.to_string()
1147    } else {
1148        format!("{} ...", input.get(..max_len).unwrap())
1149    }
1150}
1151
1152fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1153    let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1154    let max_len = 200;
1155    if query.len() < max_len {
1156        query
1157    } else {
1158        format!("{} ...", query.get(..max_len).unwrap())
1159    }
1160}
1161
1162/// A running node service.
1163pub struct NodeService {
1164    port: u16,
1165    child: Child,
1166}
1167
1168impl NodeService {
1169    fn new(port: u16, child: Child) -> Self {
1170        Self { port, child }
1171    }
1172
1173    pub async fn terminate(mut self) -> Result<()> {
1174        self.child.kill().await.context("terminating node service")
1175    }
1176
1177    pub fn port(&self) -> u16 {
1178        self.port
1179    }
1180
1181    pub fn ensure_is_running(&mut self) -> Result<()> {
1182        self.child.ensure_is_running()
1183    }
1184
1185    pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1186        let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1187        let mut data = self.query_node(query).await?;
1188        Ok(serde_json::from_value(data["processInbox"].take())?)
1189    }
1190
1191    pub async fn make_application<A: ContractAbi>(
1192        &self,
1193        chain_id: &ChainId,
1194        application_id: &ApplicationId<A>,
1195    ) -> Result<ApplicationWrapper<A>> {
1196        let application_id = application_id.forget_abi().to_string();
1197        let link = format!(
1198            "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1199            self.port
1200        );
1201        Ok(ApplicationWrapper::from(link))
1202    }
1203
1204    pub async fn publish_data_blob(
1205        &self,
1206        chain_id: &ChainId,
1207        bytes: Vec<u8>,
1208    ) -> Result<CryptoHash> {
1209        let query = format!(
1210            "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1211            chain_id.to_value(),
1212            bytes.to_value(),
1213        );
1214        let data = self.query_node(query).await?;
1215        serde_json::from_value(data["publishDataBlob"].clone())
1216            .context("missing publishDataBlob field in response")
1217    }
1218
1219    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1220        &self,
1221        chain_id: &ChainId,
1222        contract: PathBuf,
1223        service: PathBuf,
1224        vm_runtime: VmRuntime,
1225    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1226        let contract_code = Bytecode::load_from_file(&contract).await?;
1227        let service_code = Bytecode::load_from_file(&service).await?;
1228        let query = format!(
1229            "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1230            chain_id.to_value(),
1231            contract_code.to_value(),
1232            service_code.to_value(),
1233            vm_runtime.to_value(),
1234        );
1235        let data = self.query_node(query).await?;
1236        let module_str = data["publishModule"]
1237            .as_str()
1238            .context("module ID not found")?;
1239        let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1240        Ok(module_id.with_abi())
1241    }
1242
1243    pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1244        let query = format!(
1245            "query {{ chain(chainId:\"{chain_id}\") {{
1246                executionState {{ system {{ committees }} }}
1247            }} }}"
1248        );
1249        let mut response = self.query_node(query).await?;
1250        let committees = response["chain"]["executionState"]["system"]["committees"].take();
1251        Ok(serde_json::from_value(committees)?)
1252    }
1253
1254    pub async fn events_from_index(
1255        &self,
1256        chain_id: &ChainId,
1257        stream_id: &StreamId,
1258        start_index: u32,
1259    ) -> Result<Vec<IndexAndEvent>> {
1260        let query = format!(
1261            "query {{
1262               eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1263               {{ index event }}
1264             }}",
1265            stream_id.to_value()
1266        );
1267        let mut response = self.query_node(query).await?;
1268        let response = response["eventsFromIndex"].take();
1269        Ok(serde_json::from_value(response)?)
1270    }
1271
1272    pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1273        let n_try = 5;
1274        let query = query.as_ref();
1275        for i in 0..n_try {
1276            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1277            let url = format!("http://localhost:{}/", self.port);
1278            let client = reqwest_client();
1279            let result = client
1280                .post(url)
1281                .json(&json!({ "query": query }))
1282                .send()
1283                .await;
1284            if matches!(result, Err(ref error) if error.is_timeout()) {
1285                warn!(
1286                    "Timeout when sending query {} to the node service",
1287                    truncate_query_output(query)
1288                );
1289                continue;
1290            }
1291            let response = result.with_context(|| {
1292                format!(
1293                    "query_node: failed to post query={}",
1294                    truncate_query_output(query)
1295                )
1296            })?;
1297            anyhow::ensure!(
1298                response.status().is_success(),
1299                "Query \"{}\" failed: {}",
1300                truncate_query_output(query),
1301                response
1302                    .text()
1303                    .await
1304                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1305            );
1306            let value: Value = response.json().await.context("invalid JSON")?;
1307            if let Some(errors) = value.get("errors") {
1308                warn!(
1309                    "Query \"{}\" failed: {}",
1310                    truncate_query_output(query),
1311                    errors
1312                );
1313            } else {
1314                return Ok(value["data"].clone());
1315            }
1316        }
1317        bail!(
1318            "Query \"{}\" failed after {} retries.",
1319            truncate_query_output(query),
1320            n_try
1321        );
1322    }
1323
1324    pub async fn create_application<
1325        Abi: ContractAbi,
1326        Parameters: Serialize,
1327        InstantiationArgument: Serialize,
1328    >(
1329        &self,
1330        chain_id: &ChainId,
1331        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1332        parameters: &Parameters,
1333        argument: &InstantiationArgument,
1334        required_application_ids: &[ApplicationId],
1335    ) -> Result<ApplicationId<Abi>> {
1336        let module_id = module_id.forget_abi();
1337        let json_required_applications_ids = required_application_ids
1338            .iter()
1339            .map(ApplicationId::to_string)
1340            .collect::<Vec<_>>()
1341            .to_value();
1342        // Convert to `serde_json::Value` then `async_graphql::Value` via the trait `InputType`.
1343        let new_parameters = serde_json::to_value(parameters)
1344            .context("could not create parameters JSON")?
1345            .to_value();
1346        let new_argument = serde_json::to_value(argument)
1347            .context("could not create argument JSON")?
1348            .to_value();
1349        let query = format!(
1350            "mutation {{ createApplication(\
1351                 chainId: \"{chain_id}\",
1352                 moduleId: \"{module_id}\", \
1353                 parameters: {new_parameters}, \
1354                 instantiationArgument: {new_argument}, \
1355                 requiredApplicationIds: {json_required_applications_ids}) \
1356             }}"
1357        );
1358        let data = self.query_node(query).await?;
1359        let app_id_str = data["createApplication"]
1360            .as_str()
1361            .context("missing createApplication string in response")?
1362            .trim();
1363        Ok(app_id_str
1364            .parse::<ApplicationId>()
1365            .context("invalid application ID")?
1366            .with_abi())
1367    }
1368
1369    /// Obtains the hash of the `chain`'s tip block, as known by this node service.
1370    pub async fn chain_tip_hash(&self, chain: ChainId) -> Result<Option<CryptoHash>> {
1371        let query = format!(r#"query {{ block(chainId: "{chain}") {{ hash }} }}"#);
1372
1373        let mut response = self.query_node(&query).await?;
1374
1375        match mem::take(&mut response["block"]["hash"]) {
1376            Value::Null => Ok(None),
1377            Value::String(hash) => Ok(Some(
1378                hash.parse()
1379                    .context("Received an invalid hash {hash:?} for chain tip")?,
1380            )),
1381            invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1382        }
1383    }
1384
1385    /// Subscribes to the node service and returns a stream of notifications about a chain.
1386    pub async fn notifications(
1387        &self,
1388        chain_id: ChainId,
1389    ) -> Result<impl Stream<Item = Result<Notification>>> {
1390        let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1391        let url = format!("ws://localhost:{}/ws", self.port);
1392        let mut request = url.into_client_request()?;
1393        request.headers_mut().insert(
1394            "Sec-WebSocket-Protocol",
1395            HeaderValue::from_str("graphql-transport-ws")?,
1396        );
1397        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1398        let init_json = json!({
1399          "type": "connection_init",
1400          "payload": {}
1401        });
1402        websocket.send(init_json.to_string().into()).await?;
1403        let text = websocket
1404            .next()
1405            .await
1406            .context("Failed to establish connection")??
1407            .into_text()?;
1408        ensure!(
1409            text == "{\"type\":\"connection_ack\"}",
1410            "Unexpected response: {text}"
1411        );
1412        let query_json = json!({
1413          "id": "1",
1414          "type": "start",
1415          "payload": {
1416            "query": query,
1417            "variables": {},
1418            "operationName": null
1419          }
1420        });
1421        websocket.send(query_json.to_string().into()).await?;
1422        Ok(websocket
1423            .map_err(anyhow::Error::from)
1424            .and_then(|message| async {
1425                let text = message.into_text()?;
1426                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1427                if let Some(errors) = value["payload"].get("errors") {
1428                    bail!("Notification subscription failed: {errors:?}");
1429                }
1430                serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1431                    .context("Failed to deserialize notification")
1432            }))
1433    }
1434}
1435
1436/// A running faucet service.
1437pub struct FaucetService {
1438    port: u16,
1439    child: Child,
1440}
1441
1442impl FaucetService {
1443    fn new(port: u16, child: Child) -> Self {
1444        Self { port, child }
1445    }
1446
1447    pub async fn terminate(mut self) -> Result<()> {
1448        self.child
1449            .kill()
1450            .await
1451            .context("terminating faucet service")
1452    }
1453
1454    pub fn ensure_is_running(&mut self) -> Result<()> {
1455        self.child.ensure_is_running()
1456    }
1457
1458    pub fn instance(&self) -> Faucet {
1459        Faucet::new(format!("http://localhost:{}/", self.port))
1460    }
1461}
1462
1463/// A running `Application` to be queried in GraphQL.
1464pub struct ApplicationWrapper<A> {
1465    uri: String,
1466    _phantom: PhantomData<A>,
1467}
1468
1469impl<A> ApplicationWrapper<A> {
1470    pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1471        let query = query.as_ref();
1472        let value = self.run_json_query(json!({ "query": query })).await?;
1473        Ok(value["data"].clone())
1474    }
1475
1476    pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1477        const MAX_RETRIES: usize = 5;
1478
1479        for i in 0.. {
1480            let client = reqwest_client();
1481            let result = client.post(&self.uri).json(&query).send().await;
1482            let response = match result {
1483                Ok(response) => response,
1484                Err(error) if i < MAX_RETRIES => {
1485                    warn!(
1486                        "Failed to post query \"{}\": {error}; retrying",
1487                        truncate_query_output_serialize(&query),
1488                    );
1489                    continue;
1490                }
1491                Err(error) => {
1492                    let query = truncate_query_output_serialize(&query);
1493                    return Err(error)
1494                        .with_context(|| format!("run_json_query: failed to post query={query}"));
1495                }
1496            };
1497            anyhow::ensure!(
1498                response.status().is_success(),
1499                "Query \"{}\" failed: {}",
1500                truncate_query_output_serialize(&query),
1501                response
1502                    .text()
1503                    .await
1504                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1505            );
1506            let value: Value = response.json().await.context("invalid JSON")?;
1507            if let Some(errors) = value.get("errors") {
1508                bail!(
1509                    "Query \"{}\" failed: {}",
1510                    truncate_query_output_serialize(&query),
1511                    errors
1512                );
1513            }
1514            return Ok(value);
1515        }
1516        unreachable!()
1517    }
1518
1519    pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
1520        let query = query.as_ref();
1521        self.run_graphql_query(&format!("query {{ {query} }}"))
1522            .await
1523    }
1524
1525    pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
1526        let query = query.as_ref().trim();
1527        let name = query
1528            .split_once(|ch: char| !ch.is_alphanumeric())
1529            .map_or(query, |(name, _)| name);
1530        let data = self.query(query).await?;
1531        serde_json::from_value(data[name].clone())
1532            .with_context(|| format!("{name} field missing in response"))
1533    }
1534
1535    pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
1536        let mutation = mutation.as_ref();
1537        self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
1538            .await
1539    }
1540}
1541
1542impl<A> From<String> for ApplicationWrapper<A> {
1543    fn from(uri: String) -> ApplicationWrapper<A> {
1544        ApplicationWrapper {
1545            uri,
1546            _phantom: PhantomData,
1547        }
1548    }
1549}