linera_service/cli_wrappers/
wallet.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Cow,
6    collections::BTreeMap,
7    env,
8    marker::PhantomData,
9    mem,
10    path::{Path, PathBuf},
11    pin::Pin,
12    process::Stdio,
13    str::FromStr,
14    sync,
15    time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24    abi::ContractAbi,
25    command::{resolve_binary, CommandExt},
26    crypto::{CryptoHash, InMemorySigner},
27    data_types::{Amount, BlockHeight, Bytecode, Epoch},
28    identifiers::{
29        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30    },
31    vm::VmRuntime,
32};
33use linera_client::{client_options::ResourceControlPolicyConfig, wallet::Wallet};
34use linera_core::worker::Notification;
35use linera_execution::committee::Committee;
36use linera_faucet_client::Faucet;
37use serde::{de::DeserializeOwned, ser::Serialize};
38use serde_command_opts::to_args;
39use serde_json::{json, Value};
40use tempfile::TempDir;
41use tokio::{
42    io::{AsyncBufReadExt, BufReader},
43    process::{Child, Command},
44    sync::oneshot,
45    task::JoinHandle,
46};
47#[cfg(with_testing)]
48use {
49    futures::FutureExt as _,
50    linera_core::worker::Reason,
51    std::{collections::BTreeSet, future::Future},
52};
53
54use crate::{
55    cli::command::BenchmarkCommand,
56    cli_wrappers::{
57        local_net::{PathProvider, ProcessInbox},
58        Network,
59    },
60    util::{self, ChildExt},
61};
62
63/// The name of the environment variable that allows specifying additional arguments to be passed
64/// to the node-service command of the client.
65const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
66
67fn reqwest_client() -> reqwest::Client {
68    reqwest::ClientBuilder::new()
69        .timeout(Duration::from_secs(30))
70        .build()
71        .unwrap()
72}
73
74/// Wrapper to run a Linera client command.
75pub struct ClientWrapper {
76    binary_path: sync::Mutex<Option<PathBuf>>,
77    testing_prng_seed: Option<u64>,
78    storage: String,
79    wallet: String,
80    keystore: String,
81    max_pending_message_bundles: usize,
82    network: Network,
83    pub path_provider: PathProvider,
84    on_drop: OnClientDrop,
85    extra_args: Vec<String>,
86}
87
88/// Action to perform when the [`ClientWrapper`] is dropped.
89#[derive(Clone, Copy, Debug, Eq, PartialEq)]
90pub enum OnClientDrop {
91    /// Close all the chains on the wallet.
92    CloseChains,
93    /// Do not close any chains, leaving them active.
94    LeakChains,
95}
96
97impl ClientWrapper {
98    pub fn new(
99        path_provider: PathProvider,
100        network: Network,
101        testing_prng_seed: Option<u64>,
102        id: usize,
103        on_drop: OnClientDrop,
104    ) -> Self {
105        Self::new_with_extra_args(
106            path_provider,
107            network,
108            testing_prng_seed,
109            id,
110            on_drop,
111            vec!["--wait-for-outgoing-messages".to_string()],
112        )
113    }
114
115    pub fn new_with_extra_args(
116        path_provider: PathProvider,
117        network: Network,
118        testing_prng_seed: Option<u64>,
119        id: usize,
120        on_drop: OnClientDrop,
121        extra_args: Vec<String>,
122    ) -> Self {
123        let storage = format!(
124            "rocksdb:{}/client_{}.db",
125            path_provider.path().display(),
126            id
127        );
128        let wallet = format!("wallet_{}.json", id);
129        let keystore = format!("keystore_{}.json", id);
130        Self {
131            binary_path: sync::Mutex::new(None),
132            testing_prng_seed,
133            storage,
134            wallet,
135            keystore,
136            max_pending_message_bundles: 10_000,
137            network,
138            path_provider,
139            on_drop,
140            extra_args,
141        }
142    }
143
144    /// Runs `linera project new`.
145    pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
146        let tmp = TempDir::new()?;
147        let mut command = self.command().await?;
148        command
149            .current_dir(tmp.path())
150            .arg("project")
151            .arg("new")
152            .arg(project_name)
153            .arg("--linera-root")
154            .arg(linera_root)
155            .spawn_and_wait_for_stdout()
156            .await?;
157        Ok(tmp)
158    }
159
160    /// Runs `linera project publish`.
161    pub async fn project_publish<T: Serialize>(
162        &self,
163        path: PathBuf,
164        required_application_ids: Vec<String>,
165        publisher: impl Into<Option<ChainId>>,
166        argument: &T,
167    ) -> Result<String> {
168        let json_parameters = serde_json::to_string(&())?;
169        let json_argument = serde_json::to_string(argument)?;
170        let mut command = self.command().await?;
171        command
172            .arg("project")
173            .arg("publish-and-create")
174            .arg(path)
175            .args(publisher.into().iter().map(ChainId::to_string))
176            .args(["--json-parameters", &json_parameters])
177            .args(["--json-argument", &json_argument]);
178        if !required_application_ids.is_empty() {
179            command.arg("--required-application-ids");
180            command.args(required_application_ids);
181        }
182        let stdout = command.spawn_and_wait_for_stdout().await?;
183        Ok(stdout.trim().to_string())
184    }
185
186    /// Runs `linera project test`.
187    pub async fn project_test(&self, path: &Path) -> Result<()> {
188        self.command()
189            .await
190            .context("failed to create project test command")?
191            .current_dir(path)
192            .arg("project")
193            .arg("test")
194            .spawn_and_wait_for_stdout()
195            .await?;
196        Ok(())
197    }
198
199    async fn command_with_envs_and_arguments(
200        &self,
201        envs: &[(&str, &str)],
202        arguments: impl IntoIterator<Item = Cow<'_, str>>,
203    ) -> Result<Command> {
204        let mut command = self.command_binary().await?;
205        command.current_dir(self.path_provider.path());
206        for (key, value) in envs {
207            command.env(key, value);
208        }
209        for argument in arguments {
210            command.arg(&*argument);
211        }
212        Ok(command)
213    }
214
215    async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
216        self.command_with_envs_and_arguments(envs, self.command_arguments())
217            .await
218    }
219
220    async fn command_with_arguments(
221        &self,
222        arguments: impl IntoIterator<Item = Cow<'_, str>>,
223    ) -> Result<Command> {
224        self.command_with_envs_and_arguments(
225            &[(
226                "RUST_LOG",
227                &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
228            )],
229            arguments,
230        )
231        .await
232    }
233
234    async fn command(&self) -> Result<Command> {
235        self.command_with_envs(&[(
236            "RUST_LOG",
237            &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
238        )])
239        .await
240    }
241
242    fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
243        [
244            "--wallet".into(),
245            self.wallet.as_str().into(),
246            "--keystore".into(),
247            self.keystore.as_str().into(),
248            "--storage".into(),
249            self.storage.as_str().into(),
250            "--send-timeout-ms".into(),
251            "500000".into(),
252            "--recv-timeout-ms".into(),
253            "500000".into(),
254        ]
255        .into_iter()
256        .chain(self.extra_args.iter().map(|s| s.as_str().into()))
257    }
258
259    /// Returns an iterator over the arguments that should be added to all command invocations.
260    fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
261        self.required_command_arguments().chain([
262            "--max-pending-message-bundles".into(),
263            self.max_pending_message_bundles.to_string().into(),
264        ])
265    }
266
267    /// Returns the [`Command`] instance configured to run the appropriate binary.
268    ///
269    /// The path is resolved once and cached inside `self` for subsequent usages.
270    async fn command_binary(&self) -> Result<Command> {
271        match self.command_with_cached_binary_path() {
272            Some(command) => Ok(command),
273            None => {
274                let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
275                let command = Command::new(&resolved_path);
276
277                self.set_cached_binary_path(resolved_path);
278
279                Ok(command)
280            }
281        }
282    }
283
284    /// Returns a [`Command`] instance configured with the cached `binary_path`, if available.
285    fn command_with_cached_binary_path(&self) -> Option<Command> {
286        let binary_path = self.binary_path.lock().unwrap();
287
288        binary_path.as_ref().map(Command::new)
289    }
290
291    /// Sets the cached `binary_path` with the `new_binary_path`.
292    ///
293    /// # Panics
294    ///
295    /// If the cache is already set to a different value. In theory the two threads calling
296    /// `command_binary` can race and resolve the binary path twice, but they should always be the
297    /// same path.
298    fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
299        let mut binary_path = self.binary_path.lock().unwrap();
300
301        if binary_path.is_none() {
302            *binary_path = Some(new_binary_path);
303        } else {
304            assert_eq!(*binary_path, Some(new_binary_path));
305        }
306    }
307
308    /// Runs `linera create-genesis-config`.
309    pub async fn create_genesis_config(
310        &self,
311        num_other_initial_chains: u32,
312        initial_funding: Amount,
313        policy_config: ResourceControlPolicyConfig,
314        http_allow_list: Option<Vec<String>>,
315    ) -> Result<()> {
316        let mut command = self.command().await?;
317        command
318            .args([
319                "create-genesis-config",
320                &num_other_initial_chains.to_string(),
321            ])
322            .args(["--initial-funding", &initial_funding.to_string()])
323            .args(["--committee", "committee.json"])
324            .args(["--genesis", "genesis.json"])
325            .args([
326                "--policy-config",
327                &policy_config.to_string().to_kebab_case(),
328            ]);
329        if let Some(allow_list) = http_allow_list {
330            command
331                .arg("--http-request-allow-list")
332                .arg(allow_list.join(","));
333        }
334        if let Some(seed) = self.testing_prng_seed {
335            command.arg("--testing-prng-seed").arg(seed.to_string());
336        }
337        command.spawn_and_wait_for_stdout().await?;
338        Ok(())
339    }
340
341    /// Runs `linera wallet init`. The genesis config is read from `genesis.json`, or from the
342    /// faucet if provided.
343    pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
344        let mut command = self.command().await?;
345        command.args(["wallet", "init"]);
346        match faucet {
347            None => command.args(["--genesis", "genesis.json"]),
348            Some(faucet) => command.args(["--faucet", faucet.url()]),
349        };
350        if let Some(seed) = self.testing_prng_seed {
351            command.arg("--testing-prng-seed").arg(seed.to_string());
352        }
353        command.spawn_and_wait_for_stdout().await?;
354        Ok(())
355    }
356
357    /// Runs `linera wallet request-chain`.
358    pub async fn request_chain(
359        &self,
360        faucet: &Faucet,
361        set_default: bool,
362    ) -> Result<(ChainId, AccountOwner)> {
363        let mut command = self.command().await?;
364        command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
365        if set_default {
366            command.arg("--set-default");
367        }
368        let stdout = command.spawn_and_wait_for_stdout().await?;
369        let mut lines = stdout.split_whitespace();
370        let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
371        let owner = lines.next().context("missing chain owner")?.parse()?;
372        Ok((chain_id, owner))
373    }
374
375    /// Runs `linera wallet publish-and-create`.
376    #[expect(clippy::too_many_arguments)]
377    pub async fn publish_and_create<
378        A: ContractAbi,
379        Parameters: Serialize,
380        InstantiationArgument: Serialize,
381    >(
382        &self,
383        contract: PathBuf,
384        service: PathBuf,
385        vm_runtime: VmRuntime,
386        parameters: &Parameters,
387        argument: &InstantiationArgument,
388        required_application_ids: &[ApplicationId],
389        publisher: impl Into<Option<ChainId>>,
390    ) -> Result<ApplicationId<A>> {
391        let json_parameters = serde_json::to_string(parameters)?;
392        let json_argument = serde_json::to_string(argument)?;
393        let mut command = self.command().await?;
394        let vm_runtime = format!("{}", vm_runtime);
395        command
396            .arg("publish-and-create")
397            .args([contract, service])
398            .args(["--vm-runtime", &vm_runtime.to_lowercase()])
399            .args(publisher.into().iter().map(ChainId::to_string))
400            .args(["--json-parameters", &json_parameters])
401            .args(["--json-argument", &json_argument]);
402        if !required_application_ids.is_empty() {
403            command.arg("--required-application-ids");
404            command.args(
405                required_application_ids
406                    .iter()
407                    .map(ApplicationId::to_string),
408            );
409        }
410        let stdout = command.spawn_and_wait_for_stdout().await?;
411        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
412    }
413
414    /// Runs `linera publish-module`.
415    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
416        &self,
417        contract: PathBuf,
418        service: PathBuf,
419        vm_runtime: VmRuntime,
420        publisher: impl Into<Option<ChainId>>,
421    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
422        let stdout = self
423            .command()
424            .await?
425            .arg("publish-module")
426            .args([contract, service])
427            .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
428            .args(publisher.into().iter().map(ChainId::to_string))
429            .spawn_and_wait_for_stdout()
430            .await?;
431        let module_id: ModuleId = stdout.trim().parse()?;
432        Ok(module_id.with_abi())
433    }
434
435    /// Runs `linera create-application`.
436    pub async fn create_application<
437        Abi: ContractAbi,
438        Parameters: Serialize,
439        InstantiationArgument: Serialize,
440    >(
441        &self,
442        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
443        parameters: &Parameters,
444        argument: &InstantiationArgument,
445        required_application_ids: &[ApplicationId],
446        creator: impl Into<Option<ChainId>>,
447    ) -> Result<ApplicationId<Abi>> {
448        let json_parameters = serde_json::to_string(parameters)?;
449        let json_argument = serde_json::to_string(argument)?;
450        let mut command = self.command().await?;
451        command
452            .arg("create-application")
453            .arg(module_id.forget_abi().to_string())
454            .args(["--json-parameters", &json_parameters])
455            .args(["--json-argument", &json_argument])
456            .args(creator.into().iter().map(ChainId::to_string));
457        if !required_application_ids.is_empty() {
458            command.arg("--required-application-ids");
459            command.args(
460                required_application_ids
461                    .iter()
462                    .map(ApplicationId::to_string),
463            );
464        }
465        let stdout = command.spawn_and_wait_for_stdout().await?;
466        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
467    }
468
469    /// Runs `linera service`.
470    pub async fn run_node_service(
471        &self,
472        port: impl Into<Option<u16>>,
473        process_inbox: ProcessInbox,
474    ) -> Result<NodeService> {
475        let port = port.into().unwrap_or(8080);
476        let mut command = self.command().await?;
477        command.arg("service");
478        if let ProcessInbox::Skip = process_inbox {
479            command.arg("--listener-skip-process-inbox");
480        }
481        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
482            command.args(var.split_whitespace());
483        }
484        let child = command
485            .args(["--port".to_string(), port.to_string()])
486            .spawn_into()?;
487        let client = reqwest_client();
488        for i in 0..10 {
489            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
490            let request = client
491                .get(format!("http://localhost:{}/", port))
492                .send()
493                .await;
494            if request.is_ok() {
495                tracing::info!("Node service has started");
496                return Ok(NodeService::new(port, child));
497            } else {
498                tracing::warn!("Waiting for node service to start");
499            }
500        }
501        bail!("Failed to start node service");
502    }
503
504    /// Runs `linera query-validator`
505    pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
506        let mut command = self.command().await?;
507        command.arg("query-validator").arg(address);
508        let stdout = command.spawn_and_wait_for_stdout().await?;
509
510        // Parse the genesis config hash from the output.
511        // It's on a line like "Genesis config hash: <hash>"
512        let hash = stdout
513            .lines()
514            .find_map(|line| {
515                line.strip_prefix("Genesis config hash: ")
516                    .and_then(|hash_str| hash_str.trim().parse().ok())
517            })
518            .context("error while parsing the result of `linera query-validator`")?;
519        Ok(hash)
520    }
521
522    /// Runs `linera query-validators`.
523    pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
524        let mut command = self.command().await?;
525        command.arg("query-validators");
526        if let Some(chain_id) = chain_id {
527            command.arg(chain_id.to_string());
528        }
529        command.spawn_and_wait_for_stdout().await?;
530        Ok(())
531    }
532
533    /// Runs `linera sync-validator`.
534    pub async fn sync_validator(
535        &self,
536        chain_ids: impl IntoIterator<Item = &ChainId>,
537        validator_address: impl Into<String>,
538    ) -> Result<()> {
539        let mut command = self.command().await?;
540        command.arg("sync-validator").arg(validator_address.into());
541        let mut chain_ids = chain_ids.into_iter().peekable();
542        if chain_ids.peek().is_some() {
543            command
544                .arg("--chains")
545                .args(chain_ids.map(ChainId::to_string));
546        }
547        command.spawn_and_wait_for_stdout().await?;
548        Ok(())
549    }
550
551    /// Runs `linera faucet`.
552    pub async fn run_faucet(
553        &self,
554        port: impl Into<Option<u16>>,
555        chain_id: Option<ChainId>,
556        amount: Amount,
557    ) -> Result<FaucetService> {
558        let port = port.into().unwrap_or(8080);
559        let temp_dir = tempfile::tempdir()
560            .context("Failed to create temporary directory for faucet storage")?;
561        let storage_path = temp_dir.path().join("faucet_storage.sqlite");
562        let mut command = self.command().await?;
563        let command = command
564            .arg("faucet")
565            .args(["--port".to_string(), port.to_string()])
566            .args(["--amount".to_string(), amount.to_string()])
567            .args([
568                "--storage-path".to_string(),
569                storage_path.to_string_lossy().to_string(),
570            ]);
571        if let Some(chain_id) = chain_id {
572            command.arg(chain_id.to_string());
573        }
574        let child = command.spawn_into()?;
575        let client = reqwest_client();
576        for i in 0..10 {
577            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
578            let request = client
579                .get(format!("http://localhost:{}/", port))
580                .send()
581                .await;
582            if request.is_ok() {
583                tracing::info!("Faucet has started");
584                return Ok(FaucetService::new(port, child, temp_dir));
585            } else {
586                tracing::debug!("Waiting for faucet to start");
587            }
588        }
589        bail!("Failed to start faucet");
590    }
591
592    /// Runs `linera local-balance`.
593    pub async fn local_balance(&self, account: Account) -> Result<Amount> {
594        let stdout = self
595            .command()
596            .await?
597            .arg("local-balance")
598            .arg(account.to_string())
599            .spawn_and_wait_for_stdout()
600            .await?;
601        let amount = stdout
602            .trim()
603            .parse()
604            .context("error while parsing the result of `linera local-balance`")?;
605        Ok(amount)
606    }
607
608    /// Runs `linera query-balance`.
609    pub async fn query_balance(&self, account: Account) -> Result<Amount> {
610        let stdout = self
611            .command()
612            .await?
613            .arg("query-balance")
614            .arg(account.to_string())
615            .spawn_and_wait_for_stdout()
616            .await?;
617        let amount = stdout
618            .trim()
619            .parse()
620            .context("error while parsing the result of `linera query-balance`")?;
621        Ok(amount)
622    }
623
624    /// Runs `linera sync`.
625    pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
626        self.command()
627            .await?
628            .arg("sync")
629            .arg(chain_id.to_string())
630            .spawn_and_wait_for_stdout()
631            .await?;
632        Ok(())
633    }
634
635    /// Runs `linera process-inbox`.
636    pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
637        self.command()
638            .await?
639            .arg("process-inbox")
640            .arg(chain_id.to_string())
641            .spawn_and_wait_for_stdout()
642            .await?;
643        Ok(())
644    }
645
646    /// Runs `linera transfer`.
647    pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
648        self.command()
649            .await?
650            .arg("transfer")
651            .arg(amount.to_string())
652            .args(["--from", &from.to_string()])
653            .args(["--to", &to.to_string()])
654            .spawn_and_wait_for_stdout()
655            .await?;
656        Ok(())
657    }
658
659    /// Runs `linera transfer` with no logging.
660    pub async fn transfer_with_silent_logs(
661        &self,
662        amount: Amount,
663        from: ChainId,
664        to: ChainId,
665    ) -> Result<()> {
666        self.command()
667            .await?
668            .env("RUST_LOG", "off")
669            .arg("transfer")
670            .arg(amount.to_string())
671            .args(["--from", &from.to_string()])
672            .args(["--to", &to.to_string()])
673            .spawn_and_wait_for_stdout()
674            .await?;
675        Ok(())
676    }
677
678    /// Runs `linera transfer` with owner accounts.
679    pub async fn transfer_with_accounts(
680        &self,
681        amount: Amount,
682        from: Account,
683        to: Account,
684    ) -> Result<()> {
685        self.command()
686            .await?
687            .arg("transfer")
688            .arg(amount.to_string())
689            .args(["--from", &from.to_string()])
690            .args(["--to", &to.to_string()])
691            .spawn_and_wait_for_stdout()
692            .await?;
693        Ok(())
694    }
695
696    fn benchmark_command_internal(command: &mut Command, args: BenchmarkCommand) -> Result<()> {
697        let mut formatted_args = to_args(&args)?;
698        let subcommand = formatted_args.remove(0);
699        // The subcommand is followed by the flattened options, which are preceded by "options".
700        // So remove that as well.
701        formatted_args.remove(0);
702        let options = formatted_args
703            .chunks_exact(2)
704            .flat_map(|pair| {
705                let option = format!("--{}", pair[0]);
706                match pair[1].as_str() {
707                    "true" => vec![option],
708                    "false" => vec![],
709                    _ => vec![option, pair[1].clone()],
710                }
711            })
712            .collect::<Vec<_>>();
713        command
714            .args([
715                "--max-pending-message-bundles",
716                &args.transactions_per_block().to_string(),
717            ])
718            .arg("benchmark")
719            .arg(subcommand)
720            .args(options);
721        Ok(())
722    }
723
724    async fn benchmark_command_with_envs(
725        &self,
726        args: BenchmarkCommand,
727        envs: &[(&str, &str)],
728    ) -> Result<Command> {
729        let mut command = self
730            .command_with_envs_and_arguments(envs, self.required_command_arguments())
731            .await?;
732        Self::benchmark_command_internal(&mut command, args)?;
733        Ok(command)
734    }
735
736    async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
737        let mut command = self
738            .command_with_arguments(self.required_command_arguments())
739            .await?;
740        Self::benchmark_command_internal(&mut command, args)?;
741        Ok(command)
742    }
743
744    /// Runs `linera benchmark`.
745    pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
746        let mut command = self.benchmark_command(args).await?;
747        command.spawn_and_wait_for_stdout().await?;
748        Ok(())
749    }
750
751    /// Runs `linera benchmark`, but detached: don't wait for the command to finish, just spawn it
752    /// and return the child process, and the handles to the stdout and stderr.
753    pub async fn benchmark_detached(
754        &self,
755        args: BenchmarkCommand,
756        tx: oneshot::Sender<()>,
757    ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
758        let mut child = self
759            .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
760            .await?
761            .kill_on_drop(true)
762            .stdin(Stdio::piped())
763            .stdout(Stdio::piped())
764            .stderr(Stdio::piped())
765            .spawn()?;
766
767        let pid = child.id().expect("failed to get pid");
768        let stdout = child.stdout.take().expect("stdout not open");
769        let stdout_handle = tokio::spawn(async move {
770            let mut lines = BufReader::new(stdout).lines();
771            while let Ok(Some(line)) = lines.next_line().await {
772                println!("benchmark{{pid={pid}}} {line}");
773            }
774        });
775
776        let stderr = child.stderr.take().expect("stderr not open");
777        let stderr_handle = tokio::spawn(async move {
778            let mut lines = BufReader::new(stderr).lines();
779            let mut tx = Some(tx);
780            while let Ok(Some(line)) = lines.next_line().await {
781                if line.contains("Ready to start benchmark") {
782                    tx.take()
783                        .expect("Should only send signal once")
784                        .send(())
785                        .expect("failed to send ready signal to main thread");
786                } else {
787                    println!("benchmark{{pid={pid}}} {line}");
788                }
789            }
790        });
791        Ok((child, stdout_handle, stderr_handle))
792    }
793
794    async fn open_chain_internal(
795        &self,
796        from: ChainId,
797        owner: Option<AccountOwner>,
798        initial_balance: Amount,
799        super_owner: bool,
800    ) -> Result<(ChainId, AccountOwner)> {
801        let mut command = self.command().await?;
802        command
803            .arg("open-chain")
804            .args(["--from", &from.to_string()])
805            .args(["--initial-balance", &initial_balance.to_string()]);
806
807        if let Some(owner) = owner {
808            command.args(["--owner", &owner.to_string()]);
809        }
810
811        if super_owner {
812            command.arg("--super-owner");
813        }
814
815        let stdout = command.spawn_and_wait_for_stdout().await?;
816        let mut split = stdout.split('\n');
817        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
818        let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
819        if let Some(owner) = owner {
820            assert_eq!(owner, new_owner);
821        }
822        Ok((chain_id, new_owner))
823    }
824
825    /// Runs `linera open-chain --super-owner`.
826    pub async fn open_chain_super_owner(
827        &self,
828        from: ChainId,
829        owner: Option<AccountOwner>,
830        initial_balance: Amount,
831    ) -> Result<(ChainId, AccountOwner)> {
832        self.open_chain_internal(from, owner, initial_balance, true)
833            .await
834    }
835
836    /// Runs `linera open-chain`.
837    pub async fn open_chain(
838        &self,
839        from: ChainId,
840        owner: Option<AccountOwner>,
841        initial_balance: Amount,
842    ) -> Result<(ChainId, AccountOwner)> {
843        self.open_chain_internal(from, owner, initial_balance, false)
844            .await
845    }
846
847    /// Runs `linera open-chain` then `linera assign`.
848    pub async fn open_and_assign(
849        &self,
850        client: &ClientWrapper,
851        initial_balance: Amount,
852    ) -> Result<ChainId> {
853        let our_chain = self
854            .load_wallet()?
855            .default_chain()
856            .context("no default chain found")?;
857        let owner = client.keygen().await?;
858        let (new_chain, _) = self
859            .open_chain(our_chain, Some(owner), initial_balance)
860            .await?;
861        client.assign(owner, new_chain).await?;
862        Ok(new_chain)
863    }
864
865    pub async fn open_multi_owner_chain(
866        &self,
867        from: ChainId,
868        owners: Vec<AccountOwner>,
869        weights: Vec<u64>,
870        multi_leader_rounds: u32,
871        balance: Amount,
872        base_timeout_ms: u64,
873    ) -> Result<ChainId> {
874        let mut command = self.command().await?;
875        command
876            .arg("open-multi-owner-chain")
877            .args(["--from", &from.to_string()])
878            .arg("--owners")
879            .args(owners.iter().map(AccountOwner::to_string))
880            .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
881        if !weights.is_empty() {
882            command
883                .arg("--owner-weights")
884                .args(weights.iter().map(u64::to_string));
885        };
886        command
887            .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
888            .args(["--initial-balance", &balance.to_string()]);
889
890        let stdout = command.spawn_and_wait_for_stdout().await?;
891        let mut split = stdout.split('\n');
892        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
893
894        Ok(chain_id)
895    }
896
897    pub async fn change_ownership(
898        &self,
899        chain_id: ChainId,
900        super_owners: Vec<AccountOwner>,
901        owners: Vec<AccountOwner>,
902    ) -> Result<()> {
903        let mut command = self.command().await?;
904        command
905            .arg("change-ownership")
906            .args(["--chain-id", &chain_id.to_string()]);
907        if !super_owners.is_empty() {
908            command
909                .arg("--super-owners")
910                .args(super_owners.iter().map(AccountOwner::to_string));
911        }
912        if !owners.is_empty() {
913            command
914                .arg("--owners")
915                .args(owners.iter().map(AccountOwner::to_string));
916        }
917        command.spawn_and_wait_for_stdout().await?;
918        Ok(())
919    }
920
921    /// Runs `linera wallet follow-chain CHAIN_ID`.
922    pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
923        let mut command = self.command().await?;
924        command
925            .args(["wallet", "follow-chain"])
926            .arg(chain_id.to_string());
927        if sync {
928            command.arg("--sync");
929        }
930        command.spawn_and_wait_for_stdout().await?;
931        Ok(())
932    }
933
934    /// Runs `linera wallet forget-chain CHAIN_ID`.
935    pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
936        let mut command = self.command().await?;
937        command
938            .args(["wallet", "forget-chain"])
939            .arg(chain_id.to_string());
940        command.spawn_and_wait_for_stdout().await?;
941        Ok(())
942    }
943
944    pub async fn retry_pending_block(
945        &self,
946        chain_id: Option<ChainId>,
947    ) -> Result<Option<CryptoHash>> {
948        let mut command = self.command().await?;
949        command.arg("retry-pending-block");
950        if let Some(chain_id) = chain_id {
951            command.arg(chain_id.to_string());
952        }
953        let stdout = command.spawn_and_wait_for_stdout().await?;
954        let stdout = stdout.trim();
955        if stdout.is_empty() {
956            Ok(None)
957        } else {
958            Ok(Some(CryptoHash::from_str(stdout)?))
959        }
960    }
961
962    /// Runs `linera publish-data-blob`.
963    pub async fn publish_data_blob(
964        &self,
965        path: &Path,
966        chain_id: Option<ChainId>,
967    ) -> Result<CryptoHash> {
968        let mut command = self.command().await?;
969        command.arg("publish-data-blob").arg(path);
970        if let Some(chain_id) = chain_id {
971            command.arg(chain_id.to_string());
972        }
973        let stdout = command.spawn_and_wait_for_stdout().await?;
974        let stdout = stdout.trim();
975        Ok(CryptoHash::from_str(stdout)?)
976    }
977
978    /// Runs `linera read-data-blob`.
979    pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
980        let mut command = self.command().await?;
981        command.arg("read-data-blob").arg(hash.to_string());
982        if let Some(chain_id) = chain_id {
983            command.arg(chain_id.to_string());
984        }
985        command.spawn_and_wait_for_stdout().await?;
986        Ok(())
987    }
988
989    pub fn load_wallet(&self) -> Result<Wallet> {
990        util::read_json(self.wallet_path())
991    }
992
993    pub fn load_keystore(&self) -> Result<InMemorySigner> {
994        util::read_json(self.keystore_path())
995    }
996
997    pub fn wallet_path(&self) -> PathBuf {
998        self.path_provider.path().join(&self.wallet)
999    }
1000
1001    pub fn keystore_path(&self) -> PathBuf {
1002        self.path_provider.path().join(&self.keystore)
1003    }
1004
1005    pub fn storage_path(&self) -> &str {
1006        &self.storage
1007    }
1008
1009    pub fn get_owner(&self) -> Option<AccountOwner> {
1010        let wallet = self.load_wallet().ok()?;
1011        let chain_id = wallet.default_chain()?;
1012        wallet.get(chain_id)?.owner
1013    }
1014
1015    pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1016        self.load_wallet()
1017            .ok()
1018            .is_some_and(|wallet| wallet.get(chain).is_some())
1019    }
1020
1021    pub async fn set_validator(
1022        &self,
1023        validator_key: &(String, String),
1024        port: usize,
1025        votes: usize,
1026    ) -> Result<()> {
1027        let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1028        self.command()
1029            .await?
1030            .arg("set-validator")
1031            .args(["--public-key", &validator_key.0])
1032            .args(["--account-key", &validator_key.1])
1033            .args(["--address", &address])
1034            .args(["--votes", &votes.to_string()])
1035            .spawn_and_wait_for_stdout()
1036            .await?;
1037        Ok(())
1038    }
1039
1040    pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1041        self.command()
1042            .await?
1043            .arg("remove-validator")
1044            .args(["--public-key", validator_key])
1045            .spawn_and_wait_for_stdout()
1046            .await?;
1047        Ok(())
1048    }
1049
1050    pub async fn change_validators(
1051        &self,
1052        add_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1053        modify_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1054        remove_validators: &[String],
1055    ) -> Result<()> {
1056        let mut command = self.command().await?;
1057        command.arg("change-validators");
1058
1059        for (public_key, account_key, port, votes) in add_validators {
1060            let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1061            let validator_spec = format!("{public_key},{account_key},{address},{votes}");
1062            command.args(["--add", &validator_spec]);
1063        }
1064
1065        for (public_key, account_key, port, votes) in modify_validators {
1066            let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1067            let validator_spec = format!("{public_key},{account_key},{address},{votes}");
1068            command.args(["--modify", &validator_spec]);
1069        }
1070
1071        for validator_key in remove_validators {
1072            command.args(["--remove", validator_key]);
1073        }
1074
1075        command.spawn_and_wait_for_stdout().await?;
1076        Ok(())
1077    }
1078
1079    pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1080        self.command()
1081            .await?
1082            .arg("revoke-epochs")
1083            .arg(epoch.to_string())
1084            .spawn_and_wait_for_stdout()
1085            .await?;
1086        Ok(())
1087    }
1088
1089    /// Runs `linera keygen`.
1090    pub async fn keygen(&self) -> Result<AccountOwner> {
1091        let stdout = self
1092            .command()
1093            .await?
1094            .arg("keygen")
1095            .spawn_and_wait_for_stdout()
1096            .await?;
1097        AccountOwner::from_str(stdout.as_str().trim())
1098    }
1099
1100    /// Returns the default chain.
1101    pub fn default_chain(&self) -> Option<ChainId> {
1102        self.load_wallet().ok()?.default_chain()
1103    }
1104
1105    /// Runs `linera assign`.
1106    pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1107        let _stdout = self
1108            .command()
1109            .await?
1110            .arg("assign")
1111            .args(["--owner", &owner.to_string()])
1112            .args(["--chain-id", &chain_id.to_string()])
1113            .spawn_and_wait_for_stdout()
1114            .await?;
1115        Ok(())
1116    }
1117
1118    /// Runs `linera set-preferred-owner` for `chain_id`.
1119    pub async fn set_preferred_owner(
1120        &self,
1121        chain_id: ChainId,
1122        owner: Option<AccountOwner>,
1123    ) -> Result<()> {
1124        let mut owner_arg = vec!["--owner".to_string()];
1125        if let Some(owner) = owner {
1126            owner_arg.push(owner.to_string());
1127        };
1128        self.command()
1129            .await?
1130            .arg("set-preferred-owner")
1131            .args(["--chain-id", &chain_id.to_string()])
1132            .args(owner_arg)
1133            .spawn_and_wait_for_stdout()
1134            .await?;
1135        Ok(())
1136    }
1137
1138    pub async fn build_application(
1139        &self,
1140        path: &Path,
1141        name: &str,
1142        is_workspace: bool,
1143    ) -> Result<(PathBuf, PathBuf)> {
1144        Command::new("cargo")
1145            .current_dir(self.path_provider.path())
1146            .arg("build")
1147            .arg("--release")
1148            .args(["--target", "wasm32-unknown-unknown"])
1149            .arg("--manifest-path")
1150            .arg(path.join("Cargo.toml"))
1151            .spawn_and_wait_for_stdout()
1152            .await?;
1153
1154        let release_dir = match is_workspace {
1155            true => path.join("../target/wasm32-unknown-unknown/release"),
1156            false => path.join("target/wasm32-unknown-unknown/release"),
1157        };
1158
1159        let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1160        let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1161
1162        let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1163        let service_size = fs_err::tokio::metadata(&service).await?.len();
1164        tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1165
1166        Ok((contract, service))
1167    }
1168}
1169
1170impl Drop for ClientWrapper {
1171    fn drop(&mut self) {
1172        use std::process::Command as SyncCommand;
1173
1174        if self.on_drop != OnClientDrop::CloseChains {
1175            return;
1176        }
1177
1178        let Ok(binary_path) = self.binary_path.lock() else {
1179            tracing::error!(
1180                "Failed to close chains because a thread panicked with a lock to `binary_path`"
1181            );
1182            return;
1183        };
1184
1185        let Some(binary_path) = binary_path.as_ref() else {
1186            tracing::warn!(
1187                "Assuming no chains need to be closed, because the command binary was never \
1188                resolved and therefore presumably never called"
1189            );
1190            return;
1191        };
1192
1193        let working_directory = self.path_provider.path();
1194        let mut wallet_show_command = SyncCommand::new(binary_path);
1195
1196        for argument in self.command_arguments() {
1197            wallet_show_command.arg(&*argument);
1198        }
1199
1200        let Ok(wallet_show_output) = wallet_show_command
1201            .current_dir(working_directory)
1202            .args(["wallet", "show", "--short", "--owned"])
1203            .output()
1204        else {
1205            tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1206            return;
1207        };
1208
1209        if !wallet_show_output.status.success() {
1210            tracing::warn!("Failed to list chains in the wallet to close them");
1211            return;
1212        }
1213
1214        let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1215            tracing::warn!(
1216                "Failed to close chains because `linera wallet show --short` \
1217                returned a non-UTF-8 output"
1218            );
1219            return;
1220        };
1221
1222        let chain_ids = chain_list_string
1223            .split('\n')
1224            .map(|line| line.trim())
1225            .filter(|line| !line.is_empty());
1226
1227        for chain_id in chain_ids {
1228            let mut close_chain_command = SyncCommand::new(binary_path);
1229
1230            for argument in self.command_arguments() {
1231                close_chain_command.arg(&*argument);
1232            }
1233
1234            close_chain_command.current_dir(working_directory);
1235
1236            match close_chain_command.args(["close-chain", chain_id]).status() {
1237                Ok(status) if status.success() => (),
1238                Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1239                Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1240            }
1241        }
1242    }
1243}
1244
1245#[cfg(with_testing)]
1246impl ClientWrapper {
1247    pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1248        self.build_application(Self::example_path(name)?.as_path(), name, true)
1249            .await
1250    }
1251
1252    pub fn example_path(name: &str) -> Result<PathBuf> {
1253        Ok(env::current_dir()?.join("../examples/").join(name))
1254    }
1255}
1256
1257fn truncate_query_output(input: &str) -> String {
1258    let max_len = 1000;
1259    if input.len() < max_len {
1260        input.to_string()
1261    } else {
1262        format!("{} ...", input.get(..max_len).unwrap())
1263    }
1264}
1265
1266fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1267    let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1268    let max_len = 200;
1269    if query.len() < max_len {
1270        query
1271    } else {
1272        format!("{} ...", query.get(..max_len).unwrap())
1273    }
1274}
1275
1276/// A running node service.
1277pub struct NodeService {
1278    port: u16,
1279    child: Child,
1280}
1281
1282impl NodeService {
1283    fn new(port: u16, child: Child) -> Self {
1284        Self { port, child }
1285    }
1286
1287    pub async fn terminate(mut self) -> Result<()> {
1288        self.child.kill().await.context("terminating node service")
1289    }
1290
1291    pub fn port(&self) -> u16 {
1292        self.port
1293    }
1294
1295    pub fn ensure_is_running(&mut self) -> Result<()> {
1296        self.child.ensure_is_running()
1297    }
1298
1299    pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1300        let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1301        let mut data = self.query_node(query).await?;
1302        Ok(serde_json::from_value(data["processInbox"].take())?)
1303    }
1304
1305    pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1306        let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1307        let mut data = self.query_node(query).await?;
1308        Ok(serde_json::from_value(data["sync"].take())?)
1309    }
1310
1311    pub async fn transfer(
1312        &self,
1313        chain_id: ChainId,
1314        owner: AccountOwner,
1315        recipient: Account,
1316        amount: Amount,
1317    ) -> Result<CryptoHash> {
1318        let json_owner = owner.to_value();
1319        let json_recipient = recipient.to_value();
1320        let query = format!(
1321            "mutation {{ transfer(\
1322                 chainId: \"{chain_id}\", \
1323                 owner: {json_owner}, \
1324                 recipient: {json_recipient}, \
1325                 amount: \"{amount}\") \
1326             }}"
1327        );
1328        let data = self.query_node(query).await?;
1329        serde_json::from_value(data["transfer"].clone())
1330            .context("missing transfer field in response")
1331    }
1332
1333    pub async fn balance(&self, account: &Account) -> Result<Amount> {
1334        let chain = account.chain_id;
1335        let owner = account.owner;
1336        if matches!(owner, AccountOwner::CHAIN) {
1337            let query = format!(
1338                "query {{ chain(chainId:\"{chain}\") {{
1339                    executionState {{ system {{ balance }} }}
1340                }} }}"
1341            );
1342            let response = self.query_node(query).await?;
1343            let balance = &response["chain"]["executionState"]["system"]["balance"]
1344                .as_str()
1345                .unwrap();
1346            return Ok(Amount::from_str(balance)?);
1347        }
1348        let query = format!(
1349            "query {{ chain(chainId:\"{chain}\") {{
1350                executionState {{ system {{ balances {{
1351                    entry(key:\"{owner}\") {{ value }}
1352                }} }} }}
1353            }} }}"
1354        );
1355        let response = self.query_node(query).await?;
1356        let balances = &response["chain"]["executionState"]["system"]["balances"];
1357        let balance = balances["entry"]["value"].as_str();
1358        match balance {
1359            None => Ok(Amount::ZERO),
1360            Some(amount) => Ok(Amount::from_str(amount)?),
1361        }
1362    }
1363
1364    pub fn make_application<A: ContractAbi>(
1365        &self,
1366        chain_id: &ChainId,
1367        application_id: &ApplicationId<A>,
1368    ) -> Result<ApplicationWrapper<A>> {
1369        let application_id = application_id.forget_abi().to_string();
1370        let link = format!(
1371            "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1372            self.port
1373        );
1374        Ok(ApplicationWrapper::from(link))
1375    }
1376
1377    pub async fn publish_data_blob(
1378        &self,
1379        chain_id: &ChainId,
1380        bytes: Vec<u8>,
1381    ) -> Result<CryptoHash> {
1382        let query = format!(
1383            "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1384            chain_id.to_value(),
1385            bytes.to_value(),
1386        );
1387        let data = self.query_node(query).await?;
1388        serde_json::from_value(data["publishDataBlob"].clone())
1389            .context("missing publishDataBlob field in response")
1390    }
1391
1392    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1393        &self,
1394        chain_id: &ChainId,
1395        contract: PathBuf,
1396        service: PathBuf,
1397        vm_runtime: VmRuntime,
1398    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1399        let contract_code = Bytecode::load_from_file(&contract)?;
1400        let service_code = Bytecode::load_from_file(&service)?;
1401        let query = format!(
1402            "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1403            chain_id.to_value(),
1404            contract_code.to_value(),
1405            service_code.to_value(),
1406            vm_runtime.to_value(),
1407        );
1408        let data = self.query_node(query).await?;
1409        let module_str = data["publishModule"]
1410            .as_str()
1411            .context("module ID not found")?;
1412        let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1413        Ok(module_id.with_abi())
1414    }
1415
1416    pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1417        let query = format!(
1418            "query {{ chain(chainId:\"{chain_id}\") {{
1419                executionState {{ system {{ committees }} }}
1420            }} }}"
1421        );
1422        let mut response = self.query_node(query).await?;
1423        let committees = response["chain"]["executionState"]["system"]["committees"].take();
1424        Ok(serde_json::from_value(committees)?)
1425    }
1426
1427    pub async fn events_from_index(
1428        &self,
1429        chain_id: &ChainId,
1430        stream_id: &StreamId,
1431        start_index: u32,
1432    ) -> Result<Vec<IndexAndEvent>> {
1433        let query = format!(
1434            "query {{
1435               eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1436               {{ index event }}
1437             }}",
1438            stream_id.to_value()
1439        );
1440        let mut response = self.query_node(query).await?;
1441        let response = response["eventsFromIndex"].take();
1442        Ok(serde_json::from_value(response)?)
1443    }
1444
1445    pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1446        let n_try = 5;
1447        let query = query.as_ref();
1448        for i in 0..n_try {
1449            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1450            let url = format!("http://localhost:{}/", self.port);
1451            let client = reqwest_client();
1452            let result = client
1453                .post(url)
1454                .json(&json!({ "query": query }))
1455                .send()
1456                .await;
1457            if matches!(result, Err(ref error) if error.is_timeout()) {
1458                tracing::warn!(
1459                    "Timeout when sending query {} to the node service",
1460                    truncate_query_output(query)
1461                );
1462                continue;
1463            }
1464            let response = result.with_context(|| {
1465                format!(
1466                    "query_node: failed to post query={}",
1467                    truncate_query_output(query)
1468                )
1469            })?;
1470            ensure!(
1471                response.status().is_success(),
1472                "Query \"{}\" failed: {}",
1473                truncate_query_output(query),
1474                response
1475                    .text()
1476                    .await
1477                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1478            );
1479            let value: Value = response.json().await.context("invalid JSON")?;
1480            if let Some(errors) = value.get("errors") {
1481                tracing::warn!(
1482                    "Query \"{}\" failed: {}",
1483                    truncate_query_output(query),
1484                    errors
1485                );
1486            } else {
1487                return Ok(value["data"].clone());
1488            }
1489        }
1490        bail!(
1491            "Query \"{}\" failed after {} retries.",
1492            truncate_query_output(query),
1493            n_try
1494        );
1495    }
1496
1497    pub async fn create_application<
1498        Abi: ContractAbi,
1499        Parameters: Serialize,
1500        InstantiationArgument: Serialize,
1501    >(
1502        &self,
1503        chain_id: &ChainId,
1504        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1505        parameters: &Parameters,
1506        argument: &InstantiationArgument,
1507        required_application_ids: &[ApplicationId],
1508    ) -> Result<ApplicationId<Abi>> {
1509        let module_id = module_id.forget_abi();
1510        let json_required_applications_ids = required_application_ids
1511            .iter()
1512            .map(ApplicationId::to_string)
1513            .collect::<Vec<_>>()
1514            .to_value();
1515        // Convert to `serde_json::Value` then `async_graphql::Value` via the trait `InputType`.
1516        let new_parameters = serde_json::to_value(parameters)
1517            .context("could not create parameters JSON")?
1518            .to_value();
1519        let new_argument = serde_json::to_value(argument)
1520            .context("could not create argument JSON")?
1521            .to_value();
1522        let query = format!(
1523            "mutation {{ createApplication(\
1524                 chainId: \"{chain_id}\",
1525                 moduleId: \"{module_id}\", \
1526                 parameters: {new_parameters}, \
1527                 instantiationArgument: {new_argument}, \
1528                 requiredApplicationIds: {json_required_applications_ids}) \
1529             }}"
1530        );
1531        let data = self.query_node(query).await?;
1532        let app_id_str = data["createApplication"]
1533            .as_str()
1534            .context("missing createApplication string in response")?
1535            .trim();
1536        Ok(app_id_str
1537            .parse::<ApplicationId>()
1538            .context("invalid application ID")?
1539            .with_abi())
1540    }
1541
1542    /// Obtains the hash and height of the `chain`'s tip block, as known by this node service.
1543    pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1544        let query = format!(
1545            r#"query {{ block(chainId: "{chain}") {{
1546                hash
1547                block {{ header {{ height }} }}
1548            }} }}"#
1549        );
1550
1551        let mut response = self.query_node(&query).await?;
1552
1553        match (
1554            mem::take(&mut response["block"]["hash"]),
1555            mem::take(&mut response["block"]["block"]["header"]["height"]),
1556        ) {
1557            (Value::Null, Value::Null) => Ok(None),
1558            (Value::String(hash), Value::Number(height)) => Ok(Some((
1559                hash.parse()
1560                    .context("Received an invalid hash {hash:?} for chain tip")?,
1561                BlockHeight(height.as_u64().unwrap()),
1562            ))),
1563            invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1564        }
1565    }
1566
1567    /// Subscribes to the node service and returns a stream of notifications about a chain.
1568    pub async fn notifications(
1569        &self,
1570        chain_id: ChainId,
1571    ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
1572        let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1573        let url = format!("ws://localhost:{}/ws", self.port);
1574        let mut request = url.into_client_request()?;
1575        request.headers_mut().insert(
1576            "Sec-WebSocket-Protocol",
1577            HeaderValue::from_str("graphql-transport-ws")?,
1578        );
1579        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1580        let init_json = json!({
1581          "type": "connection_init",
1582          "payload": {}
1583        });
1584        websocket.send(init_json.to_string().into()).await?;
1585        let text = websocket
1586            .next()
1587            .await
1588            .context("Failed to establish connection")??
1589            .into_text()?;
1590        ensure!(
1591            text == "{\"type\":\"connection_ack\"}",
1592            "Unexpected response: {text}"
1593        );
1594        let query_json = json!({
1595          "id": "1",
1596          "type": "start",
1597          "payload": {
1598            "query": query,
1599            "variables": {},
1600            "operationName": null
1601          }
1602        });
1603        websocket.send(query_json.to_string().into()).await?;
1604        Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1605            |message| async {
1606                let text = message.into_text()?;
1607                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1608                if let Some(errors) = value["payload"].get("errors") {
1609                    bail!("Notification subscription failed: {errors:?}");
1610                }
1611                serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1612                    .context("Failed to deserialize notification")
1613            },
1614        )))
1615    }
1616}
1617
1618/// A running faucet service.
1619pub struct FaucetService {
1620    port: u16,
1621    child: Child,
1622    _temp_dir: tempfile::TempDir,
1623}
1624
1625impl FaucetService {
1626    fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
1627        Self {
1628            port,
1629            child,
1630            _temp_dir: temp_dir,
1631        }
1632    }
1633
1634    pub async fn terminate(mut self) -> Result<()> {
1635        self.child
1636            .kill()
1637            .await
1638            .context("terminating faucet service")
1639    }
1640
1641    pub fn ensure_is_running(&mut self) -> Result<()> {
1642        self.child.ensure_is_running()
1643    }
1644
1645    pub fn instance(&self) -> Faucet {
1646        Faucet::new(format!("http://localhost:{}/", self.port))
1647    }
1648}
1649
1650/// A running `Application` to be queried in GraphQL.
1651pub struct ApplicationWrapper<A> {
1652    uri: String,
1653    _phantom: PhantomData<A>,
1654}
1655
1656impl<A> ApplicationWrapper<A> {
1657    pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1658        let query = query.as_ref();
1659        let value = self.run_json_query(json!({ "query": query })).await?;
1660        Ok(value["data"].clone())
1661    }
1662
1663    pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1664        const MAX_RETRIES: usize = 5;
1665
1666        for i in 0.. {
1667            let client = reqwest_client();
1668            let result = client.post(&self.uri).json(&query).send().await;
1669            let response = match result {
1670                Ok(response) => response,
1671                Err(error) if i < MAX_RETRIES => {
1672                    tracing::warn!(
1673                        "Failed to post query \"{}\": {error}; retrying",
1674                        truncate_query_output_serialize(&query),
1675                    );
1676                    continue;
1677                }
1678                Err(error) => {
1679                    let query = truncate_query_output_serialize(&query);
1680                    return Err(error)
1681                        .with_context(|| format!("run_json_query: failed to post query={query}"));
1682                }
1683            };
1684            ensure!(
1685                response.status().is_success(),
1686                "Query \"{}\" failed: {}",
1687                truncate_query_output_serialize(&query),
1688                response
1689                    .text()
1690                    .await
1691                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1692            );
1693            let value: Value = response.json().await.context("invalid JSON")?;
1694            if let Some(errors) = value.get("errors") {
1695                bail!(
1696                    "Query \"{}\" failed: {}",
1697                    truncate_query_output_serialize(&query),
1698                    errors
1699                );
1700            }
1701            return Ok(value);
1702        }
1703        unreachable!()
1704    }
1705
1706    pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
1707        let query = query.as_ref();
1708        self.run_graphql_query(&format!("query {{ {query} }}"))
1709            .await
1710    }
1711
1712    pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
1713        let query = query.as_ref().trim();
1714        let name = query
1715            .split_once(|ch: char| !ch.is_alphanumeric())
1716            .map_or(query, |(name, _)| name);
1717        let data = self.query(query).await?;
1718        serde_json::from_value(data[name].clone())
1719            .with_context(|| format!("{name} field missing in response"))
1720    }
1721
1722    pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
1723        let mutation = mutation.as_ref();
1724        self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
1725            .await
1726    }
1727
1728    pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
1729        let mut out = String::from("mutation {\n");
1730        for (index, mutation) in mutations.iter().enumerate() {
1731            out = format!("{}  u{}: {}\n", out, index, mutation);
1732        }
1733        out.push_str("}\n");
1734        self.run_graphql_query(&out).await
1735    }
1736}
1737
1738impl<A> From<String> for ApplicationWrapper<A> {
1739    fn from(uri: String) -> ApplicationWrapper<A> {
1740        ApplicationWrapper {
1741            uri,
1742            _phantom: PhantomData,
1743        }
1744    }
1745}
1746
1747/// Returns the timeout for tests that wait for notifications, either read from the env
1748/// variable `LINERA_TEST_NOTIFICATION_TIMEOUT_MS`, or the default value of 10 seconds.
1749#[cfg(with_testing)]
1750fn notification_timeout() -> Duration {
1751    const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
1752    const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;
1753
1754    match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
1755        Ok(var) => Duration::from_millis(var.parse().unwrap_or_else(|error| {
1756            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
1757        })),
1758        Err(env::VarError::NotPresent) => Duration::from_millis(NOTIFICATION_TIMEOUT_MS_DEFAULT),
1759        Err(env::VarError::NotUnicode(_)) => {
1760            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
1761        }
1762    }
1763}
1764
1765#[cfg(with_testing)]
1766pub trait NotificationsExt {
1767    /// Waits for a notification for which `f` returns `Some(t)`, and returns `t`.
1768    fn wait_for<T>(
1769        &mut self,
1770        f: impl FnMut(Notification) -> Option<T>,
1771    ) -> impl Future<Output = Result<T>>;
1772
1773    /// Waits for a `NewEvents` notification for the given block height. If no height is specified,
1774    /// any height is accepted.
1775    fn wait_for_events(
1776        &mut self,
1777        expected_height: impl Into<Option<BlockHeight>>,
1778    ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
1779        let expected_height = expected_height.into();
1780        self.wait_for(move |notification| {
1781            if let Reason::NewEvents {
1782                height,
1783                event_streams,
1784                ..
1785            } = notification.reason
1786            {
1787                if expected_height.is_none_or(|h| h == height) {
1788                    return Some(event_streams);
1789                }
1790            }
1791            None
1792        })
1793    }
1794
1795    /// Waits for a `NewBlock` notification for the given block height. If no height is specified,
1796    /// any height is accepted.
1797    fn wait_for_block(
1798        &mut self,
1799        expected_height: impl Into<Option<BlockHeight>>,
1800    ) -> impl Future<Output = Result<CryptoHash>> {
1801        let expected_height = expected_height.into();
1802        self.wait_for(move |notification| {
1803            if let Reason::NewBlock { height, hash, .. } = notification.reason {
1804                if expected_height.is_none_or(|h| h == height) {
1805                    return Some(hash);
1806                }
1807            }
1808            None
1809        })
1810    }
1811
1812    /// Waits for a `NewIncomingBundle` notification for the given sender chain and sender block
1813    /// height. If no height is specified, any height is accepted.
1814    fn wait_for_bundle(
1815        &mut self,
1816        expected_origin: ChainId,
1817        expected_height: impl Into<Option<BlockHeight>>,
1818    ) -> impl Future<Output = Result<()>> {
1819        let expected_height = expected_height.into();
1820        self.wait_for(move |notification| {
1821            if let Reason::NewIncomingBundle { height, origin } = notification.reason {
1822                if expected_height.is_none_or(|h| h == height) && origin == expected_origin {
1823                    return Some(());
1824                }
1825            }
1826            None
1827        })
1828    }
1829}
1830
1831#[cfg(with_testing)]
1832impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
1833    async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
1834        let mut timeout = Box::pin(linera_base::time::timer::sleep(notification_timeout())).fuse();
1835        loop {
1836            let notification = futures::select! {
1837                () = timeout => bail!("Timeout waiting for notification"),
1838                notification = self.next().fuse() => notification.context("Stream closed")??,
1839            };
1840            if let Some(t) = f(notification) {
1841                return Ok(t);
1842            }
1843        }
1844    }
1845}