linera_service/cli_wrappers/
wallet.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Cow,
6    collections::BTreeMap,
7    env,
8    marker::PhantomData,
9    mem,
10    path::{Path, PathBuf},
11    process::Stdio,
12    str::FromStr,
13    sync,
14    time::Duration,
15};
16
17use anyhow::{bail, ensure, Context, Result};
18use async_graphql::InputType;
19use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
20use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
21use heck::ToKebabCase;
22use linera_base::{
23    abi::ContractAbi,
24    command::{resolve_binary, CommandExt},
25    crypto::{CryptoHash, InMemorySigner},
26    data_types::{Amount, Bytecode, Epoch},
27    identifiers::{
28        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
29    },
30    vm::VmRuntime,
31};
32use linera_client::{client_options::ResourceControlPolicyConfig, wallet::Wallet};
33use linera_core::worker::Notification;
34use linera_execution::committee::Committee;
35use linera_faucet_client::Faucet;
36use serde::{de::DeserializeOwned, ser::Serialize};
37use serde_command_opts::to_args;
38use serde_json::{json, Value};
39use tempfile::TempDir;
40use tokio::{
41    io::{AsyncBufReadExt, BufReader},
42    process::{Child, Command},
43    sync::oneshot,
44    task::JoinHandle,
45};
46use tracing::{error, info, warn};
47
48use crate::{
49    cli::command::BenchmarkCommand,
50    cli_wrappers::{
51        local_net::{PathProvider, ProcessInbox},
52        Network,
53    },
54    util::{self, ChildExt},
55};
56
/// The name of the environment variable that allows specifying additional arguments to be passed
/// to the node-service command of the client.
///
/// When the variable is set, its value is split on whitespace and every piece is appended as a
/// separate argument to the `service` subcommand.
const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
60
61fn reqwest_client() -> reqwest::Client {
62    reqwest::ClientBuilder::new()
63        .timeout(Duration::from_secs(30))
64        .build()
65        .unwrap()
66}
67
/// Wrapper to run a Linera client command.
pub struct ClientWrapper {
    /// Path of the `linera` binary; resolved on first use and cached for later invocations.
    binary_path: sync::Mutex<Option<PathBuf>>,
    /// Seed forwarded as `--testing-prng-seed`, e.g. by `create_genesis_config` and
    /// `wallet_init`, when set.
    testing_prng_seed: Option<u64>,
    /// Value passed with the `--storage` argument.
    storage: String,
    /// Value passed with the `--wallet` argument.
    wallet: String,
    /// Value passed with the `--keystore` argument.
    keystore: String,
    /// Value passed with the `--max-pending-message-bundles` argument.
    max_pending_message_bundles: usize,
    /// Network configuration supplied at construction time.
    network: Network,
    /// Provides the directory in which the commands are run.
    pub path_provider: PathProvider,
    /// Action to perform when the wrapper is dropped.
    on_drop: OnClientDrop,
    /// Additional arguments appended after the required ones.
    extra_args: Vec<String>,
}
81
/// Action to perform when the [`ClientWrapper`] is dropped.
///
/// The action is chosen by the caller when the wrapper is constructed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OnClientDrop {
    /// Close all the chains on the wallet.
    CloseChains,
    /// Do not close any chains, leaving them active.
    LeakChains,
}
90
91impl ClientWrapper {
92    pub fn new(
93        path_provider: PathProvider,
94        network: Network,
95        testing_prng_seed: Option<u64>,
96        id: usize,
97        on_drop: OnClientDrop,
98    ) -> Self {
99        Self::new_with_extra_args(
100            path_provider,
101            network,
102            testing_prng_seed,
103            id,
104            on_drop,
105            vec!["--wait-for-outgoing-messages".to_string()],
106        )
107    }
108
109    pub fn new_with_extra_args(
110        path_provider: PathProvider,
111        network: Network,
112        testing_prng_seed: Option<u64>,
113        id: usize,
114        on_drop: OnClientDrop,
115        extra_args: Vec<String>,
116    ) -> Self {
117        let storage = format!(
118            "rocksdb:{}/client_{}.db",
119            path_provider.path().display(),
120            id
121        );
122        let wallet = format!("wallet_{}.json", id);
123        let keystore = format!("keystore_{}.json", id);
124        Self {
125            binary_path: sync::Mutex::new(None),
126            testing_prng_seed,
127            storage,
128            wallet,
129            keystore,
130            max_pending_message_bundles: 10_000,
131            network,
132            path_provider,
133            on_drop,
134            extra_args,
135        }
136    }
137
138    /// Runs `linera project new`.
139    pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
140        let tmp = TempDir::new()?;
141        let mut command = self.command().await?;
142        command
143            .current_dir(tmp.path())
144            .arg("project")
145            .arg("new")
146            .arg(project_name)
147            .arg("--linera-root")
148            .arg(linera_root)
149            .spawn_and_wait_for_stdout()
150            .await?;
151        Ok(tmp)
152    }
153
154    /// Runs `linera project publish`.
155    pub async fn project_publish<T: Serialize>(
156        &self,
157        path: PathBuf,
158        required_application_ids: Vec<String>,
159        publisher: impl Into<Option<ChainId>>,
160        argument: &T,
161    ) -> Result<String> {
162        let json_parameters = serde_json::to_string(&())?;
163        let json_argument = serde_json::to_string(argument)?;
164        let mut command = self.command().await?;
165        command
166            .arg("project")
167            .arg("publish-and-create")
168            .arg(path)
169            .args(publisher.into().iter().map(ChainId::to_string))
170            .args(["--json-parameters", &json_parameters])
171            .args(["--json-argument", &json_argument]);
172        if !required_application_ids.is_empty() {
173            command.arg("--required-application-ids");
174            command.args(required_application_ids);
175        }
176        let stdout = command.spawn_and_wait_for_stdout().await?;
177        Ok(stdout.trim().to_string())
178    }
179
180    /// Runs `linera project test`.
181    pub async fn project_test(&self, path: &Path) -> Result<()> {
182        self.command()
183            .await
184            .context("failed to create project test command")?
185            .current_dir(path)
186            .arg("project")
187            .arg("test")
188            .spawn_and_wait_for_stdout()
189            .await?;
190        Ok(())
191    }
192
193    async fn command_with_envs_and_arguments(
194        &self,
195        envs: &[(&str, &str)],
196        arguments: impl IntoIterator<Item = Cow<'_, str>>,
197    ) -> Result<Command> {
198        let mut command = self.command_binary().await?;
199        command.current_dir(self.path_provider.path());
200        for (key, value) in envs {
201            command.env(key, value);
202        }
203        for argument in arguments {
204            command.arg(&*argument);
205        }
206        Ok(command)
207    }
208
209    async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
210        self.command_with_envs_and_arguments(envs, self.command_arguments())
211            .await
212    }
213
214    async fn command_with_arguments(
215        &self,
216        arguments: impl IntoIterator<Item = Cow<'_, str>>,
217    ) -> Result<Command> {
218        self.command_with_envs_and_arguments(
219            &[(
220                "RUST_LOG",
221                &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
222            )],
223            arguments,
224        )
225        .await
226    }
227
228    async fn command(&self) -> Result<Command> {
229        self.command_with_envs(&[(
230            "RUST_LOG",
231            &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
232        )])
233        .await
234    }
235
236    fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
237        [
238            "--wallet".into(),
239            self.wallet.as_str().into(),
240            "--keystore".into(),
241            self.keystore.as_str().into(),
242            "--storage".into(),
243            self.storage.as_str().into(),
244            "--send-timeout-ms".into(),
245            "500000".into(),
246            "--recv-timeout-ms".into(),
247            "500000".into(),
248        ]
249        .into_iter()
250        .chain(self.extra_args.iter().map(|s| s.as_str().into()))
251    }
252
253    /// Returns an iterator over the arguments that should be added to all command invocations.
254    fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
255        self.required_command_arguments().chain([
256            "--max-pending-message-bundles".into(),
257            self.max_pending_message_bundles.to_string().into(),
258        ])
259    }
260
261    /// Returns the [`Command`] instance configured to run the appropriate binary.
262    ///
263    /// The path is resolved once and cached inside `self` for subsequent usages.
264    async fn command_binary(&self) -> Result<Command> {
265        match self.command_with_cached_binary_path() {
266            Some(command) => Ok(command),
267            None => {
268                let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
269                let command = Command::new(&resolved_path);
270
271                self.set_cached_binary_path(resolved_path);
272
273                Ok(command)
274            }
275        }
276    }
277
278    /// Returns a [`Command`] instance configured with the cached `binary_path`, if available.
279    fn command_with_cached_binary_path(&self) -> Option<Command> {
280        let binary_path = self.binary_path.lock().unwrap();
281
282        binary_path.as_ref().map(Command::new)
283    }
284
285    /// Sets the cached `binary_path` with the `new_binary_path`.
286    ///
287    /// # Panics
288    ///
289    /// If the cache is already set to a different value. In theory the two threads calling
290    /// `command_binary` can race and resolve the binary path twice, but they should always be the
291    /// same path.
292    fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
293        let mut binary_path = self.binary_path.lock().unwrap();
294
295        if binary_path.is_none() {
296            *binary_path = Some(new_binary_path);
297        } else {
298            assert_eq!(*binary_path, Some(new_binary_path));
299        }
300    }
301
302    /// Runs `linera create-genesis-config`.
303    pub async fn create_genesis_config(
304        &self,
305        num_other_initial_chains: u32,
306        initial_funding: Amount,
307        policy_config: ResourceControlPolicyConfig,
308        http_allow_list: Option<Vec<String>>,
309    ) -> Result<()> {
310        let mut command = self.command().await?;
311        command
312            .args([
313                "create-genesis-config",
314                &num_other_initial_chains.to_string(),
315            ])
316            .args(["--initial-funding", &initial_funding.to_string()])
317            .args(["--committee", "committee.json"])
318            .args(["--genesis", "genesis.json"])
319            .args([
320                "--policy-config",
321                &policy_config.to_string().to_kebab_case(),
322            ]);
323        if let Some(allow_list) = http_allow_list {
324            command
325                .arg("--http-request-allow-list")
326                .arg(allow_list.join(","));
327        }
328        if let Some(seed) = self.testing_prng_seed {
329            command.arg("--testing-prng-seed").arg(seed.to_string());
330        }
331        command.spawn_and_wait_for_stdout().await?;
332        Ok(())
333    }
334
335    /// Runs `linera wallet init`. The genesis config is read from `genesis.json`, or from the
336    /// faucet if provided.
337    pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
338        let mut command = self.command().await?;
339        command.args(["wallet", "init"]);
340        match faucet {
341            None => command.args(["--genesis", "genesis.json"]),
342            Some(faucet) => command.args(["--faucet", faucet.url()]),
343        };
344        if let Some(seed) = self.testing_prng_seed {
345            command.arg("--testing-prng-seed").arg(seed.to_string());
346        }
347        command.spawn_and_wait_for_stdout().await?;
348        Ok(())
349    }
350
351    /// Runs `linera wallet request-chain`.
352    pub async fn request_chain(
353        &self,
354        faucet: &Faucet,
355        set_default: bool,
356    ) -> Result<(ChainId, AccountOwner)> {
357        let mut command = self.command().await?;
358        command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
359        if set_default {
360            command.arg("--set-default");
361        }
362        let stdout = command.spawn_and_wait_for_stdout().await?;
363        let mut lines = stdout.split_whitespace();
364        let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
365        let owner = lines.next().context("missing chain owner")?.parse()?;
366        Ok((chain_id, owner))
367    }
368
369    /// Runs `linera wallet publish-and-create`.
370    #[expect(clippy::too_many_arguments)]
371    pub async fn publish_and_create<
372        A: ContractAbi,
373        Parameters: Serialize,
374        InstantiationArgument: Serialize,
375    >(
376        &self,
377        contract: PathBuf,
378        service: PathBuf,
379        vm_runtime: VmRuntime,
380        parameters: &Parameters,
381        argument: &InstantiationArgument,
382        required_application_ids: &[ApplicationId],
383        publisher: impl Into<Option<ChainId>>,
384    ) -> Result<ApplicationId<A>> {
385        let json_parameters = serde_json::to_string(parameters)?;
386        let json_argument = serde_json::to_string(argument)?;
387        let mut command = self.command().await?;
388        let vm_runtime = format!("{}", vm_runtime);
389        command
390            .arg("publish-and-create")
391            .args([contract, service])
392            .args(["--vm-runtime", &vm_runtime.to_lowercase()])
393            .args(publisher.into().iter().map(ChainId::to_string))
394            .args(["--json-parameters", &json_parameters])
395            .args(["--json-argument", &json_argument]);
396        if !required_application_ids.is_empty() {
397            command.arg("--required-application-ids");
398            command.args(
399                required_application_ids
400                    .iter()
401                    .map(ApplicationId::to_string),
402            );
403        }
404        let stdout = command.spawn_and_wait_for_stdout().await?;
405        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
406    }
407
408    /// Runs `linera publish-module`.
409    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
410        &self,
411        contract: PathBuf,
412        service: PathBuf,
413        vm_runtime: VmRuntime,
414        publisher: impl Into<Option<ChainId>>,
415    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
416        let stdout = self
417            .command()
418            .await?
419            .arg("publish-module")
420            .args([contract, service])
421            .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
422            .args(publisher.into().iter().map(ChainId::to_string))
423            .spawn_and_wait_for_stdout()
424            .await?;
425        let module_id: ModuleId = stdout.trim().parse()?;
426        Ok(module_id.with_abi())
427    }
428
429    /// Runs `linera create-application`.
430    pub async fn create_application<
431        Abi: ContractAbi,
432        Parameters: Serialize,
433        InstantiationArgument: Serialize,
434    >(
435        &self,
436        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
437        parameters: &Parameters,
438        argument: &InstantiationArgument,
439        required_application_ids: &[ApplicationId],
440        creator: impl Into<Option<ChainId>>,
441    ) -> Result<ApplicationId<Abi>> {
442        let json_parameters = serde_json::to_string(parameters)?;
443        let json_argument = serde_json::to_string(argument)?;
444        let mut command = self.command().await?;
445        command
446            .arg("create-application")
447            .arg(module_id.forget_abi().to_string())
448            .args(["--json-parameters", &json_parameters])
449            .args(["--json-argument", &json_argument])
450            .args(creator.into().iter().map(ChainId::to_string));
451        if !required_application_ids.is_empty() {
452            command.arg("--required-application-ids");
453            command.args(
454                required_application_ids
455                    .iter()
456                    .map(ApplicationId::to_string),
457            );
458        }
459        let stdout = command.spawn_and_wait_for_stdout().await?;
460        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
461    }
462
463    /// Runs `linera service`.
464    pub async fn run_node_service(
465        &self,
466        port: impl Into<Option<u16>>,
467        process_inbox: ProcessInbox,
468    ) -> Result<NodeService> {
469        let port = port.into().unwrap_or(8080);
470        let mut command = self.command().await?;
471        command.arg("service");
472        if let ProcessInbox::Skip = process_inbox {
473            command.arg("--listener-skip-process-inbox");
474        }
475        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
476            command.args(var.split_whitespace());
477        }
478        let child = command
479            .args(["--port".to_string(), port.to_string()])
480            .spawn_into()?;
481        let client = reqwest_client();
482        for i in 0..10 {
483            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
484            let request = client
485                .get(format!("http://localhost:{}/", port))
486                .send()
487                .await;
488            if request.is_ok() {
489                info!("Node service has started");
490                return Ok(NodeService::new(port, child));
491            } else {
492                warn!("Waiting for node service to start");
493            }
494        }
495        bail!("Failed to start node service");
496    }
497
498    /// Runs `linera query-validator`
499    pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
500        let mut command = self.command().await?;
501        command.arg("query-validator").arg(address);
502        let stdout = command.spawn_and_wait_for_stdout().await?;
503        let hash = stdout
504            .trim()
505            .parse()
506            .context("error while parsing the result of `linera query-validator`")?;
507        Ok(hash)
508    }
509
510    /// Runs `linera query-validators`.
511    pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
512        let mut command = self.command().await?;
513        command.arg("query-validators");
514        if let Some(chain_id) = chain_id {
515            command.arg(chain_id.to_string());
516        }
517        command.spawn_and_wait_for_stdout().await?;
518        Ok(())
519    }
520
521    /// Runs `linera sync-validator`.
522    pub async fn sync_validator(
523        &self,
524        chain_ids: impl IntoIterator<Item = &ChainId>,
525        validator_address: impl Into<String>,
526    ) -> Result<()> {
527        let mut command = self.command().await?;
528        command.arg("sync-validator").arg(validator_address.into());
529        let mut chain_ids = chain_ids.into_iter().peekable();
530        if chain_ids.peek().is_some() {
531            command
532                .arg("--chains")
533                .args(chain_ids.map(ChainId::to_string));
534        }
535        command.spawn_and_wait_for_stdout().await?;
536        Ok(())
537    }
538
539    /// Runs `linera faucet`.
540    pub async fn run_faucet(
541        &self,
542        port: impl Into<Option<u16>>,
543        chain_id: ChainId,
544        amount: Amount,
545    ) -> Result<FaucetService> {
546        let port = port.into().unwrap_or(8080);
547        let mut command = self.command().await?;
548        let child = command
549            .arg("faucet")
550            .arg(chain_id.to_string())
551            .args(["--port".to_string(), port.to_string()])
552            .args(["--amount".to_string(), amount.to_string()])
553            .spawn_into()?;
554        let client = reqwest_client();
555        for i in 0..10 {
556            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
557            let request = client
558                .get(format!("http://localhost:{}/", port))
559                .send()
560                .await;
561            if request.is_ok() {
562                info!("Faucet has started");
563                return Ok(FaucetService::new(port, child));
564            } else {
565                warn!("Waiting for faucet to start");
566            }
567        }
568        bail!("Failed to start faucet");
569    }
570
571    /// Runs `linera local-balance`.
572    pub async fn local_balance(&self, account: Account) -> Result<Amount> {
573        let stdout = self
574            .command()
575            .await?
576            .arg("local-balance")
577            .arg(account.to_string())
578            .spawn_and_wait_for_stdout()
579            .await?;
580        let amount = stdout
581            .trim()
582            .parse()
583            .context("error while parsing the result of `linera local-balance`")?;
584        Ok(amount)
585    }
586
587    /// Runs `linera query-balance`.
588    pub async fn query_balance(&self, account: Account) -> Result<Amount> {
589        let stdout = self
590            .command()
591            .await?
592            .arg("query-balance")
593            .arg(account.to_string())
594            .spawn_and_wait_for_stdout()
595            .await?;
596        let amount = stdout
597            .trim()
598            .parse()
599            .context("error while parsing the result of `linera query-balance`")?;
600        Ok(amount)
601    }
602
603    /// Runs `linera sync`.
604    pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
605        self.command()
606            .await?
607            .arg("sync")
608            .arg(chain_id.to_string())
609            .spawn_and_wait_for_stdout()
610            .await?;
611        Ok(())
612    }
613
614    /// Runs `linera process-inbox`.
615    pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
616        self.command()
617            .await?
618            .arg("process-inbox")
619            .arg(chain_id.to_string())
620            .spawn_and_wait_for_stdout()
621            .await?;
622        Ok(())
623    }
624
625    /// Runs `linera transfer`.
626    pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
627        self.command()
628            .await?
629            .arg("transfer")
630            .arg(amount.to_string())
631            .args(["--from", &from.to_string()])
632            .args(["--to", &to.to_string()])
633            .spawn_and_wait_for_stdout()
634            .await?;
635        Ok(())
636    }
637
638    /// Runs `linera transfer` with no logging.
639    pub async fn transfer_with_silent_logs(
640        &self,
641        amount: Amount,
642        from: ChainId,
643        to: ChainId,
644    ) -> Result<()> {
645        self.command()
646            .await?
647            .env("RUST_LOG", "off")
648            .arg("transfer")
649            .arg(amount.to_string())
650            .args(["--from", &from.to_string()])
651            .args(["--to", &to.to_string()])
652            .spawn_and_wait_for_stdout()
653            .await?;
654        Ok(())
655    }
656
657    /// Runs `linera transfer` with owner accounts.
658    pub async fn transfer_with_accounts(
659        &self,
660        amount: Amount,
661        from: Account,
662        to: Account,
663    ) -> Result<()> {
664        self.command()
665            .await?
666            .arg("transfer")
667            .arg(amount.to_string())
668            .args(["--from", &from.to_string()])
669            .args(["--to", &to.to_string()])
670            .spawn_and_wait_for_stdout()
671            .await?;
672        Ok(())
673    }
674
675    fn benchmark_command_internal(command: &mut Command, args: BenchmarkCommand) -> Result<()> {
676        let mut formatted_args = to_args(&args)?;
677        let subcommand = formatted_args.remove(0);
678        // The subcommand is followed by the flattened options, which are preceded by "options".
679        // So remove that as well.
680        formatted_args.remove(0);
681        let options = formatted_args
682            .chunks_exact(2)
683            .flat_map(|pair| {
684                let option = format!("--{}", pair[0]);
685                match pair[1].as_str() {
686                    "true" => vec![option],
687                    "false" => vec![],
688                    _ => vec![option, pair[1].clone()],
689                }
690            })
691            .collect::<Vec<_>>();
692        command
693            .args([
694                "--max-pending-message-bundles",
695                &args.transactions_per_block().to_string(),
696            ])
697            .arg("benchmark")
698            .arg(subcommand)
699            .args(options);
700        Ok(())
701    }
702
703    async fn benchmark_command_with_envs(
704        &self,
705        args: BenchmarkCommand,
706        envs: &[(&str, &str)],
707    ) -> Result<Command> {
708        let mut command = self
709            .command_with_envs_and_arguments(envs, self.required_command_arguments())
710            .await?;
711        Self::benchmark_command_internal(&mut command, args)?;
712        Ok(command)
713    }
714
715    async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
716        let mut command = self
717            .command_with_arguments(self.required_command_arguments())
718            .await?;
719        Self::benchmark_command_internal(&mut command, args)?;
720        Ok(command)
721    }
722
723    /// Runs `linera benchmark`.
724    pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
725        let mut command = self.benchmark_command(args).await?;
726        command.spawn_and_wait_for_stdout().await?;
727        Ok(())
728    }
729
730    /// Runs `linera benchmark`, but detached: don't wait for the command to finish, just spawn it
731    /// and return the child process, and the handles to the stdout and stderr.
732    pub async fn benchmark_detached(
733        &self,
734        args: BenchmarkCommand,
735        tx: oneshot::Sender<()>,
736    ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
737        let mut child = self
738            .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
739            .await?
740            .kill_on_drop(true)
741            .stdin(Stdio::piped())
742            .stdout(Stdio::piped())
743            .stderr(Stdio::piped())
744            .spawn()?;
745
746        let pid = child.id().expect("failed to get pid");
747        let stdout = child.stdout.take().expect("stdout not open");
748        let stdout_handle = tokio::spawn(async move {
749            let mut lines = BufReader::new(stdout).lines();
750            while let Ok(Some(line)) = lines.next_line().await {
751                println!("benchmark{{pid={pid}}} {line}");
752            }
753        });
754
755        let stderr = child.stderr.take().expect("stderr not open");
756        let stderr_handle = tokio::spawn(async move {
757            let mut lines = BufReader::new(stderr).lines();
758            let mut tx = Some(tx);
759            while let Ok(Some(line)) = lines.next_line().await {
760                if line.contains("Ready to start benchmark") {
761                    tx.take()
762                        .expect("Should only send signal once")
763                        .send(())
764                        .expect("failed to send ready signal to main thread");
765                } else {
766                    println!("benchmark{{pid={pid}}} {line}");
767                }
768            }
769        });
770        Ok((child, stdout_handle, stderr_handle))
771    }
772
773    async fn open_chain_internal(
774        &self,
775        from: ChainId,
776        owner: Option<AccountOwner>,
777        initial_balance: Amount,
778        super_owner: bool,
779    ) -> Result<(ChainId, AccountOwner)> {
780        let mut command = self.command().await?;
781        command
782            .arg("open-chain")
783            .args(["--from", &from.to_string()])
784            .args(["--initial-balance", &initial_balance.to_string()]);
785
786        if let Some(owner) = owner {
787            command.args(["--owner", &owner.to_string()]);
788        }
789
790        if super_owner {
791            command.arg("--super-owner");
792        }
793
794        let stdout = command.spawn_and_wait_for_stdout().await?;
795        let mut split = stdout.split('\n');
796        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
797        let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
798        if let Some(owner) = owner {
799            assert_eq!(owner, new_owner);
800        }
801        Ok((chain_id, new_owner))
802    }
803
804    /// Runs `linera open-chain --super-owner`.
805    pub async fn open_chain_super_owner(
806        &self,
807        from: ChainId,
808        owner: Option<AccountOwner>,
809        initial_balance: Amount,
810    ) -> Result<(ChainId, AccountOwner)> {
811        self.open_chain_internal(from, owner, initial_balance, true)
812            .await
813    }
814
815    /// Runs `linera open-chain`.
816    pub async fn open_chain(
817        &self,
818        from: ChainId,
819        owner: Option<AccountOwner>,
820        initial_balance: Amount,
821    ) -> Result<(ChainId, AccountOwner)> {
822        self.open_chain_internal(from, owner, initial_balance, false)
823            .await
824    }
825
826    /// Runs `linera open-chain` then `linera assign`.
827    pub async fn open_and_assign(
828        &self,
829        client: &ClientWrapper,
830        initial_balance: Amount,
831    ) -> Result<ChainId> {
832        let our_chain = self
833            .load_wallet()?
834            .default_chain()
835            .context("no default chain found")?;
836        let owner = client.keygen().await?;
837        let (new_chain, _) = self
838            .open_chain(our_chain, Some(owner), initial_balance)
839            .await?;
840        client.assign(owner, new_chain).await?;
841        Ok(new_chain)
842    }
843
844    pub async fn open_multi_owner_chain(
845        &self,
846        from: ChainId,
847        owners: Vec<AccountOwner>,
848        weights: Vec<u64>,
849        multi_leader_rounds: u32,
850        balance: Amount,
851        base_timeout_ms: u64,
852    ) -> Result<ChainId> {
853        let mut command = self.command().await?;
854        command
855            .arg("open-multi-owner-chain")
856            .args(["--from", &from.to_string()])
857            .arg("--owners")
858            .args(owners.iter().map(AccountOwner::to_string))
859            .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
860        if !weights.is_empty() {
861            command
862                .arg("--owner-weights")
863                .args(weights.iter().map(u64::to_string));
864        };
865        command
866            .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
867            .args(["--initial-balance", &balance.to_string()]);
868
869        let stdout = command.spawn_and_wait_for_stdout().await?;
870        let mut split = stdout.split('\n');
871        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
872
873        Ok(chain_id)
874    }
875
876    pub async fn change_ownership(
877        &self,
878        chain_id: ChainId,
879        super_owners: Vec<AccountOwner>,
880        owners: Vec<AccountOwner>,
881    ) -> Result<()> {
882        let mut command = self.command().await?;
883        command
884            .arg("change-ownership")
885            .args(["--chain-id", &chain_id.to_string()]);
886        if !super_owners.is_empty() {
887            command
888                .arg("--super-owners")
889                .args(super_owners.iter().map(AccountOwner::to_string));
890        }
891        if !owners.is_empty() {
892            command
893                .arg("--owners")
894                .args(owners.iter().map(AccountOwner::to_string));
895        }
896        command.spawn_and_wait_for_stdout().await?;
897        Ok(())
898    }
899
900    /// Runs `linera wallet follow-chain CHAIN_ID`.
901    pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
902        let mut command = self.command().await?;
903        command
904            .args(["wallet", "follow-chain"])
905            .arg(chain_id.to_string());
906        if sync {
907            command.arg("--sync");
908        }
909        command.spawn_and_wait_for_stdout().await?;
910        Ok(())
911    }
912
913    /// Runs `linera wallet forget-chain CHAIN_ID`.
914    pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
915        let mut command = self.command().await?;
916        command
917            .args(["wallet", "forget-chain"])
918            .arg(chain_id.to_string());
919        command.spawn_and_wait_for_stdout().await?;
920        Ok(())
921    }
922
923    pub async fn retry_pending_block(
924        &self,
925        chain_id: Option<ChainId>,
926    ) -> Result<Option<CryptoHash>> {
927        let mut command = self.command().await?;
928        command.arg("retry-pending-block");
929        if let Some(chain_id) = chain_id {
930            command.arg(chain_id.to_string());
931        }
932        let stdout = command.spawn_and_wait_for_stdout().await?;
933        let stdout = stdout.trim();
934        if stdout.is_empty() {
935            Ok(None)
936        } else {
937            Ok(Some(CryptoHash::from_str(stdout)?))
938        }
939    }
940
941    /// Runs `linera publish-data-blob`.
942    pub async fn publish_data_blob(
943        &self,
944        path: &Path,
945        chain_id: Option<ChainId>,
946    ) -> Result<CryptoHash> {
947        let mut command = self.command().await?;
948        command.arg("publish-data-blob").arg(path);
949        if let Some(chain_id) = chain_id {
950            command.arg(chain_id.to_string());
951        }
952        let stdout = command.spawn_and_wait_for_stdout().await?;
953        let stdout = stdout.trim();
954        Ok(CryptoHash::from_str(stdout)?)
955    }
956
957    /// Runs `linera read-data-blob`.
958    pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
959        let mut command = self.command().await?;
960        command.arg("read-data-blob").arg(hash.to_string());
961        if let Some(chain_id) = chain_id {
962            command.arg(chain_id.to_string());
963        }
964        command.spawn_and_wait_for_stdout().await?;
965        Ok(())
966    }
967
968    pub fn load_wallet(&self) -> Result<Wallet> {
969        util::read_json(self.wallet_path())
970    }
971
972    pub fn load_keystore(&self) -> Result<InMemorySigner> {
973        util::read_json(self.keystore_path())
974    }
975
976    pub fn wallet_path(&self) -> PathBuf {
977        self.path_provider.path().join(&self.wallet)
978    }
979
980    pub fn keystore_path(&self) -> PathBuf {
981        self.path_provider.path().join(&self.keystore)
982    }
983
984    pub fn storage_path(&self) -> &str {
985        &self.storage
986    }
987
988    pub fn get_owner(&self) -> Option<AccountOwner> {
989        let wallet = self.load_wallet().ok()?;
990        let chain_id = wallet.default_chain()?;
991        wallet.get(chain_id)?.owner
992    }
993
994    pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
995        self.load_wallet()
996            .ok()
997            .is_some_and(|wallet| wallet.get(chain).is_some())
998    }
999
1000    pub async fn set_validator(
1001        &self,
1002        validator_key: &(String, String),
1003        port: usize,
1004        votes: usize,
1005    ) -> Result<()> {
1006        let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1007        self.command()
1008            .await?
1009            .arg("set-validator")
1010            .args(["--public-key", &validator_key.0])
1011            .args(["--account-key", &validator_key.1])
1012            .args(["--address", &address])
1013            .args(["--votes", &votes.to_string()])
1014            .spawn_and_wait_for_stdout()
1015            .await?;
1016        Ok(())
1017    }
1018
1019    pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1020        self.command()
1021            .await?
1022            .arg("remove-validator")
1023            .args(["--public-key", validator_key])
1024            .spawn_and_wait_for_stdout()
1025            .await?;
1026        Ok(())
1027    }
1028
1029    pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1030        self.command()
1031            .await?
1032            .arg("revoke-epochs")
1033            .arg(epoch.to_string())
1034            .spawn_and_wait_for_stdout()
1035            .await?;
1036        Ok(())
1037    }
1038
1039    /// Runs `linera keygen`.
1040    pub async fn keygen(&self) -> Result<AccountOwner> {
1041        let stdout = self
1042            .command()
1043            .await?
1044            .arg("keygen")
1045            .spawn_and_wait_for_stdout()
1046            .await?;
1047        AccountOwner::from_str(stdout.as_str().trim())
1048    }
1049
1050    /// Returns the default chain.
1051    pub fn default_chain(&self) -> Option<ChainId> {
1052        self.load_wallet().ok()?.default_chain()
1053    }
1054
1055    /// Runs `linera assign`.
1056    pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1057        let _stdout = self
1058            .command()
1059            .await?
1060            .arg("assign")
1061            .args(["--owner", &owner.to_string()])
1062            .args(["--chain-id", &chain_id.to_string()])
1063            .spawn_and_wait_for_stdout()
1064            .await?;
1065        Ok(())
1066    }
1067
1068    /// Runs `linera set-preferred-owner` for `chain_id`.
1069    pub async fn set_preferred_owner(
1070        &self,
1071        chain_id: ChainId,
1072        owner: Option<AccountOwner>,
1073    ) -> Result<()> {
1074        let mut owner_arg = vec!["--owner".to_string()];
1075        if let Some(owner) = owner {
1076            owner_arg.push(owner.to_string());
1077        };
1078        self.command()
1079            .await?
1080            .arg("set-preferred-owner")
1081            .args(["--chain-id", &chain_id.to_string()])
1082            .args(owner_arg)
1083            .spawn_and_wait_for_stdout()
1084            .await?;
1085        Ok(())
1086    }
1087
1088    pub async fn build_application(
1089        &self,
1090        path: &Path,
1091        name: &str,
1092        is_workspace: bool,
1093    ) -> Result<(PathBuf, PathBuf)> {
1094        Command::new("cargo")
1095            .current_dir(self.path_provider.path())
1096            .arg("build")
1097            .arg("--release")
1098            .args(["--target", "wasm32-unknown-unknown"])
1099            .arg("--manifest-path")
1100            .arg(path.join("Cargo.toml"))
1101            .spawn_and_wait_for_stdout()
1102            .await?;
1103
1104        let release_dir = match is_workspace {
1105            true => path.join("../target/wasm32-unknown-unknown/release"),
1106            false => path.join("target/wasm32-unknown-unknown/release"),
1107        };
1108
1109        let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1110        let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1111
1112        let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1113        let service_size = fs_err::tokio::metadata(&service).await?.len();
1114        info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1115
1116        Ok((contract, service))
1117    }
1118}
1119
1120impl Drop for ClientWrapper {
1121    fn drop(&mut self) {
1122        use std::process::Command as SyncCommand;
1123
1124        if self.on_drop != OnClientDrop::CloseChains {
1125            return;
1126        }
1127
1128        let Ok(binary_path) = self.binary_path.lock() else {
1129            error!("Failed to close chains because a thread panicked with a lock to `binary_path`");
1130            return;
1131        };
1132
1133        let Some(binary_path) = binary_path.as_ref() else {
1134            warn!(
1135                "Assuming no chains need to be closed, because the command binary was never \
1136                resolved and therefore presumably never called"
1137            );
1138            return;
1139        };
1140
1141        let working_directory = self.path_provider.path();
1142        let mut wallet_show_command = SyncCommand::new(binary_path);
1143
1144        for argument in self.command_arguments() {
1145            wallet_show_command.arg(&*argument);
1146        }
1147
1148        let Ok(wallet_show_output) = wallet_show_command
1149            .current_dir(working_directory)
1150            .args(["wallet", "show", "--short", "--owned"])
1151            .output()
1152        else {
1153            warn!("Failed to execute `wallet show --short` to list chains to close");
1154            return;
1155        };
1156
1157        if !wallet_show_output.status.success() {
1158            warn!("Failed to list chains in the wallet to close them");
1159            return;
1160        }
1161
1162        let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1163            warn!(
1164                "Failed to close chains because `linera wallet show --short` \
1165                returned a non-UTF-8 output"
1166            );
1167            return;
1168        };
1169
1170        let chain_ids = chain_list_string
1171            .split('\n')
1172            .map(|line| line.trim())
1173            .filter(|line| !line.is_empty());
1174
1175        for chain_id in chain_ids {
1176            let mut close_chain_command = SyncCommand::new(binary_path);
1177
1178            for argument in self.command_arguments() {
1179                close_chain_command.arg(&*argument);
1180            }
1181
1182            close_chain_command.current_dir(working_directory);
1183
1184            match close_chain_command.args(["close-chain", chain_id]).status() {
1185                Ok(status) if status.success() => (),
1186                Ok(failure) => warn!("Failed to close chain {chain_id}: {failure}"),
1187                Err(error) => warn!("Failed to close chain {chain_id}: {error}"),
1188            }
1189        }
1190    }
1191}
1192
#[cfg(with_testing)]
impl ClientWrapper {
    /// Builds the example application `name` as a workspace member and returns the
    /// `(contract, service)` Wasm paths.
    pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
        let example_dir = Self::example_path(name)?;
        self.build_application(&example_dir, name, true).await
    }

    /// Returns `<current dir>/../examples/<name>`.
    ///
    /// Fails if the current working directory cannot be determined.
    pub fn example_path(name: &str) -> Result<PathBuf> {
        let mut path = env::current_dir()?;
        path.push("../examples/");
        path.push(name);
        Ok(path)
    }
}
1204
/// Shortens `input` to at most 1000 bytes so that it can be embedded in log and error messages.
///
/// Inputs that fit within the limit are returned unchanged. Longer inputs are cut at the
/// closest UTF-8 character boundary at or below the limit and suffixed with `" ..."`.
fn truncate_query_output(input: &str) -> String {
    const MAX_LEN: usize = 1000;
    if input.len() <= MAX_LEN {
        return input.to_string();
    }
    // Slicing in the middle of a multi-byte character would panic, so back up to the nearest
    // character boundary. Index 0 is always a boundary, so the loop terminates.
    let mut end = MAX_LEN;
    while !input.is_char_boundary(end) {
        end -= 1;
    }
    format!("{} ...", &input[..end])
}
1213
1214fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1215    let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1216    let max_len = 200;
1217    if query.len() < max_len {
1218        query
1219    } else {
1220        format!("{} ...", query.get(..max_len).unwrap())
1221    }
1222}
1223
1224/// A running node service.
1225pub struct NodeService {
1226    port: u16,
1227    child: Child,
1228}
1229
1230impl NodeService {
1231    fn new(port: u16, child: Child) -> Self {
1232        Self { port, child }
1233    }
1234
1235    pub async fn terminate(mut self) -> Result<()> {
1236        self.child.kill().await.context("terminating node service")
1237    }
1238
1239    pub fn port(&self) -> u16 {
1240        self.port
1241    }
1242
1243    pub fn ensure_is_running(&mut self) -> Result<()> {
1244        self.child.ensure_is_running()
1245    }
1246
1247    pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1248        let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1249        let mut data = self.query_node(query).await?;
1250        Ok(serde_json::from_value(data["processInbox"].take())?)
1251    }
1252
1253    pub async fn transfer(
1254        &self,
1255        chain_id: ChainId,
1256        owner: AccountOwner,
1257        recipient: Account,
1258        amount: Amount,
1259    ) -> Result<CryptoHash> {
1260        let json_owner = owner.to_value();
1261        let json_recipient = recipient.to_value();
1262        let query = format!(
1263            "mutation {{ transfer(\
1264                 chainId: \"{chain_id}\", \
1265                 owner: {json_owner}, \
1266                 recipient: {json_recipient}, \
1267                 amount: \"{amount}\") \
1268             }}"
1269        );
1270        let data = self.query_node(query).await?;
1271        serde_json::from_value(data["transfer"].clone())
1272            .context("missing transfer field in response")
1273    }
1274
1275    pub async fn balance(&self, account: &Account) -> Result<Amount> {
1276        let chain = account.chain_id;
1277        let owner = account.owner;
1278        if matches!(owner, AccountOwner::CHAIN) {
1279            let query = format!(
1280                "query {{ chain(chainId:\"{chain}\") {{
1281                    executionState {{ system {{ balance }} }}
1282                }} }}"
1283            );
1284            let response = self.query_node(query).await?;
1285            let balance = &response["chain"]["executionState"]["system"]["balance"]
1286                .as_str()
1287                .unwrap();
1288            return Ok(Amount::from_str(balance)?);
1289        }
1290        let query = format!(
1291            "query {{ chain(chainId:\"{chain}\") {{
1292                executionState {{ system {{ balances {{
1293                    entry(key:\"{owner}\") {{ value }}
1294                }} }} }}
1295            }} }}"
1296        );
1297        let response = self.query_node(query).await?;
1298        let balances = &response["chain"]["executionState"]["system"]["balances"];
1299        let balance = balances["entry"]["value"].as_str();
1300        match balance {
1301            None => Ok(Amount::ZERO),
1302            Some(amount) => Ok(Amount::from_str(amount)?),
1303        }
1304    }
1305
1306    pub fn make_application<A: ContractAbi>(
1307        &self,
1308        chain_id: &ChainId,
1309        application_id: &ApplicationId<A>,
1310    ) -> Result<ApplicationWrapper<A>> {
1311        let application_id = application_id.forget_abi().to_string();
1312        let link = format!(
1313            "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1314            self.port
1315        );
1316        Ok(ApplicationWrapper::from(link))
1317    }
1318
1319    pub async fn publish_data_blob(
1320        &self,
1321        chain_id: &ChainId,
1322        bytes: Vec<u8>,
1323    ) -> Result<CryptoHash> {
1324        let query = format!(
1325            "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1326            chain_id.to_value(),
1327            bytes.to_value(),
1328        );
1329        let data = self.query_node(query).await?;
1330        serde_json::from_value(data["publishDataBlob"].clone())
1331            .context("missing publishDataBlob field in response")
1332    }
1333
1334    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1335        &self,
1336        chain_id: &ChainId,
1337        contract: PathBuf,
1338        service: PathBuf,
1339        vm_runtime: VmRuntime,
1340    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1341        let contract_code = Bytecode::load_from_file(&contract)?;
1342        let service_code = Bytecode::load_from_file(&service)?;
1343        let query = format!(
1344            "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1345            chain_id.to_value(),
1346            contract_code.to_value(),
1347            service_code.to_value(),
1348            vm_runtime.to_value(),
1349        );
1350        let data = self.query_node(query).await?;
1351        let module_str = data["publishModule"]
1352            .as_str()
1353            .context("module ID not found")?;
1354        let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1355        Ok(module_id.with_abi())
1356    }
1357
1358    pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1359        let query = format!(
1360            "query {{ chain(chainId:\"{chain_id}\") {{
1361                executionState {{ system {{ committees }} }}
1362            }} }}"
1363        );
1364        let mut response = self.query_node(query).await?;
1365        let committees = response["chain"]["executionState"]["system"]["committees"].take();
1366        Ok(serde_json::from_value(committees)?)
1367    }
1368
1369    pub async fn events_from_index(
1370        &self,
1371        chain_id: &ChainId,
1372        stream_id: &StreamId,
1373        start_index: u32,
1374    ) -> Result<Vec<IndexAndEvent>> {
1375        let query = format!(
1376            "query {{
1377               eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1378               {{ index event }}
1379             }}",
1380            stream_id.to_value()
1381        );
1382        let mut response = self.query_node(query).await?;
1383        let response = response["eventsFromIndex"].take();
1384        Ok(serde_json::from_value(response)?)
1385    }
1386
1387    pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1388        let n_try = 5;
1389        let query = query.as_ref();
1390        for i in 0..n_try {
1391            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1392            let url = format!("http://localhost:{}/", self.port);
1393            let client = reqwest_client();
1394            let result = client
1395                .post(url)
1396                .json(&json!({ "query": query }))
1397                .send()
1398                .await;
1399            if matches!(result, Err(ref error) if error.is_timeout()) {
1400                warn!(
1401                    "Timeout when sending query {} to the node service",
1402                    truncate_query_output(query)
1403                );
1404                continue;
1405            }
1406            let response = result.with_context(|| {
1407                format!(
1408                    "query_node: failed to post query={}",
1409                    truncate_query_output(query)
1410                )
1411            })?;
1412            ensure!(
1413                response.status().is_success(),
1414                "Query \"{}\" failed: {}",
1415                truncate_query_output(query),
1416                response
1417                    .text()
1418                    .await
1419                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1420            );
1421            let value: Value = response.json().await.context("invalid JSON")?;
1422            if let Some(errors) = value.get("errors") {
1423                warn!(
1424                    "Query \"{}\" failed: {}",
1425                    truncate_query_output(query),
1426                    errors
1427                );
1428            } else {
1429                return Ok(value["data"].clone());
1430            }
1431        }
1432        bail!(
1433            "Query \"{}\" failed after {} retries.",
1434            truncate_query_output(query),
1435            n_try
1436        );
1437    }
1438
1439    pub async fn create_application<
1440        Abi: ContractAbi,
1441        Parameters: Serialize,
1442        InstantiationArgument: Serialize,
1443    >(
1444        &self,
1445        chain_id: &ChainId,
1446        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1447        parameters: &Parameters,
1448        argument: &InstantiationArgument,
1449        required_application_ids: &[ApplicationId],
1450    ) -> Result<ApplicationId<Abi>> {
1451        let module_id = module_id.forget_abi();
1452        let json_required_applications_ids = required_application_ids
1453            .iter()
1454            .map(ApplicationId::to_string)
1455            .collect::<Vec<_>>()
1456            .to_value();
1457        // Convert to `serde_json::Value` then `async_graphql::Value` via the trait `InputType`.
1458        let new_parameters = serde_json::to_value(parameters)
1459            .context("could not create parameters JSON")?
1460            .to_value();
1461        let new_argument = serde_json::to_value(argument)
1462            .context("could not create argument JSON")?
1463            .to_value();
1464        let query = format!(
1465            "mutation {{ createApplication(\
1466                 chainId: \"{chain_id}\",
1467                 moduleId: \"{module_id}\", \
1468                 parameters: {new_parameters}, \
1469                 instantiationArgument: {new_argument}, \
1470                 requiredApplicationIds: {json_required_applications_ids}) \
1471             }}"
1472        );
1473        let data = self.query_node(query).await?;
1474        let app_id_str = data["createApplication"]
1475            .as_str()
1476            .context("missing createApplication string in response")?
1477            .trim();
1478        Ok(app_id_str
1479            .parse::<ApplicationId>()
1480            .context("invalid application ID")?
1481            .with_abi())
1482    }
1483
1484    /// Obtains the hash of the `chain`'s tip block, as known by this node service.
1485    pub async fn chain_tip_hash(&self, chain: ChainId) -> Result<Option<CryptoHash>> {
1486        let query = format!(r#"query {{ block(chainId: "{chain}") {{ hash }} }}"#);
1487
1488        let mut response = self.query_node(&query).await?;
1489
1490        match mem::take(&mut response["block"]["hash"]) {
1491            Value::Null => Ok(None),
1492            Value::String(hash) => Ok(Some(
1493                hash.parse()
1494                    .context("Received an invalid hash {hash:?} for chain tip")?,
1495            )),
1496            invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1497        }
1498    }
1499
1500    /// Subscribes to the node service and returns a stream of notifications about a chain.
1501    pub async fn notifications(
1502        &self,
1503        chain_id: ChainId,
1504    ) -> Result<impl Stream<Item = Result<Notification>>> {
1505        let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1506        let url = format!("ws://localhost:{}/ws", self.port);
1507        let mut request = url.into_client_request()?;
1508        request.headers_mut().insert(
1509            "Sec-WebSocket-Protocol",
1510            HeaderValue::from_str("graphql-transport-ws")?,
1511        );
1512        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1513        let init_json = json!({
1514          "type": "connection_init",
1515          "payload": {}
1516        });
1517        websocket.send(init_json.to_string().into()).await?;
1518        let text = websocket
1519            .next()
1520            .await
1521            .context("Failed to establish connection")??
1522            .into_text()?;
1523        ensure!(
1524            text == "{\"type\":\"connection_ack\"}",
1525            "Unexpected response: {text}"
1526        );
1527        let query_json = json!({
1528          "id": "1",
1529          "type": "start",
1530          "payload": {
1531            "query": query,
1532            "variables": {},
1533            "operationName": null
1534          }
1535        });
1536        websocket.send(query_json.to_string().into()).await?;
1537        Ok(websocket
1538            .map_err(anyhow::Error::from)
1539            .and_then(|message| async {
1540                let text = message.into_text()?;
1541                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1542                if let Some(errors) = value["payload"].get("errors") {
1543                    bail!("Notification subscription failed: {errors:?}");
1544                }
1545                serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1546                    .context("Failed to deserialize notification")
1547            }))
1548    }
1549}
1550
1551/// A running faucet service.
1552pub struct FaucetService {
1553    port: u16,
1554    child: Child,
1555}
1556
1557impl FaucetService {
1558    fn new(port: u16, child: Child) -> Self {
1559        Self { port, child }
1560    }
1561
1562    pub async fn terminate(mut self) -> Result<()> {
1563        self.child
1564            .kill()
1565            .await
1566            .context("terminating faucet service")
1567    }
1568
1569    pub fn ensure_is_running(&mut self) -> Result<()> {
1570        self.child.ensure_is_running()
1571    }
1572
1573    pub fn instance(&self) -> Faucet {
1574        Faucet::new(format!("http://localhost:{}/", self.port))
1575    }
1576}
1577
/// A handle to a running `Application` that can be queried in GraphQL.
pub struct ApplicationWrapper<A> {
    /// The HTTP endpoint that queries are posted to.
    uri: String,
    /// Ties the wrapper to the type `A` without storing a value of it.
    _phantom: PhantomData<A>,
}
1583
1584impl<A> ApplicationWrapper<A> {
1585    pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1586        let query = query.as_ref();
1587        let value = self.run_json_query(json!({ "query": query })).await?;
1588        Ok(value["data"].clone())
1589    }
1590
1591    pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1592        const MAX_RETRIES: usize = 5;
1593
1594        for i in 0.. {
1595            let client = reqwest_client();
1596            let result = client.post(&self.uri).json(&query).send().await;
1597            let response = match result {
1598                Ok(response) => response,
1599                Err(error) if i < MAX_RETRIES => {
1600                    warn!(
1601                        "Failed to post query \"{}\": {error}; retrying",
1602                        truncate_query_output_serialize(&query),
1603                    );
1604                    continue;
1605                }
1606                Err(error) => {
1607                    let query = truncate_query_output_serialize(&query);
1608                    return Err(error)
1609                        .with_context(|| format!("run_json_query: failed to post query={query}"));
1610                }
1611            };
1612            ensure!(
1613                response.status().is_success(),
1614                "Query \"{}\" failed: {}",
1615                truncate_query_output_serialize(&query),
1616                response
1617                    .text()
1618                    .await
1619                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1620            );
1621            let value: Value = response.json().await.context("invalid JSON")?;
1622            if let Some(errors) = value.get("errors") {
1623                bail!(
1624                    "Query \"{}\" failed: {}",
1625                    truncate_query_output_serialize(&query),
1626                    errors
1627                );
1628            }
1629            return Ok(value);
1630        }
1631        unreachable!()
1632    }
1633
1634    pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
1635        let query = query.as_ref();
1636        self.run_graphql_query(&format!("query {{ {query} }}"))
1637            .await
1638    }
1639
1640    pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
1641        let query = query.as_ref().trim();
1642        let name = query
1643            .split_once(|ch: char| !ch.is_alphanumeric())
1644            .map_or(query, |(name, _)| name);
1645        let data = self.query(query).await?;
1646        serde_json::from_value(data[name].clone())
1647            .with_context(|| format!("{name} field missing in response"))
1648    }
1649
1650    pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
1651        let mutation = mutation.as_ref();
1652        self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
1653            .await
1654    }
1655}
1656
1657impl<A> From<String> for ApplicationWrapper<A> {
1658    fn from(uri: String) -> ApplicationWrapper<A> {
1659        ApplicationWrapper {
1660            uri,
1661            _phantom: PhantomData,
1662        }
1663    }
1664}