linera_service/cli_wrappers/
wallet.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Cow,
6    collections::BTreeMap,
7    env,
8    marker::PhantomData,
9    mem,
10    path::{Path, PathBuf},
11    str::FromStr,
12    sync,
13    time::Duration,
14};
15
16use anyhow::{bail, ensure, Context, Result};
17use async_graphql::InputType;
18use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
19use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
20use heck::ToKebabCase;
21use linera_base::{
22    abi::ContractAbi,
23    command::{resolve_binary, CommandExt},
24    crypto::{CryptoHash, InMemorySigner},
25    data_types::{Amount, Bytecode, Epoch},
26    identifiers::{
27        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
28    },
29    vm::VmRuntime,
30};
31use linera_client::{client_options::ResourceControlPolicyConfig, wallet::Wallet};
32use linera_core::worker::Notification;
33use linera_execution::committee::Committee;
34use linera_faucet_client::Faucet;
35use serde::{de::DeserializeOwned, ser::Serialize};
36use serde_json::{json, Value};
37use tempfile::TempDir;
38use tokio::process::{Child, Command};
39use tracing::{error, info, warn};
40#[cfg(feature = "benchmark")]
41use {
42    crate::cli::command::BenchmarkCommand,
43    serde_command_opts::to_args,
44    std::process::Stdio,
45    tokio::{
46        io::{AsyncBufReadExt, BufReader},
47        sync::oneshot,
48        task::JoinHandle,
49    },
50};
51
52use crate::{
53    cli_wrappers::{
54        local_net::{PathProvider, ProcessInbox},
55        Network,
56    },
57    util::{self, ChildExt},
58};
59
60/// The name of the environment variable that allows specifying additional arguments to be passed
61/// to the node-service command of the client.
62const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
63
64fn reqwest_client() -> reqwest::Client {
65    reqwest::ClientBuilder::new()
66        .timeout(Duration::from_secs(30))
67        .build()
68        .unwrap()
69}
70
71/// Wrapper to run a Linera client command.
72pub struct ClientWrapper {
73    binary_path: sync::Mutex<Option<PathBuf>>,
74    testing_prng_seed: Option<u64>,
75    storage: String,
76    wallet: String,
77    keystore: String,
78    max_pending_message_bundles: usize,
79    network: Network,
80    pub path_provider: PathProvider,
81    on_drop: OnClientDrop,
82    extra_args: Vec<String>,
83}
84
85/// Action to perform when the [`ClientWrapper`] is dropped.
86#[derive(Clone, Copy, Debug, Eq, PartialEq)]
87pub enum OnClientDrop {
88    /// Close all the chains on the wallet.
89    CloseChains,
90    /// Do not close any chains, leaving them active.
91    LeakChains,
92}
93
94impl ClientWrapper {
95    pub fn new(
96        path_provider: PathProvider,
97        network: Network,
98        testing_prng_seed: Option<u64>,
99        id: usize,
100        on_drop: OnClientDrop,
101    ) -> Self {
102        Self::new_with_extra_args(
103            path_provider,
104            network,
105            testing_prng_seed,
106            id,
107            on_drop,
108            vec!["--wait-for-outgoing-messages".to_string()],
109        )
110    }
111
112    pub fn new_with_extra_args(
113        path_provider: PathProvider,
114        network: Network,
115        testing_prng_seed: Option<u64>,
116        id: usize,
117        on_drop: OnClientDrop,
118        extra_args: Vec<String>,
119    ) -> Self {
120        let storage = format!(
121            "rocksdb:{}/client_{}.db",
122            path_provider.path().display(),
123            id
124        );
125        let wallet = format!("wallet_{}.json", id);
126        let keystore = format!("keystore_{}.json", id);
127        Self {
128            binary_path: sync::Mutex::new(None),
129            testing_prng_seed,
130            storage,
131            wallet,
132            keystore,
133            max_pending_message_bundles: 10_000,
134            network,
135            path_provider,
136            on_drop,
137            extra_args,
138        }
139    }
140
141    /// Runs `linera project new`.
142    pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
143        let tmp = TempDir::new()?;
144        let mut command = self.command().await?;
145        command
146            .current_dir(tmp.path())
147            .arg("project")
148            .arg("new")
149            .arg(project_name)
150            .arg("--linera-root")
151            .arg(linera_root)
152            .spawn_and_wait_for_stdout()
153            .await?;
154        Ok(tmp)
155    }
156
157    /// Runs `linera project publish`.
158    pub async fn project_publish<T: Serialize>(
159        &self,
160        path: PathBuf,
161        required_application_ids: Vec<String>,
162        publisher: impl Into<Option<ChainId>>,
163        argument: &T,
164    ) -> Result<String> {
165        let json_parameters = serde_json::to_string(&())?;
166        let json_argument = serde_json::to_string(argument)?;
167        let mut command = self.command().await?;
168        command
169            .arg("project")
170            .arg("publish-and-create")
171            .arg(path)
172            .args(publisher.into().iter().map(ChainId::to_string))
173            .args(["--json-parameters", &json_parameters])
174            .args(["--json-argument", &json_argument]);
175        if !required_application_ids.is_empty() {
176            command.arg("--required-application-ids");
177            command.args(required_application_ids);
178        }
179        let stdout = command.spawn_and_wait_for_stdout().await?;
180        Ok(stdout.trim().to_string())
181    }
182
183    /// Runs `linera project test`.
184    pub async fn project_test(&self, path: &Path) -> Result<()> {
185        self.command()
186            .await
187            .context("failed to create project test command")?
188            .current_dir(path)
189            .arg("project")
190            .arg("test")
191            .spawn_and_wait_for_stdout()
192            .await?;
193        Ok(())
194    }
195
196    async fn command_with_envs_and_arguments(
197        &self,
198        envs: &[(&str, &str)],
199        arguments: impl IntoIterator<Item = Cow<'_, str>>,
200    ) -> Result<Command> {
201        let mut command = self.command_binary().await?;
202        command.current_dir(self.path_provider.path());
203        for (key, value) in envs {
204            command.env(key, value);
205        }
206        for argument in arguments {
207            command.arg(&*argument);
208        }
209        Ok(command)
210    }
211
212    async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
213        self.command_with_envs_and_arguments(envs, self.command_arguments())
214            .await
215    }
216
217    #[cfg(feature = "benchmark")]
218    async fn command_with_arguments(
219        &self,
220        arguments: impl IntoIterator<Item = Cow<'_, str>>,
221    ) -> Result<Command> {
222        self.command_with_envs_and_arguments(
223            &[(
224                "RUST_LOG",
225                &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
226            )],
227            arguments,
228        )
229        .await
230    }
231
232    async fn command(&self) -> Result<Command> {
233        self.command_with_envs(&[(
234            "RUST_LOG",
235            &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
236        )])
237        .await
238    }
239
240    fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
241        [
242            "--wallet".into(),
243            self.wallet.as_str().into(),
244            "--keystore".into(),
245            self.keystore.as_str().into(),
246            "--storage".into(),
247            self.storage.as_str().into(),
248            "--send-timeout-ms".into(),
249            "500000".into(),
250            "--recv-timeout-ms".into(),
251            "500000".into(),
252        ]
253        .into_iter()
254        .chain(self.extra_args.iter().map(|s| s.as_str().into()))
255    }
256
257    /// Returns an iterator over the arguments that should be added to all command invocations.
258    fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
259        self.required_command_arguments().chain([
260            "--max-pending-message-bundles".into(),
261            self.max_pending_message_bundles.to_string().into(),
262        ])
263    }
264
265    /// Returns the [`Command`] instance configured to run the appropriate binary.
266    ///
267    /// The path is resolved once and cached inside `self` for subsequent usages.
268    async fn command_binary(&self) -> Result<Command> {
269        match self.command_with_cached_binary_path() {
270            Some(command) => Ok(command),
271            None => {
272                let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
273                let command = Command::new(&resolved_path);
274
275                self.set_cached_binary_path(resolved_path);
276
277                Ok(command)
278            }
279        }
280    }
281
282    /// Returns a [`Command`] instance configured with the cached `binary_path`, if available.
283    fn command_with_cached_binary_path(&self) -> Option<Command> {
284        let binary_path = self.binary_path.lock().unwrap();
285
286        binary_path.as_ref().map(Command::new)
287    }
288
289    /// Sets the cached `binary_path` with the `new_binary_path`.
290    ///
291    /// # Panics
292    ///
293    /// If the cache is already set to a different value. In theory the two threads calling
294    /// `command_binary` can race and resolve the binary path twice, but they should always be the
295    /// same path.
296    fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
297        let mut binary_path = self.binary_path.lock().unwrap();
298
299        if binary_path.is_none() {
300            *binary_path = Some(new_binary_path);
301        } else {
302            assert_eq!(*binary_path, Some(new_binary_path));
303        }
304    }
305
306    /// Runs `linera create-genesis-config`.
307    pub async fn create_genesis_config(
308        &self,
309        num_other_initial_chains: u32,
310        initial_funding: Amount,
311        policy_config: ResourceControlPolicyConfig,
312        http_allow_list: Option<Vec<String>>,
313    ) -> Result<()> {
314        let mut command = self.command().await?;
315        command
316            .args([
317                "create-genesis-config",
318                &num_other_initial_chains.to_string(),
319            ])
320            .args(["--initial-funding", &initial_funding.to_string()])
321            .args(["--committee", "committee.json"])
322            .args(["--genesis", "genesis.json"])
323            .args([
324                "--policy-config",
325                &policy_config.to_string().to_kebab_case(),
326            ]);
327        if let Some(allow_list) = http_allow_list {
328            command
329                .arg("--http-request-allow-list")
330                .arg(allow_list.join(","));
331        }
332        if let Some(seed) = self.testing_prng_seed {
333            command.arg("--testing-prng-seed").arg(seed.to_string());
334        }
335        command.spawn_and_wait_for_stdout().await?;
336        Ok(())
337    }
338
339    /// Runs `linera wallet init`. The genesis config is read from `genesis.json`, or from the
340    /// faucet if provided.
341    pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
342        let mut command = self.command().await?;
343        command.args(["wallet", "init"]);
344        match faucet {
345            None => command.args(["--genesis", "genesis.json"]),
346            Some(faucet) => command.args(["--faucet", faucet.url()]),
347        };
348        if let Some(seed) = self.testing_prng_seed {
349            command.arg("--testing-prng-seed").arg(seed.to_string());
350        }
351        command.spawn_and_wait_for_stdout().await?;
352        Ok(())
353    }
354
355    /// Runs `linera wallet request-chain`.
356    pub async fn request_chain(
357        &self,
358        faucet: &Faucet,
359        set_default: bool,
360    ) -> Result<(ChainId, AccountOwner)> {
361        let mut command = self.command().await?;
362        command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
363        if set_default {
364            command.arg("--set-default");
365        }
366        let stdout = command.spawn_and_wait_for_stdout().await?;
367        let mut lines = stdout.split_whitespace();
368        let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
369        let owner = lines.next().context("missing chain owner")?.parse()?;
370        Ok((chain_id, owner))
371    }
372
373    /// Runs `linera wallet publish-and-create`.
374    #[expect(clippy::too_many_arguments)]
375    pub async fn publish_and_create<
376        A: ContractAbi,
377        Parameters: Serialize,
378        InstantiationArgument: Serialize,
379    >(
380        &self,
381        contract: PathBuf,
382        service: PathBuf,
383        vm_runtime: VmRuntime,
384        parameters: &Parameters,
385        argument: &InstantiationArgument,
386        required_application_ids: &[ApplicationId],
387        publisher: impl Into<Option<ChainId>>,
388    ) -> Result<ApplicationId<A>> {
389        let json_parameters = serde_json::to_string(parameters)?;
390        let json_argument = serde_json::to_string(argument)?;
391        let mut command = self.command().await?;
392        let vm_runtime = format!("{}", vm_runtime);
393        command
394            .arg("publish-and-create")
395            .args([contract, service])
396            .args(["--vm-runtime", &vm_runtime.to_lowercase()])
397            .args(publisher.into().iter().map(ChainId::to_string))
398            .args(["--json-parameters", &json_parameters])
399            .args(["--json-argument", &json_argument]);
400        if !required_application_ids.is_empty() {
401            command.arg("--required-application-ids");
402            command.args(
403                required_application_ids
404                    .iter()
405                    .map(ApplicationId::to_string),
406            );
407        }
408        let stdout = command.spawn_and_wait_for_stdout().await?;
409        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
410    }
411
412    /// Runs `linera publish-module`.
413    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
414        &self,
415        contract: PathBuf,
416        service: PathBuf,
417        vm_runtime: VmRuntime,
418        publisher: impl Into<Option<ChainId>>,
419    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
420        let stdout = self
421            .command()
422            .await?
423            .arg("publish-module")
424            .args([contract, service])
425            .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
426            .args(publisher.into().iter().map(ChainId::to_string))
427            .spawn_and_wait_for_stdout()
428            .await?;
429        let module_id: ModuleId = stdout.trim().parse()?;
430        Ok(module_id.with_abi())
431    }
432
433    /// Runs `linera create-application`.
434    pub async fn create_application<
435        Abi: ContractAbi,
436        Parameters: Serialize,
437        InstantiationArgument: Serialize,
438    >(
439        &self,
440        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
441        parameters: &Parameters,
442        argument: &InstantiationArgument,
443        required_application_ids: &[ApplicationId],
444        creator: impl Into<Option<ChainId>>,
445    ) -> Result<ApplicationId<Abi>> {
446        let json_parameters = serde_json::to_string(parameters)?;
447        let json_argument = serde_json::to_string(argument)?;
448        let mut command = self.command().await?;
449        command
450            .arg("create-application")
451            .arg(module_id.forget_abi().to_string())
452            .args(["--json-parameters", &json_parameters])
453            .args(["--json-argument", &json_argument])
454            .args(creator.into().iter().map(ChainId::to_string));
455        if !required_application_ids.is_empty() {
456            command.arg("--required-application-ids");
457            command.args(
458                required_application_ids
459                    .iter()
460                    .map(ApplicationId::to_string),
461            );
462        }
463        let stdout = command.spawn_and_wait_for_stdout().await?;
464        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
465    }
466
467    /// Runs `linera service`.
468    pub async fn run_node_service(
469        &self,
470        port: impl Into<Option<u16>>,
471        process_inbox: ProcessInbox,
472    ) -> Result<NodeService> {
473        let port = port.into().unwrap_or(8080);
474        let mut command = self.command().await?;
475        command.arg("service");
476        if let ProcessInbox::Skip = process_inbox {
477            command.arg("--listener-skip-process-inbox");
478        }
479        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
480            command.args(var.split_whitespace());
481        }
482        let child = command
483            .args(["--port".to_string(), port.to_string()])
484            .spawn_into()?;
485        let client = reqwest_client();
486        for i in 0..10 {
487            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
488            let request = client
489                .get(format!("http://localhost:{}/", port))
490                .send()
491                .await;
492            if request.is_ok() {
493                info!("Node service has started");
494                return Ok(NodeService::new(port, child));
495            } else {
496                warn!("Waiting for node service to start");
497            }
498        }
499        bail!("Failed to start node service");
500    }
501
502    /// Runs `linera query-validator`
503    pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
504        let mut command = self.command().await?;
505        command.arg("query-validator").arg(address);
506        let stdout = command.spawn_and_wait_for_stdout().await?;
507        let hash = stdout
508            .trim()
509            .parse()
510            .context("error while parsing the result of `linera query-validator`")?;
511        Ok(hash)
512    }
513
514    /// Runs `linera query-validators`.
515    pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
516        let mut command = self.command().await?;
517        command.arg("query-validators");
518        if let Some(chain_id) = chain_id {
519            command.arg(chain_id.to_string());
520        }
521        command.spawn_and_wait_for_stdout().await?;
522        Ok(())
523    }
524
525    /// Runs `linera sync-validator`.
526    pub async fn sync_validator(
527        &self,
528        chain_ids: impl IntoIterator<Item = &ChainId>,
529        validator_address: impl Into<String>,
530    ) -> Result<()> {
531        let mut command = self.command().await?;
532        command.arg("sync-validator").arg(validator_address.into());
533        let mut chain_ids = chain_ids.into_iter().peekable();
534        if chain_ids.peek().is_some() {
535            command
536                .arg("--chains")
537                .args(chain_ids.map(ChainId::to_string));
538        }
539        command.spawn_and_wait_for_stdout().await?;
540        Ok(())
541    }
542
543    /// Runs `linera faucet`.
544    pub async fn run_faucet(
545        &self,
546        port: impl Into<Option<u16>>,
547        chain_id: ChainId,
548        amount: Amount,
549    ) -> Result<FaucetService> {
550        let port = port.into().unwrap_or(8080);
551        let mut command = self.command().await?;
552        let child = command
553            .arg("faucet")
554            .arg(chain_id.to_string())
555            .args(["--port".to_string(), port.to_string()])
556            .args(["--amount".to_string(), amount.to_string()])
557            .spawn_into()?;
558        let client = reqwest_client();
559        for i in 0..10 {
560            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
561            let request = client
562                .get(format!("http://localhost:{}/", port))
563                .send()
564                .await;
565            if request.is_ok() {
566                info!("Faucet has started");
567                return Ok(FaucetService::new(port, child));
568            } else {
569                warn!("Waiting for faucet to start");
570            }
571        }
572        bail!("Failed to start faucet");
573    }
574
575    /// Runs `linera local-balance`.
576    pub async fn local_balance(&self, account: Account) -> Result<Amount> {
577        let stdout = self
578            .command()
579            .await?
580            .arg("local-balance")
581            .arg(account.to_string())
582            .spawn_and_wait_for_stdout()
583            .await?;
584        let amount = stdout
585            .trim()
586            .parse()
587            .context("error while parsing the result of `linera local-balance`")?;
588        Ok(amount)
589    }
590
591    /// Runs `linera query-balance`.
592    pub async fn query_balance(&self, account: Account) -> Result<Amount> {
593        let stdout = self
594            .command()
595            .await?
596            .arg("query-balance")
597            .arg(account.to_string())
598            .spawn_and_wait_for_stdout()
599            .await?;
600        let amount = stdout
601            .trim()
602            .parse()
603            .context("error while parsing the result of `linera query-balance`")?;
604        Ok(amount)
605    }
606
607    /// Runs `linera sync`.
608    pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
609        self.command()
610            .await?
611            .arg("sync")
612            .arg(chain_id.to_string())
613            .spawn_and_wait_for_stdout()
614            .await?;
615        Ok(())
616    }
617
618    /// Runs `linera process-inbox`.
619    pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
620        self.command()
621            .await?
622            .arg("process-inbox")
623            .arg(chain_id.to_string())
624            .spawn_and_wait_for_stdout()
625            .await?;
626        Ok(())
627    }
628
629    /// Runs `linera transfer`.
630    pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
631        self.command()
632            .await?
633            .arg("transfer")
634            .arg(amount.to_string())
635            .args(["--from", &from.to_string()])
636            .args(["--to", &to.to_string()])
637            .spawn_and_wait_for_stdout()
638            .await?;
639        Ok(())
640    }
641
642    /// Runs `linera transfer` with no logging.
643    pub async fn transfer_with_silent_logs(
644        &self,
645        amount: Amount,
646        from: ChainId,
647        to: ChainId,
648    ) -> Result<()> {
649        self.command()
650            .await?
651            .env("RUST_LOG", "off")
652            .arg("transfer")
653            .arg(amount.to_string())
654            .args(["--from", &from.to_string()])
655            .args(["--to", &to.to_string()])
656            .spawn_and_wait_for_stdout()
657            .await?;
658        Ok(())
659    }
660
661    /// Runs `linera transfer` with owner accounts.
662    pub async fn transfer_with_accounts(
663        &self,
664        amount: Amount,
665        from: Account,
666        to: Account,
667    ) -> Result<()> {
668        self.command()
669            .await?
670            .arg("transfer")
671            .arg(amount.to_string())
672            .args(["--from", &from.to_string()])
673            .args(["--to", &to.to_string()])
674            .spawn_and_wait_for_stdout()
675            .await?;
676        Ok(())
677    }
678
679    #[cfg(feature = "benchmark")]
680    fn benchmark_command_internal(command: &mut Command, args: BenchmarkCommand) -> Result<()> {
681        let formatted_args = to_args(&args)?
682            .chunks_exact(2)
683            .flat_map(|pair| {
684                let option = format!("--{}", pair[0]);
685                match pair[1].as_str() {
686                    "true" => vec![option],
687                    "false" => vec![],
688                    _ => vec![option, pair[1].clone()],
689                }
690            })
691            .collect::<Vec<_>>();
692        command
693            // For benchmarks, we need to enforce a large enough max pending message bundles.
694            .args([
695                "--max-pending-message-bundles",
696                &args.transactions_per_block.to_string(),
697            ])
698            .arg("benchmark")
699            .args(formatted_args);
700        Ok(())
701    }
702
703    #[cfg(feature = "benchmark")]
704    async fn benchmark_command_with_envs(
705        &self,
706        args: BenchmarkCommand,
707        envs: &[(&str, &str)],
708    ) -> Result<Command> {
709        let mut command = self
710            .command_with_envs_and_arguments(envs, self.required_command_arguments())
711            .await?;
712        Self::benchmark_command_internal(&mut command, args)?;
713        Ok(command)
714    }
715
716    #[cfg(feature = "benchmark")]
717    async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
718        let mut command = self
719            .command_with_arguments(self.required_command_arguments())
720            .await?;
721        Self::benchmark_command_internal(&mut command, args)?;
722        Ok(command)
723    }
724
725    /// Runs `linera benchmark`.
726    #[cfg(feature = "benchmark")]
727    pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
728        let mut command = self.benchmark_command(args).await?;
729        command.spawn_and_wait_for_stdout().await?;
730        Ok(())
731    }
732
733    /// Runs `linera benchmark`, but detached: don't wait for the command to finish, just spawn it
734    /// and return the child process, and the handles to the stdout and stderr.
735    #[cfg(feature = "benchmark")]
736    pub async fn benchmark_detached(
737        &self,
738        args: BenchmarkCommand,
739        tx: oneshot::Sender<()>,
740    ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
741        let mut child = self
742            .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
743            .await?
744            .kill_on_drop(true)
745            .stdin(Stdio::piped())
746            .stdout(Stdio::piped())
747            .stderr(Stdio::piped())
748            .spawn()?;
749
750        let pid = child.id().expect("failed to get pid");
751        let stdout = child.stdout.take().expect("stdout not open");
752        let stdout_handle = tokio::spawn(async move {
753            let mut lines = BufReader::new(stdout).lines();
754            while let Ok(Some(line)) = lines.next_line().await {
755                println!("benchmark{{pid={pid}}} {line}");
756            }
757        });
758
759        let stderr = child.stderr.take().expect("stderr not open");
760        let stderr_handle = tokio::spawn(async move {
761            let mut lines = BufReader::new(stderr).lines();
762            let mut tx = Some(tx);
763            while let Ok(Some(line)) = lines.next_line().await {
764                if line.contains("Ready to start benchmark") {
765                    tx.take()
766                        .expect("Should only send signal once")
767                        .send(())
768                        .expect("failed to send ready signal to main thread");
769                } else {
770                    println!("benchmark{{pid={pid}}} {line}");
771                }
772            }
773        });
774        Ok((child, stdout_handle, stderr_handle))
775    }
776
777    /// Runs `linera open-chain`.
778    pub async fn open_chain(
779        &self,
780        from: ChainId,
781        owner: Option<AccountOwner>,
782        initial_balance: Amount,
783    ) -> Result<(ChainId, AccountOwner)> {
784        let mut command = self.command().await?;
785        command
786            .arg("open-chain")
787            .args(["--from", &from.to_string()])
788            .args(["--initial-balance", &initial_balance.to_string()]);
789
790        if let Some(owner) = owner {
791            command.args(["--owner", &owner.to_string()]);
792        }
793
794        let stdout = command.spawn_and_wait_for_stdout().await?;
795        let mut split = stdout.split('\n');
796        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
797        let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
798        if let Some(owner) = owner {
799            assert_eq!(owner, new_owner);
800        }
801        Ok((chain_id, new_owner))
802    }
803
804    /// Runs `linera open-chain` then `linera assign`.
805    pub async fn open_and_assign(
806        &self,
807        client: &ClientWrapper,
808        initial_balance: Amount,
809    ) -> Result<ChainId> {
810        let our_chain = self
811            .load_wallet()?
812            .default_chain()
813            .context("no default chain found")?;
814        let owner = client.keygen().await?;
815        let (new_chain, _) = self
816            .open_chain(our_chain, Some(owner), initial_balance)
817            .await?;
818        client.assign(owner, new_chain).await?;
819        Ok(new_chain)
820    }
821
822    pub async fn open_multi_owner_chain(
823        &self,
824        from: ChainId,
825        owners: Vec<AccountOwner>,
826        weights: Vec<u64>,
827        multi_leader_rounds: u32,
828        balance: Amount,
829        base_timeout_ms: u64,
830    ) -> Result<ChainId> {
831        let mut command = self.command().await?;
832        command
833            .arg("open-multi-owner-chain")
834            .args(["--from", &from.to_string()])
835            .arg("--owners")
836            .args(owners.iter().map(AccountOwner::to_string))
837            .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
838        if !weights.is_empty() {
839            command
840                .arg("--owner-weights")
841                .args(weights.iter().map(u64::to_string));
842        };
843        command
844            .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
845            .args(["--initial-balance", &balance.to_string()]);
846
847        let stdout = command.spawn_and_wait_for_stdout().await?;
848        let mut split = stdout.split('\n');
849        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
850
851        Ok(chain_id)
852    }
853
854    pub async fn change_ownership(
855        &self,
856        chain_id: ChainId,
857        super_owners: Vec<AccountOwner>,
858        owners: Vec<AccountOwner>,
859    ) -> Result<()> {
860        let mut command = self.command().await?;
861        command
862            .arg("change-ownership")
863            .args(["--chain-id", &chain_id.to_string()]);
864        if !super_owners.is_empty() {
865            command
866                .arg("--super-owners")
867                .args(super_owners.iter().map(AccountOwner::to_string));
868        }
869        if !owners.is_empty() {
870            command
871                .arg("--owners")
872                .args(owners.iter().map(AccountOwner::to_string));
873        }
874        command.spawn_and_wait_for_stdout().await?;
875        Ok(())
876    }
877
878    /// Runs `linera wallet follow-chain CHAIN_ID`.
879    pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
880        let mut command = self.command().await?;
881        command
882            .args(["wallet", "follow-chain"])
883            .arg(chain_id.to_string());
884        if sync {
885            command.arg("--sync");
886        }
887        command.spawn_and_wait_for_stdout().await?;
888        Ok(())
889    }
890
891    /// Runs `linera wallet forget-chain CHAIN_ID`.
892    pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
893        let mut command = self.command().await?;
894        command
895            .args(["wallet", "forget-chain"])
896            .arg(chain_id.to_string());
897        command.spawn_and_wait_for_stdout().await?;
898        Ok(())
899    }
900
901    pub async fn retry_pending_block(
902        &self,
903        chain_id: Option<ChainId>,
904    ) -> Result<Option<CryptoHash>> {
905        let mut command = self.command().await?;
906        command.arg("retry-pending-block");
907        if let Some(chain_id) = chain_id {
908            command.arg(chain_id.to_string());
909        }
910        let stdout = command.spawn_and_wait_for_stdout().await?;
911        let stdout = stdout.trim();
912        if stdout.is_empty() {
913            Ok(None)
914        } else {
915            Ok(Some(CryptoHash::from_str(stdout)?))
916        }
917    }
918
919    /// Runs `linera publish-data-blob`.
920    pub async fn publish_data_blob(
921        &self,
922        path: &Path,
923        chain_id: Option<ChainId>,
924    ) -> Result<CryptoHash> {
925        let mut command = self.command().await?;
926        command.arg("publish-data-blob").arg(path);
927        if let Some(chain_id) = chain_id {
928            command.arg(chain_id.to_string());
929        }
930        let stdout = command.spawn_and_wait_for_stdout().await?;
931        let stdout = stdout.trim();
932        Ok(CryptoHash::from_str(stdout)?)
933    }
934
935    /// Runs `linera read-data-blob`.
936    pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
937        let mut command = self.command().await?;
938        command.arg("read-data-blob").arg(hash.to_string());
939        if let Some(chain_id) = chain_id {
940            command.arg(chain_id.to_string());
941        }
942        command.spawn_and_wait_for_stdout().await?;
943        Ok(())
944    }
945
946    pub fn load_wallet(&self) -> Result<Wallet> {
947        util::read_json(self.wallet_path())
948    }
949
950    pub fn load_keystore(&self) -> Result<InMemorySigner> {
951        util::read_json(self.keystore_path())
952    }
953
954    pub fn wallet_path(&self) -> PathBuf {
955        self.path_provider.path().join(&self.wallet)
956    }
957
958    pub fn keystore_path(&self) -> PathBuf {
959        self.path_provider.path().join(&self.keystore)
960    }
961
962    pub fn storage_path(&self) -> &str {
963        &self.storage
964    }
965
966    pub fn get_owner(&self) -> Option<AccountOwner> {
967        let wallet = self.load_wallet().ok()?;
968        let chain_id = wallet.default_chain()?;
969        wallet.get(chain_id)?.owner
970    }
971
972    pub async fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
973        self.load_wallet()
974            .ok()
975            .is_some_and(|wallet| wallet.get(chain).is_some())
976    }
977
978    pub async fn set_validator(
979        &self,
980        validator_key: &(String, String),
981        port: usize,
982        votes: usize,
983    ) -> Result<()> {
984        let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
985        self.command()
986            .await?
987            .arg("set-validator")
988            .args(["--public-key", &validator_key.0])
989            .args(["--account-key", &validator_key.1])
990            .args(["--address", &address])
991            .args(["--votes", &votes.to_string()])
992            .spawn_and_wait_for_stdout()
993            .await?;
994        Ok(())
995    }
996
997    pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
998        self.command()
999            .await?
1000            .arg("remove-validator")
1001            .args(["--public-key", validator_key])
1002            .spawn_and_wait_for_stdout()
1003            .await?;
1004        Ok(())
1005    }
1006
1007    pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1008        self.command()
1009            .await?
1010            .arg("revoke-epochs")
1011            .arg(epoch.to_string())
1012            .spawn_and_wait_for_stdout()
1013            .await?;
1014        Ok(())
1015    }
1016
1017    /// Runs `linera keygen`.
1018    pub async fn keygen(&self) -> Result<AccountOwner> {
1019        let stdout = self
1020            .command()
1021            .await?
1022            .arg("keygen")
1023            .spawn_and_wait_for_stdout()
1024            .await?;
1025        AccountOwner::from_str(stdout.as_str().trim())
1026    }
1027
1028    /// Returns the default chain.
1029    pub fn default_chain(&self) -> Option<ChainId> {
1030        self.load_wallet().ok()?.default_chain()
1031    }
1032
1033    /// Runs `linera assign`.
1034    pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1035        let _stdout = self
1036            .command()
1037            .await?
1038            .arg("assign")
1039            .args(["--owner", &owner.to_string()])
1040            .args(["--chain-id", &chain_id.to_string()])
1041            .spawn_and_wait_for_stdout()
1042            .await?;
1043        Ok(())
1044    }
1045
1046    /// Runs `linera set-preferred-owner` for `chain_id`.
1047    pub async fn set_preferred_owner(
1048        &self,
1049        chain_id: ChainId,
1050        owner: Option<AccountOwner>,
1051    ) -> Result<()> {
1052        let mut owner_arg = vec!["--owner".to_string()];
1053        if let Some(owner) = owner {
1054            owner_arg.push(owner.to_string());
1055        };
1056        self.command()
1057            .await?
1058            .arg("set-preferred-owner")
1059            .args(["--chain-id", &chain_id.to_string()])
1060            .args(owner_arg)
1061            .spawn_and_wait_for_stdout()
1062            .await?;
1063        Ok(())
1064    }
1065
1066    pub async fn build_application(
1067        &self,
1068        path: &Path,
1069        name: &str,
1070        is_workspace: bool,
1071    ) -> Result<(PathBuf, PathBuf)> {
1072        Command::new("cargo")
1073            .current_dir(self.path_provider.path())
1074            .arg("build")
1075            .arg("--release")
1076            .args(["--target", "wasm32-unknown-unknown"])
1077            .arg("--manifest-path")
1078            .arg(path.join("Cargo.toml"))
1079            .spawn_and_wait_for_stdout()
1080            .await?;
1081
1082        let release_dir = match is_workspace {
1083            true => path.join("../target/wasm32-unknown-unknown/release"),
1084            false => path.join("target/wasm32-unknown-unknown/release"),
1085        };
1086
1087        let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1088        let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1089
1090        let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1091        let service_size = fs_err::tokio::metadata(&service).await?.len();
1092        info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1093
1094        Ok((contract, service))
1095    }
1096}
1097
1098impl Drop for ClientWrapper {
1099    fn drop(&mut self) {
1100        use std::process::Command as SyncCommand;
1101
1102        if self.on_drop != OnClientDrop::CloseChains {
1103            return;
1104        }
1105
1106        let Ok(binary_path) = self.binary_path.lock() else {
1107            error!("Failed to close chains because a thread panicked with a lock to `binary_path`");
1108            return;
1109        };
1110
1111        let Some(binary_path) = binary_path.as_ref() else {
1112            warn!(
1113                "Assuming no chains need to be closed, because the command binary was never \
1114                resolved and therefore presumably never called"
1115            );
1116            return;
1117        };
1118
1119        let working_directory = self.path_provider.path();
1120        let mut wallet_show_command = SyncCommand::new(binary_path);
1121
1122        for argument in self.command_arguments() {
1123            wallet_show_command.arg(&*argument);
1124        }
1125
1126        let Ok(wallet_show_output) = wallet_show_command
1127            .current_dir(working_directory)
1128            .args(["wallet", "show", "--short", "--owned"])
1129            .output()
1130        else {
1131            warn!("Failed to execute `wallet show --short` to list chains to close");
1132            return;
1133        };
1134
1135        if !wallet_show_output.status.success() {
1136            warn!("Failed to list chains in the wallet to close them");
1137            return;
1138        }
1139
1140        let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1141            warn!(
1142                "Failed to close chains because `linera wallet show --short` \
1143                returned a non-UTF-8 output"
1144            );
1145            return;
1146        };
1147
1148        let chain_ids = chain_list_string
1149            .split('\n')
1150            .map(|line| line.trim())
1151            .filter(|line| !line.is_empty());
1152
1153        for chain_id in chain_ids {
1154            let mut close_chain_command = SyncCommand::new(binary_path);
1155
1156            for argument in self.command_arguments() {
1157                close_chain_command.arg(&*argument);
1158            }
1159
1160            close_chain_command.current_dir(working_directory);
1161
1162            match close_chain_command.args(["close-chain", chain_id]).status() {
1163                Ok(status) if status.success() => (),
1164                Ok(failure) => warn!("Failed to close chain {chain_id}: {failure}"),
1165                Err(error) => warn!("Failed to close chain {chain_id}: {error}"),
1166            }
1167        }
1168    }
1169}
1170
1171#[cfg(with_testing)]
1172impl ClientWrapper {
1173    pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1174        self.build_application(Self::example_path(name)?.as_path(), name, true)
1175            .await
1176    }
1177
1178    pub fn example_path(name: &str) -> Result<PathBuf> {
1179        Ok(env::current_dir()?.join("../examples/").join(name))
1180    }
1181}
1182
1183fn truncate_query_output(input: &str) -> String {
1184    let max_len = 1000;
1185    if input.len() < max_len {
1186        input.to_string()
1187    } else {
1188        format!("{} ...", input.get(..max_len).unwrap())
1189    }
1190}
1191
1192fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1193    let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1194    let max_len = 200;
1195    if query.len() < max_len {
1196        query
1197    } else {
1198        format!("{} ...", query.get(..max_len).unwrap())
1199    }
1200}
1201
1202/// A running node service.
1203pub struct NodeService {
1204    port: u16,
1205    child: Child,
1206}
1207
1208impl NodeService {
1209    fn new(port: u16, child: Child) -> Self {
1210        Self { port, child }
1211    }
1212
1213    pub async fn terminate(mut self) -> Result<()> {
1214        self.child.kill().await.context("terminating node service")
1215    }
1216
1217    pub fn port(&self) -> u16 {
1218        self.port
1219    }
1220
1221    pub fn ensure_is_running(&mut self) -> Result<()> {
1222        self.child.ensure_is_running()
1223    }
1224
1225    pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1226        let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1227        let mut data = self.query_node(query).await?;
1228        Ok(serde_json::from_value(data["processInbox"].take())?)
1229    }
1230
1231    pub async fn balance(&self, account: &Account) -> Result<Amount> {
1232        let chain = account.chain_id;
1233        let owner = account.owner;
1234        if matches!(owner, AccountOwner::CHAIN) {
1235            let query = format!(
1236                "query {{ chain(chainId:\"{chain}\") {{
1237                    executionState {{ system {{ balance }} }}
1238                }} }}"
1239            );
1240            let response = self.query_node(query).await?;
1241            let balance = &response["chain"]["executionState"]["system"]["balance"]
1242                .as_str()
1243                .unwrap();
1244            return Ok(Amount::from_str(balance)?);
1245        }
1246        let query = format!(
1247            "query {{ chain(chainId:\"{chain}\") {{
1248                executionState {{ system {{ balances {{
1249                    entry(key:\"{owner}\") {{ value }}
1250                }} }} }}
1251            }} }}"
1252        );
1253        let response = self.query_node(query).await?;
1254        let balances = &response["chain"]["executionState"]["system"]["balances"];
1255        let balance = balances["entry"]["value"].as_str();
1256        match balance {
1257            None => Ok(Amount::ZERO),
1258            Some(amount) => Ok(Amount::from_str(amount)?),
1259        }
1260    }
1261
1262    pub async fn make_application<A: ContractAbi>(
1263        &self,
1264        chain_id: &ChainId,
1265        application_id: &ApplicationId<A>,
1266    ) -> Result<ApplicationWrapper<A>> {
1267        let application_id = application_id.forget_abi().to_string();
1268        let link = format!(
1269            "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1270            self.port
1271        );
1272        Ok(ApplicationWrapper::from(link))
1273    }
1274
1275    pub async fn publish_data_blob(
1276        &self,
1277        chain_id: &ChainId,
1278        bytes: Vec<u8>,
1279    ) -> Result<CryptoHash> {
1280        let query = format!(
1281            "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1282            chain_id.to_value(),
1283            bytes.to_value(),
1284        );
1285        let data = self.query_node(query).await?;
1286        serde_json::from_value(data["publishDataBlob"].clone())
1287            .context("missing publishDataBlob field in response")
1288    }
1289
1290    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1291        &self,
1292        chain_id: &ChainId,
1293        contract: PathBuf,
1294        service: PathBuf,
1295        vm_runtime: VmRuntime,
1296    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1297        let contract_code = Bytecode::load_from_file(&contract).await?;
1298        let service_code = Bytecode::load_from_file(&service).await?;
1299        let query = format!(
1300            "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1301            chain_id.to_value(),
1302            contract_code.to_value(),
1303            service_code.to_value(),
1304            vm_runtime.to_value(),
1305        );
1306        let data = self.query_node(query).await?;
1307        let module_str = data["publishModule"]
1308            .as_str()
1309            .context("module ID not found")?;
1310        let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1311        Ok(module_id.with_abi())
1312    }
1313
1314    pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1315        let query = format!(
1316            "query {{ chain(chainId:\"{chain_id}\") {{
1317                executionState {{ system {{ committees }} }}
1318            }} }}"
1319        );
1320        let mut response = self.query_node(query).await?;
1321        let committees = response["chain"]["executionState"]["system"]["committees"].take();
1322        Ok(serde_json::from_value(committees)?)
1323    }
1324
1325    pub async fn events_from_index(
1326        &self,
1327        chain_id: &ChainId,
1328        stream_id: &StreamId,
1329        start_index: u32,
1330    ) -> Result<Vec<IndexAndEvent>> {
1331        let query = format!(
1332            "query {{
1333               eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1334               {{ index event }}
1335             }}",
1336            stream_id.to_value()
1337        );
1338        let mut response = self.query_node(query).await?;
1339        let response = response["eventsFromIndex"].take();
1340        Ok(serde_json::from_value(response)?)
1341    }
1342
1343    pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1344        let n_try = 5;
1345        let query = query.as_ref();
1346        for i in 0..n_try {
1347            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1348            let url = format!("http://localhost:{}/", self.port);
1349            let client = reqwest_client();
1350            let result = client
1351                .post(url)
1352                .json(&json!({ "query": query }))
1353                .send()
1354                .await;
1355            if matches!(result, Err(ref error) if error.is_timeout()) {
1356                warn!(
1357                    "Timeout when sending query {} to the node service",
1358                    truncate_query_output(query)
1359                );
1360                continue;
1361            }
1362            let response = result.with_context(|| {
1363                format!(
1364                    "query_node: failed to post query={}",
1365                    truncate_query_output(query)
1366                )
1367            })?;
1368            anyhow::ensure!(
1369                response.status().is_success(),
1370                "Query \"{}\" failed: {}",
1371                truncate_query_output(query),
1372                response
1373                    .text()
1374                    .await
1375                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1376            );
1377            let value: Value = response.json().await.context("invalid JSON")?;
1378            if let Some(errors) = value.get("errors") {
1379                warn!(
1380                    "Query \"{}\" failed: {}",
1381                    truncate_query_output(query),
1382                    errors
1383                );
1384            } else {
1385                return Ok(value["data"].clone());
1386            }
1387        }
1388        bail!(
1389            "Query \"{}\" failed after {} retries.",
1390            truncate_query_output(query),
1391            n_try
1392        );
1393    }
1394
1395    pub async fn create_application<
1396        Abi: ContractAbi,
1397        Parameters: Serialize,
1398        InstantiationArgument: Serialize,
1399    >(
1400        &self,
1401        chain_id: &ChainId,
1402        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1403        parameters: &Parameters,
1404        argument: &InstantiationArgument,
1405        required_application_ids: &[ApplicationId],
1406    ) -> Result<ApplicationId<Abi>> {
1407        let module_id = module_id.forget_abi();
1408        let json_required_applications_ids = required_application_ids
1409            .iter()
1410            .map(ApplicationId::to_string)
1411            .collect::<Vec<_>>()
1412            .to_value();
1413        // Convert to `serde_json::Value` then `async_graphql::Value` via the trait `InputType`.
1414        let new_parameters = serde_json::to_value(parameters)
1415            .context("could not create parameters JSON")?
1416            .to_value();
1417        let new_argument = serde_json::to_value(argument)
1418            .context("could not create argument JSON")?
1419            .to_value();
1420        let query = format!(
1421            "mutation {{ createApplication(\
1422                 chainId: \"{chain_id}\",
1423                 moduleId: \"{module_id}\", \
1424                 parameters: {new_parameters}, \
1425                 instantiationArgument: {new_argument}, \
1426                 requiredApplicationIds: {json_required_applications_ids}) \
1427             }}"
1428        );
1429        let data = self.query_node(query).await?;
1430        let app_id_str = data["createApplication"]
1431            .as_str()
1432            .context("missing createApplication string in response")?
1433            .trim();
1434        Ok(app_id_str
1435            .parse::<ApplicationId>()
1436            .context("invalid application ID")?
1437            .with_abi())
1438    }
1439
1440    /// Obtains the hash of the `chain`'s tip block, as known by this node service.
1441    pub async fn chain_tip_hash(&self, chain: ChainId) -> Result<Option<CryptoHash>> {
1442        let query = format!(r#"query {{ block(chainId: "{chain}") {{ hash }} }}"#);
1443
1444        let mut response = self.query_node(&query).await?;
1445
1446        match mem::take(&mut response["block"]["hash"]) {
1447            Value::Null => Ok(None),
1448            Value::String(hash) => Ok(Some(
1449                hash.parse()
1450                    .context("Received an invalid hash {hash:?} for chain tip")?,
1451            )),
1452            invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1453        }
1454    }
1455
1456    /// Subscribes to the node service and returns a stream of notifications about a chain.
1457    pub async fn notifications(
1458        &self,
1459        chain_id: ChainId,
1460    ) -> Result<impl Stream<Item = Result<Notification>>> {
1461        let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1462        let url = format!("ws://localhost:{}/ws", self.port);
1463        let mut request = url.into_client_request()?;
1464        request.headers_mut().insert(
1465            "Sec-WebSocket-Protocol",
1466            HeaderValue::from_str("graphql-transport-ws")?,
1467        );
1468        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1469        let init_json = json!({
1470          "type": "connection_init",
1471          "payload": {}
1472        });
1473        websocket.send(init_json.to_string().into()).await?;
1474        let text = websocket
1475            .next()
1476            .await
1477            .context("Failed to establish connection")??
1478            .into_text()?;
1479        ensure!(
1480            text == "{\"type\":\"connection_ack\"}",
1481            "Unexpected response: {text}"
1482        );
1483        let query_json = json!({
1484          "id": "1",
1485          "type": "start",
1486          "payload": {
1487            "query": query,
1488            "variables": {},
1489            "operationName": null
1490          }
1491        });
1492        websocket.send(query_json.to_string().into()).await?;
1493        Ok(websocket
1494            .map_err(anyhow::Error::from)
1495            .and_then(|message| async {
1496                let text = message.into_text()?;
1497                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1498                if let Some(errors) = value["payload"].get("errors") {
1499                    bail!("Notification subscription failed: {errors:?}");
1500                }
1501                serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1502                    .context("Failed to deserialize notification")
1503            }))
1504    }
1505}
1506
1507/// A running faucet service.
1508pub struct FaucetService {
1509    port: u16,
1510    child: Child,
1511}
1512
1513impl FaucetService {
1514    fn new(port: u16, child: Child) -> Self {
1515        Self { port, child }
1516    }
1517
1518    pub async fn terminate(mut self) -> Result<()> {
1519        self.child
1520            .kill()
1521            .await
1522            .context("terminating faucet service")
1523    }
1524
1525    pub fn ensure_is_running(&mut self) -> Result<()> {
1526        self.child.ensure_is_running()
1527    }
1528
1529    pub fn instance(&self) -> Faucet {
1530        Faucet::new(format!("http://localhost:{}/", self.port))
1531    }
1532}
1533
1534/// A running `Application` to be queried in GraphQL.
1535pub struct ApplicationWrapper<A> {
1536    uri: String,
1537    _phantom: PhantomData<A>,
1538}
1539
1540impl<A> ApplicationWrapper<A> {
1541    pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1542        let query = query.as_ref();
1543        let value = self.run_json_query(json!({ "query": query })).await?;
1544        Ok(value["data"].clone())
1545    }
1546
1547    pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1548        const MAX_RETRIES: usize = 5;
1549
1550        for i in 0.. {
1551            let client = reqwest_client();
1552            let result = client.post(&self.uri).json(&query).send().await;
1553            let response = match result {
1554                Ok(response) => response,
1555                Err(error) if i < MAX_RETRIES => {
1556                    warn!(
1557                        "Failed to post query \"{}\": {error}; retrying",
1558                        truncate_query_output_serialize(&query),
1559                    );
1560                    continue;
1561                }
1562                Err(error) => {
1563                    let query = truncate_query_output_serialize(&query);
1564                    return Err(error)
1565                        .with_context(|| format!("run_json_query: failed to post query={query}"));
1566                }
1567            };
1568            anyhow::ensure!(
1569                response.status().is_success(),
1570                "Query \"{}\" failed: {}",
1571                truncate_query_output_serialize(&query),
1572                response
1573                    .text()
1574                    .await
1575                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1576            );
1577            let value: Value = response.json().await.context("invalid JSON")?;
1578            if let Some(errors) = value.get("errors") {
1579                bail!(
1580                    "Query \"{}\" failed: {}",
1581                    truncate_query_output_serialize(&query),
1582                    errors
1583                );
1584            }
1585            return Ok(value);
1586        }
1587        unreachable!()
1588    }
1589
1590    pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
1591        let query = query.as_ref();
1592        self.run_graphql_query(&format!("query {{ {query} }}"))
1593            .await
1594    }
1595
1596    pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
1597        let query = query.as_ref().trim();
1598        let name = query
1599            .split_once(|ch: char| !ch.is_alphanumeric())
1600            .map_or(query, |(name, _)| name);
1601        let data = self.query(query).await?;
1602        serde_json::from_value(data[name].clone())
1603            .with_context(|| format!("{name} field missing in response"))
1604    }
1605
1606    pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
1607        let mutation = mutation.as_ref();
1608        self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
1609            .await
1610    }
1611}
1612
1613impl<A> From<String> for ApplicationWrapper<A> {
1614    fn from(uri: String) -> ApplicationWrapper<A> {
1615        ApplicationWrapper {
1616            uri,
1617            _phantom: PhantomData,
1618        }
1619    }
1620}