linera_service/cli_wrappers/
wallet.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Cow,
6    collections::BTreeMap,
7    env,
8    marker::PhantomData,
9    mem,
10    path::{Path, PathBuf},
11    pin::Pin,
12    process::Stdio,
13    str::FromStr,
14    sync,
15    time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24    abi::ContractAbi,
25    command::{resolve_binary, CommandExt},
26    crypto::{CryptoHash, InMemorySigner},
27    data_types::{Amount, BlockHeight, Bytecode, Epoch},
28    identifiers::{
29        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30    },
31    vm::VmRuntime,
32};
33use linera_client::{client_options::ResourceControlPolicyConfig, wallet::Wallet};
34use linera_core::worker::Notification;
35use linera_execution::committee::Committee;
36use linera_faucet_client::Faucet;
37use serde::{de::DeserializeOwned, ser::Serialize};
38use serde_command_opts::to_args;
39use serde_json::{json, Value};
40use tempfile::TempDir;
41use tokio::{
42    io::{AsyncBufReadExt, BufReader},
43    process::{Child, Command},
44    sync::oneshot,
45    task::JoinHandle,
46};
47#[cfg(with_testing)]
48use {
49    futures::FutureExt as _,
50    linera_core::worker::Reason,
51    std::{collections::BTreeSet, future::Future},
52};
53
54use crate::{
55    cli::command::BenchmarkCommand,
56    cli_wrappers::{
57        local_net::{PathProvider, ProcessInbox},
58        Network,
59    },
60    util::{self, ChildExt},
61};
62
63/// The name of the environment variable that allows specifying additional arguments to be passed
64/// to the node-service command of the client.
65const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
66
67fn reqwest_client() -> reqwest::Client {
68    reqwest::ClientBuilder::new()
69        .timeout(Duration::from_secs(30))
70        .build()
71        .unwrap()
72}
73
74/// Wrapper to run a Linera client command.
75pub struct ClientWrapper {
76    binary_path: sync::Mutex<Option<PathBuf>>,
77    testing_prng_seed: Option<u64>,
78    storage: String,
79    wallet: String,
80    keystore: String,
81    max_pending_message_bundles: usize,
82    network: Network,
83    pub path_provider: PathProvider,
84    on_drop: OnClientDrop,
85    extra_args: Vec<String>,
86}
87
88/// Action to perform when the [`ClientWrapper`] is dropped.
89#[derive(Clone, Copy, Debug, Eq, PartialEq)]
90pub enum OnClientDrop {
91    /// Close all the chains on the wallet.
92    CloseChains,
93    /// Do not close any chains, leaving them active.
94    LeakChains,
95}
96
97impl ClientWrapper {
98    pub fn new(
99        path_provider: PathProvider,
100        network: Network,
101        testing_prng_seed: Option<u64>,
102        id: usize,
103        on_drop: OnClientDrop,
104    ) -> Self {
105        Self::new_with_extra_args(
106            path_provider,
107            network,
108            testing_prng_seed,
109            id,
110            on_drop,
111            vec!["--wait-for-outgoing-messages".to_string()],
112        )
113    }
114
115    pub fn new_with_extra_args(
116        path_provider: PathProvider,
117        network: Network,
118        testing_prng_seed: Option<u64>,
119        id: usize,
120        on_drop: OnClientDrop,
121        extra_args: Vec<String>,
122    ) -> Self {
123        let storage = format!(
124            "rocksdb:{}/client_{}.db",
125            path_provider.path().display(),
126            id
127        );
128        let wallet = format!("wallet_{}.json", id);
129        let keystore = format!("keystore_{}.json", id);
130        Self {
131            binary_path: sync::Mutex::new(None),
132            testing_prng_seed,
133            storage,
134            wallet,
135            keystore,
136            max_pending_message_bundles: 10_000,
137            network,
138            path_provider,
139            on_drop,
140            extra_args,
141        }
142    }
143
144    /// Runs `linera project new`.
145    pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
146        let tmp = TempDir::new()?;
147        let mut command = self.command().await?;
148        command
149            .current_dir(tmp.path())
150            .arg("project")
151            .arg("new")
152            .arg(project_name)
153            .arg("--linera-root")
154            .arg(linera_root)
155            .spawn_and_wait_for_stdout()
156            .await?;
157        Ok(tmp)
158    }
159
160    /// Runs `linera project publish`.
161    pub async fn project_publish<T: Serialize>(
162        &self,
163        path: PathBuf,
164        required_application_ids: Vec<String>,
165        publisher: impl Into<Option<ChainId>>,
166        argument: &T,
167    ) -> Result<String> {
168        let json_parameters = serde_json::to_string(&())?;
169        let json_argument = serde_json::to_string(argument)?;
170        let mut command = self.command().await?;
171        command
172            .arg("project")
173            .arg("publish-and-create")
174            .arg(path)
175            .args(publisher.into().iter().map(ChainId::to_string))
176            .args(["--json-parameters", &json_parameters])
177            .args(["--json-argument", &json_argument]);
178        if !required_application_ids.is_empty() {
179            command.arg("--required-application-ids");
180            command.args(required_application_ids);
181        }
182        let stdout = command.spawn_and_wait_for_stdout().await?;
183        Ok(stdout.trim().to_string())
184    }
185
186    /// Runs `linera project test`.
187    pub async fn project_test(&self, path: &Path) -> Result<()> {
188        self.command()
189            .await
190            .context("failed to create project test command")?
191            .current_dir(path)
192            .arg("project")
193            .arg("test")
194            .spawn_and_wait_for_stdout()
195            .await?;
196        Ok(())
197    }
198
199    async fn command_with_envs_and_arguments(
200        &self,
201        envs: &[(&str, &str)],
202        arguments: impl IntoIterator<Item = Cow<'_, str>>,
203    ) -> Result<Command> {
204        let mut command = self.command_binary().await?;
205        command.current_dir(self.path_provider.path());
206        for (key, value) in envs {
207            command.env(key, value);
208        }
209        for argument in arguments {
210            command.arg(&*argument);
211        }
212        Ok(command)
213    }
214
215    async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
216        self.command_with_envs_and_arguments(envs, self.command_arguments())
217            .await
218    }
219
220    async fn command_with_arguments(
221        &self,
222        arguments: impl IntoIterator<Item = Cow<'_, str>>,
223    ) -> Result<Command> {
224        self.command_with_envs_and_arguments(
225            &[(
226                "RUST_LOG",
227                &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
228            )],
229            arguments,
230        )
231        .await
232    }
233
234    async fn command(&self) -> Result<Command> {
235        self.command_with_envs(&[(
236            "RUST_LOG",
237            &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
238        )])
239        .await
240    }
241
242    fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
243        [
244            "--wallet".into(),
245            self.wallet.as_str().into(),
246            "--keystore".into(),
247            self.keystore.as_str().into(),
248            "--storage".into(),
249            self.storage.as_str().into(),
250            "--send-timeout-ms".into(),
251            "500000".into(),
252            "--recv-timeout-ms".into(),
253            "500000".into(),
254        ]
255        .into_iter()
256        .chain(self.extra_args.iter().map(|s| s.as_str().into()))
257    }
258
259    /// Returns an iterator over the arguments that should be added to all command invocations.
260    fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
261        self.required_command_arguments().chain([
262            "--max-pending-message-bundles".into(),
263            self.max_pending_message_bundles.to_string().into(),
264        ])
265    }
266
267    /// Returns the [`Command`] instance configured to run the appropriate binary.
268    ///
269    /// The path is resolved once and cached inside `self` for subsequent usages.
270    async fn command_binary(&self) -> Result<Command> {
271        match self.command_with_cached_binary_path() {
272            Some(command) => Ok(command),
273            None => {
274                let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
275                let command = Command::new(&resolved_path);
276
277                self.set_cached_binary_path(resolved_path);
278
279                Ok(command)
280            }
281        }
282    }
283
284    /// Returns a [`Command`] instance configured with the cached `binary_path`, if available.
285    fn command_with_cached_binary_path(&self) -> Option<Command> {
286        let binary_path = self.binary_path.lock().unwrap();
287
288        binary_path.as_ref().map(Command::new)
289    }
290
291    /// Sets the cached `binary_path` with the `new_binary_path`.
292    ///
293    /// # Panics
294    ///
295    /// If the cache is already set to a different value. In theory the two threads calling
296    /// `command_binary` can race and resolve the binary path twice, but they should always be the
297    /// same path.
298    fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
299        let mut binary_path = self.binary_path.lock().unwrap();
300
301        if binary_path.is_none() {
302            *binary_path = Some(new_binary_path);
303        } else {
304            assert_eq!(*binary_path, Some(new_binary_path));
305        }
306    }
307
308    /// Runs `linera create-genesis-config`.
309    pub async fn create_genesis_config(
310        &self,
311        num_other_initial_chains: u32,
312        initial_funding: Amount,
313        policy_config: ResourceControlPolicyConfig,
314        http_allow_list: Option<Vec<String>>,
315    ) -> Result<()> {
316        let mut command = self.command().await?;
317        command
318            .args([
319                "create-genesis-config",
320                &num_other_initial_chains.to_string(),
321            ])
322            .args(["--initial-funding", &initial_funding.to_string()])
323            .args(["--committee", "committee.json"])
324            .args(["--genesis", "genesis.json"])
325            .args([
326                "--policy-config",
327                &policy_config.to_string().to_kebab_case(),
328            ]);
329        if let Some(allow_list) = http_allow_list {
330            command
331                .arg("--http-request-allow-list")
332                .arg(allow_list.join(","));
333        }
334        if let Some(seed) = self.testing_prng_seed {
335            command.arg("--testing-prng-seed").arg(seed.to_string());
336        }
337        command.spawn_and_wait_for_stdout().await?;
338        Ok(())
339    }
340
341    /// Runs `linera wallet init`. The genesis config is read from `genesis.json`, or from the
342    /// faucet if provided.
343    pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
344        let mut command = self.command().await?;
345        command.args(["wallet", "init"]);
346        match faucet {
347            None => command.args(["--genesis", "genesis.json"]),
348            Some(faucet) => command.args(["--faucet", faucet.url()]),
349        };
350        if let Some(seed) = self.testing_prng_seed {
351            command.arg("--testing-prng-seed").arg(seed.to_string());
352        }
353        command.spawn_and_wait_for_stdout().await?;
354        Ok(())
355    }
356
357    /// Runs `linera wallet request-chain`.
358    pub async fn request_chain(
359        &self,
360        faucet: &Faucet,
361        set_default: bool,
362    ) -> Result<(ChainId, AccountOwner)> {
363        let mut command = self.command().await?;
364        command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
365        if set_default {
366            command.arg("--set-default");
367        }
368        let stdout = command.spawn_and_wait_for_stdout().await?;
369        let mut lines = stdout.split_whitespace();
370        let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
371        let owner = lines.next().context("missing chain owner")?.parse()?;
372        Ok((chain_id, owner))
373    }
374
375    /// Runs `linera wallet publish-and-create`.
376    #[expect(clippy::too_many_arguments)]
377    pub async fn publish_and_create<
378        A: ContractAbi,
379        Parameters: Serialize,
380        InstantiationArgument: Serialize,
381    >(
382        &self,
383        contract: PathBuf,
384        service: PathBuf,
385        vm_runtime: VmRuntime,
386        parameters: &Parameters,
387        argument: &InstantiationArgument,
388        required_application_ids: &[ApplicationId],
389        publisher: impl Into<Option<ChainId>>,
390    ) -> Result<ApplicationId<A>> {
391        let json_parameters = serde_json::to_string(parameters)?;
392        let json_argument = serde_json::to_string(argument)?;
393        let mut command = self.command().await?;
394        let vm_runtime = format!("{}", vm_runtime);
395        command
396            .arg("publish-and-create")
397            .args([contract, service])
398            .args(["--vm-runtime", &vm_runtime.to_lowercase()])
399            .args(publisher.into().iter().map(ChainId::to_string))
400            .args(["--json-parameters", &json_parameters])
401            .args(["--json-argument", &json_argument]);
402        if !required_application_ids.is_empty() {
403            command.arg("--required-application-ids");
404            command.args(
405                required_application_ids
406                    .iter()
407                    .map(ApplicationId::to_string),
408            );
409        }
410        let stdout = command.spawn_and_wait_for_stdout().await?;
411        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
412    }
413
414    /// Runs `linera publish-module`.
415    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
416        &self,
417        contract: PathBuf,
418        service: PathBuf,
419        vm_runtime: VmRuntime,
420        publisher: impl Into<Option<ChainId>>,
421    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
422        let stdout = self
423            .command()
424            .await?
425            .arg("publish-module")
426            .args([contract, service])
427            .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
428            .args(publisher.into().iter().map(ChainId::to_string))
429            .spawn_and_wait_for_stdout()
430            .await?;
431        let module_id: ModuleId = stdout.trim().parse()?;
432        Ok(module_id.with_abi())
433    }
434
435    /// Runs `linera create-application`.
436    pub async fn create_application<
437        Abi: ContractAbi,
438        Parameters: Serialize,
439        InstantiationArgument: Serialize,
440    >(
441        &self,
442        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
443        parameters: &Parameters,
444        argument: &InstantiationArgument,
445        required_application_ids: &[ApplicationId],
446        creator: impl Into<Option<ChainId>>,
447    ) -> Result<ApplicationId<Abi>> {
448        let json_parameters = serde_json::to_string(parameters)?;
449        let json_argument = serde_json::to_string(argument)?;
450        let mut command = self.command().await?;
451        command
452            .arg("create-application")
453            .arg(module_id.forget_abi().to_string())
454            .args(["--json-parameters", &json_parameters])
455            .args(["--json-argument", &json_argument])
456            .args(creator.into().iter().map(ChainId::to_string));
457        if !required_application_ids.is_empty() {
458            command.arg("--required-application-ids");
459            command.args(
460                required_application_ids
461                    .iter()
462                    .map(ApplicationId::to_string),
463            );
464        }
465        let stdout = command.spawn_and_wait_for_stdout().await?;
466        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
467    }
468
469    /// Runs `linera service`.
470    pub async fn run_node_service(
471        &self,
472        port: impl Into<Option<u16>>,
473        process_inbox: ProcessInbox,
474    ) -> Result<NodeService> {
475        let port = port.into().unwrap_or(8080);
476        let mut command = self.command().await?;
477        command.arg("service");
478        if let ProcessInbox::Skip = process_inbox {
479            command.arg("--listener-skip-process-inbox");
480        }
481        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
482            command.args(var.split_whitespace());
483        }
484        let child = command
485            .args(["--port".to_string(), port.to_string()])
486            .spawn_into()?;
487        let client = reqwest_client();
488        for i in 0..10 {
489            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
490            let request = client
491                .get(format!("http://localhost:{}/", port))
492                .send()
493                .await;
494            if request.is_ok() {
495                tracing::info!("Node service has started");
496                return Ok(NodeService::new(port, child));
497            } else {
498                tracing::warn!("Waiting for node service to start");
499            }
500        }
501        bail!("Failed to start node service");
502    }
503
504    /// Runs `linera validator query`
505    pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
506        let mut command = self.command().await?;
507        command.arg("validator").arg("query").arg(address);
508        let stdout = command.spawn_and_wait_for_stdout().await?;
509
510        // Parse the genesis config hash from the output.
511        // It's on a line like "Genesis config hash: <hash>"
512        let hash = stdout
513            .lines()
514            .find_map(|line| {
515                line.strip_prefix("Genesis config hash: ")
516                    .and_then(|hash_str| hash_str.trim().parse().ok())
517            })
518            .context("error while parsing the result of `linera validator query`")?;
519        Ok(hash)
520    }
521
522    /// Runs `linera validator list`.
523    pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
524        let mut command = self.command().await?;
525        command.arg("validator").arg("list");
526        if let Some(chain_id) = chain_id {
527            command.args(["--chain-id", &chain_id.to_string()]);
528        }
529        command.spawn_and_wait_for_stdout().await?;
530        Ok(())
531    }
532
533    /// Runs `linera sync-validator`.
534    pub async fn sync_validator(
535        &self,
536        chain_ids: impl IntoIterator<Item = &ChainId>,
537        validator_address: impl Into<String>,
538    ) -> Result<()> {
539        let mut command = self.command().await?;
540        command
541            .arg("validator")
542            .arg("sync")
543            .arg(validator_address.into());
544        let mut chain_ids = chain_ids.into_iter().peekable();
545        if chain_ids.peek().is_some() {
546            command
547                .arg("--chains")
548                .args(chain_ids.map(ChainId::to_string));
549        }
550        command.spawn_and_wait_for_stdout().await?;
551        Ok(())
552    }
553
554    /// Runs `linera faucet`.
555    pub async fn run_faucet(
556        &self,
557        port: impl Into<Option<u16>>,
558        chain_id: Option<ChainId>,
559        amount: Amount,
560    ) -> Result<FaucetService> {
561        let port = port.into().unwrap_or(8080);
562        let temp_dir = tempfile::tempdir()
563            .context("Failed to create temporary directory for faucet storage")?;
564        let storage_path = temp_dir.path().join("faucet_storage.sqlite");
565        let mut command = self.command().await?;
566        let command = command
567            .arg("faucet")
568            .args(["--port".to_string(), port.to_string()])
569            .args(["--amount".to_string(), amount.to_string()])
570            .args([
571                "--storage-path".to_string(),
572                storage_path.to_string_lossy().to_string(),
573            ]);
574        if let Some(chain_id) = chain_id {
575            command.arg(chain_id.to_string());
576        }
577        let child = command.spawn_into()?;
578        let client = reqwest_client();
579        for i in 0..10 {
580            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
581            let request = client
582                .get(format!("http://localhost:{}/", port))
583                .send()
584                .await;
585            if request.is_ok() {
586                tracing::info!("Faucet has started");
587                return Ok(FaucetService::new(port, child, temp_dir));
588            } else {
589                tracing::debug!("Waiting for faucet to start");
590            }
591        }
592        bail!("Failed to start faucet");
593    }
594
595    /// Runs `linera local-balance`.
596    pub async fn local_balance(&self, account: Account) -> Result<Amount> {
597        let stdout = self
598            .command()
599            .await?
600            .arg("local-balance")
601            .arg(account.to_string())
602            .spawn_and_wait_for_stdout()
603            .await?;
604        let amount = stdout
605            .trim()
606            .parse()
607            .context("error while parsing the result of `linera local-balance`")?;
608        Ok(amount)
609    }
610
611    /// Runs `linera query-balance`.
612    pub async fn query_balance(&self, account: Account) -> Result<Amount> {
613        let stdout = self
614            .command()
615            .await?
616            .arg("query-balance")
617            .arg(account.to_string())
618            .spawn_and_wait_for_stdout()
619            .await?;
620        let amount = stdout
621            .trim()
622            .parse()
623            .context("error while parsing the result of `linera query-balance`")?;
624        Ok(amount)
625    }
626
627    /// Runs `linera sync`.
628    pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
629        self.command()
630            .await?
631            .arg("sync")
632            .arg(chain_id.to_string())
633            .spawn_and_wait_for_stdout()
634            .await?;
635        Ok(())
636    }
637
638    /// Runs `linera process-inbox`.
639    pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
640        self.command()
641            .await?
642            .arg("process-inbox")
643            .arg(chain_id.to_string())
644            .spawn_and_wait_for_stdout()
645            .await?;
646        Ok(())
647    }
648
649    /// Runs `linera transfer`.
650    pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
651        self.command()
652            .await?
653            .arg("transfer")
654            .arg(amount.to_string())
655            .args(["--from", &from.to_string()])
656            .args(["--to", &to.to_string()])
657            .spawn_and_wait_for_stdout()
658            .await?;
659        Ok(())
660    }
661
662    /// Runs `linera transfer` with no logging.
663    pub async fn transfer_with_silent_logs(
664        &self,
665        amount: Amount,
666        from: ChainId,
667        to: ChainId,
668    ) -> Result<()> {
669        self.command()
670            .await?
671            .env("RUST_LOG", "off")
672            .arg("transfer")
673            .arg(amount.to_string())
674            .args(["--from", &from.to_string()])
675            .args(["--to", &to.to_string()])
676            .spawn_and_wait_for_stdout()
677            .await?;
678        Ok(())
679    }
680
681    /// Runs `linera transfer` with owner accounts.
682    pub async fn transfer_with_accounts(
683        &self,
684        amount: Amount,
685        from: Account,
686        to: Account,
687    ) -> Result<()> {
688        self.command()
689            .await?
690            .arg("transfer")
691            .arg(amount.to_string())
692            .args(["--from", &from.to_string()])
693            .args(["--to", &to.to_string()])
694            .spawn_and_wait_for_stdout()
695            .await?;
696        Ok(())
697    }
698
699    fn benchmark_command_internal(command: &mut Command, args: BenchmarkCommand) -> Result<()> {
700        let mut formatted_args = to_args(&args)?;
701        let subcommand = formatted_args.remove(0);
702        // The subcommand is followed by the flattened options, which are preceded by "options".
703        // So remove that as well.
704        formatted_args.remove(0);
705        let options = formatted_args
706            .chunks_exact(2)
707            .flat_map(|pair| {
708                let option = format!("--{}", pair[0]);
709                match pair[1].as_str() {
710                    "true" => vec![option],
711                    "false" => vec![],
712                    _ => vec![option, pair[1].clone()],
713                }
714            })
715            .collect::<Vec<_>>();
716        command
717            .args([
718                "--max-pending-message-bundles",
719                &args.transactions_per_block().to_string(),
720            ])
721            .arg("benchmark")
722            .arg(subcommand)
723            .args(options);
724        Ok(())
725    }
726
727    async fn benchmark_command_with_envs(
728        &self,
729        args: BenchmarkCommand,
730        envs: &[(&str, &str)],
731    ) -> Result<Command> {
732        let mut command = self
733            .command_with_envs_and_arguments(envs, self.required_command_arguments())
734            .await?;
735        Self::benchmark_command_internal(&mut command, args)?;
736        Ok(command)
737    }
738
739    async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
740        let mut command = self
741            .command_with_arguments(self.required_command_arguments())
742            .await?;
743        Self::benchmark_command_internal(&mut command, args)?;
744        Ok(command)
745    }
746
747    /// Runs `linera benchmark`.
748    pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
749        let mut command = self.benchmark_command(args).await?;
750        command.spawn_and_wait_for_stdout().await?;
751        Ok(())
752    }
753
754    /// Runs `linera benchmark`, but detached: don't wait for the command to finish, just spawn it
755    /// and return the child process, and the handles to the stdout and stderr.
756    pub async fn benchmark_detached(
757        &self,
758        args: BenchmarkCommand,
759        tx: oneshot::Sender<()>,
760    ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
761        let mut child = self
762            .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
763            .await?
764            .kill_on_drop(true)
765            .stdin(Stdio::piped())
766            .stdout(Stdio::piped())
767            .stderr(Stdio::piped())
768            .spawn()?;
769
770        let pid = child.id().expect("failed to get pid");
771        let stdout = child.stdout.take().expect("stdout not open");
772        let stdout_handle = tokio::spawn(async move {
773            let mut lines = BufReader::new(stdout).lines();
774            while let Ok(Some(line)) = lines.next_line().await {
775                println!("benchmark{{pid={pid}}} {line}");
776            }
777        });
778
779        let stderr = child.stderr.take().expect("stderr not open");
780        let stderr_handle = tokio::spawn(async move {
781            let mut lines = BufReader::new(stderr).lines();
782            let mut tx = Some(tx);
783            while let Ok(Some(line)) = lines.next_line().await {
784                if line.contains("Ready to start benchmark") {
785                    tx.take()
786                        .expect("Should only send signal once")
787                        .send(())
788                        .expect("failed to send ready signal to main thread");
789                } else {
790                    println!("benchmark{{pid={pid}}} {line}");
791                }
792            }
793        });
794        Ok((child, stdout_handle, stderr_handle))
795    }
796
797    async fn open_chain_internal(
798        &self,
799        from: ChainId,
800        owner: Option<AccountOwner>,
801        initial_balance: Amount,
802        super_owner: bool,
803    ) -> Result<(ChainId, AccountOwner)> {
804        let mut command = self.command().await?;
805        command
806            .arg("open-chain")
807            .args(["--from", &from.to_string()])
808            .args(["--initial-balance", &initial_balance.to_string()]);
809
810        if let Some(owner) = owner {
811            command.args(["--owner", &owner.to_string()]);
812        }
813
814        if super_owner {
815            command.arg("--super-owner");
816        }
817
818        let stdout = command.spawn_and_wait_for_stdout().await?;
819        let mut split = stdout.split('\n');
820        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
821        let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
822        if let Some(owner) = owner {
823            assert_eq!(owner, new_owner);
824        }
825        Ok((chain_id, new_owner))
826    }
827
828    /// Runs `linera open-chain --super-owner`.
829    pub async fn open_chain_super_owner(
830        &self,
831        from: ChainId,
832        owner: Option<AccountOwner>,
833        initial_balance: Amount,
834    ) -> Result<(ChainId, AccountOwner)> {
835        self.open_chain_internal(from, owner, initial_balance, true)
836            .await
837    }
838
839    /// Runs `linera open-chain`.
840    pub async fn open_chain(
841        &self,
842        from: ChainId,
843        owner: Option<AccountOwner>,
844        initial_balance: Amount,
845    ) -> Result<(ChainId, AccountOwner)> {
846        self.open_chain_internal(from, owner, initial_balance, false)
847            .await
848    }
849
850    /// Runs `linera open-chain` then `linera assign`.
851    pub async fn open_and_assign(
852        &self,
853        client: &ClientWrapper,
854        initial_balance: Amount,
855    ) -> Result<ChainId> {
856        let our_chain = self
857            .load_wallet()?
858            .default_chain()
859            .context("no default chain found")?;
860        let owner = client.keygen().await?;
861        let (new_chain, _) = self
862            .open_chain(our_chain, Some(owner), initial_balance)
863            .await?;
864        client.assign(owner, new_chain).await?;
865        Ok(new_chain)
866    }
867
868    pub async fn open_multi_owner_chain(
869        &self,
870        from: ChainId,
871        owners: Vec<AccountOwner>,
872        weights: Vec<u64>,
873        multi_leader_rounds: u32,
874        balance: Amount,
875        base_timeout_ms: u64,
876    ) -> Result<ChainId> {
877        let mut command = self.command().await?;
878        command
879            .arg("open-multi-owner-chain")
880            .args(["--from", &from.to_string()])
881            .arg("--owners")
882            .args(owners.iter().map(AccountOwner::to_string))
883            .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
884        if !weights.is_empty() {
885            command
886                .arg("--owner-weights")
887                .args(weights.iter().map(u64::to_string));
888        };
889        command
890            .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
891            .args(["--initial-balance", &balance.to_string()]);
892
893        let stdout = command.spawn_and_wait_for_stdout().await?;
894        let mut split = stdout.split('\n');
895        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
896
897        Ok(chain_id)
898    }
899
900    pub async fn change_ownership(
901        &self,
902        chain_id: ChainId,
903        super_owners: Vec<AccountOwner>,
904        owners: Vec<AccountOwner>,
905    ) -> Result<()> {
906        let mut command = self.command().await?;
907        command
908            .arg("change-ownership")
909            .args(["--chain-id", &chain_id.to_string()]);
910        if !super_owners.is_empty() {
911            command
912                .arg("--super-owners")
913                .args(super_owners.iter().map(AccountOwner::to_string));
914        }
915        if !owners.is_empty() {
916            command
917                .arg("--owners")
918                .args(owners.iter().map(AccountOwner::to_string));
919        }
920        command.spawn_and_wait_for_stdout().await?;
921        Ok(())
922    }
923
924    /// Runs `linera wallet follow-chain CHAIN_ID`.
925    pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
926        let mut command = self.command().await?;
927        command
928            .args(["wallet", "follow-chain"])
929            .arg(chain_id.to_string());
930        if sync {
931            command.arg("--sync");
932        }
933        command.spawn_and_wait_for_stdout().await?;
934        Ok(())
935    }
936
937    /// Runs `linera wallet forget-chain CHAIN_ID`.
938    pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
939        let mut command = self.command().await?;
940        command
941            .args(["wallet", "forget-chain"])
942            .arg(chain_id.to_string());
943        command.spawn_and_wait_for_stdout().await?;
944        Ok(())
945    }
946
947    pub async fn retry_pending_block(
948        &self,
949        chain_id: Option<ChainId>,
950    ) -> Result<Option<CryptoHash>> {
951        let mut command = self.command().await?;
952        command.arg("retry-pending-block");
953        if let Some(chain_id) = chain_id {
954            command.arg(chain_id.to_string());
955        }
956        let stdout = command.spawn_and_wait_for_stdout().await?;
957        let stdout = stdout.trim();
958        if stdout.is_empty() {
959            Ok(None)
960        } else {
961            Ok(Some(CryptoHash::from_str(stdout)?))
962        }
963    }
964
965    /// Runs `linera publish-data-blob`.
966    pub async fn publish_data_blob(
967        &self,
968        path: &Path,
969        chain_id: Option<ChainId>,
970    ) -> Result<CryptoHash> {
971        let mut command = self.command().await?;
972        command.arg("publish-data-blob").arg(path);
973        if let Some(chain_id) = chain_id {
974            command.arg(chain_id.to_string());
975        }
976        let stdout = command.spawn_and_wait_for_stdout().await?;
977        let stdout = stdout.trim();
978        Ok(CryptoHash::from_str(stdout)?)
979    }
980
981    /// Runs `linera read-data-blob`.
982    pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
983        let mut command = self.command().await?;
984        command.arg("read-data-blob").arg(hash.to_string());
985        if let Some(chain_id) = chain_id {
986            command.arg(chain_id.to_string());
987        }
988        command.spawn_and_wait_for_stdout().await?;
989        Ok(())
990    }
991
992    pub fn load_wallet(&self) -> Result<Wallet> {
993        util::read_json(self.wallet_path())
994    }
995
996    pub fn load_keystore(&self) -> Result<InMemorySigner> {
997        util::read_json(self.keystore_path())
998    }
999
1000    pub fn wallet_path(&self) -> PathBuf {
1001        self.path_provider.path().join(&self.wallet)
1002    }
1003
1004    pub fn keystore_path(&self) -> PathBuf {
1005        self.path_provider.path().join(&self.keystore)
1006    }
1007
1008    pub fn storage_path(&self) -> &str {
1009        &self.storage
1010    }
1011
1012    pub fn get_owner(&self) -> Option<AccountOwner> {
1013        let wallet = self.load_wallet().ok()?;
1014        let chain_id = wallet.default_chain()?;
1015        wallet.get(chain_id)?.owner
1016    }
1017
1018    pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1019        self.load_wallet()
1020            .ok()
1021            .is_some_and(|wallet| wallet.get(chain).is_some())
1022    }
1023
1024    pub async fn set_validator(
1025        &self,
1026        validator_key: &(String, String),
1027        port: usize,
1028        votes: usize,
1029    ) -> Result<()> {
1030        let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1031        self.command()
1032            .await?
1033            .arg("validator")
1034            .arg("add")
1035            .args(["--public-key", &validator_key.0])
1036            .args(["--account-key", &validator_key.1])
1037            .args(["--address", &address])
1038            .args(["--votes", &votes.to_string()])
1039            .spawn_and_wait_for_stdout()
1040            .await?;
1041        Ok(())
1042    }
1043
1044    pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1045        self.command()
1046            .await?
1047            .arg("validator")
1048            .arg("remove")
1049            .args(["--public-key", validator_key])
1050            .spawn_and_wait_for_stdout()
1051            .await?;
1052        Ok(())
1053    }
1054
1055    pub async fn change_validators(
1056        &self,
1057        add_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1058        modify_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1059        remove_validators: &[String],
1060    ) -> Result<()> {
1061        use std::str::FromStr;
1062
1063        use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1064
1065        // Build a map that will be serialized to JSON
1066        // Use the exact types that deserialization expects
1067        let mut changes = std::collections::HashMap::new();
1068
1069        // Add/modify validators
1070        for (public_key_str, account_key_str, port, votes) in
1071            add_validators.iter().chain(modify_validators.iter())
1072        {
1073            let public_key = ValidatorPublicKey::from_str(public_key_str)
1074                .with_context(|| format!("Invalid validator public key: {}", public_key_str))?;
1075
1076            let account_key = AccountPublicKey::from_str(account_key_str)
1077                .with_context(|| format!("Invalid account public key: {}", account_key_str))?;
1078
1079            let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1080
1081            // Create ValidatorChange struct
1082            let change = crate::cli::validator::ValidatorChange {
1083                account_key,
1084                network_address: address,
1085                votes: std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1086            };
1087
1088            changes.insert(public_key, Some(change));
1089        }
1090
1091        // Remove validators (set to None)
1092        for validator_key_str in remove_validators {
1093            let public_key = ValidatorPublicKey::from_str(validator_key_str)
1094                .with_context(|| format!("Invalid validator public key: {}", validator_key_str))?;
1095            changes.insert(public_key, None);
1096        }
1097
1098        // Create temporary file with JSON
1099        let temp_file = tempfile::NamedTempFile::new()
1100            .context("Failed to create temporary file for validator changes")?;
1101        serde_json::to_writer(&temp_file, &changes)
1102            .context("Failed to write validator changes to file")?;
1103        let temp_path = temp_file.path();
1104
1105        self.command()
1106            .await?
1107            .arg("validator")
1108            .arg("update")
1109            .arg(temp_path)
1110            .arg("--yes") // Skip confirmation prompt
1111            .spawn_and_wait_for_stdout()
1112            .await?;
1113
1114        Ok(())
1115    }
1116
1117    pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1118        self.command()
1119            .await?
1120            .arg("revoke-epochs")
1121            .arg(epoch.to_string())
1122            .spawn_and_wait_for_stdout()
1123            .await?;
1124        Ok(())
1125    }
1126
1127    /// Runs `linera keygen`.
1128    pub async fn keygen(&self) -> Result<AccountOwner> {
1129        let stdout = self
1130            .command()
1131            .await?
1132            .arg("keygen")
1133            .spawn_and_wait_for_stdout()
1134            .await?;
1135        AccountOwner::from_str(stdout.as_str().trim())
1136    }
1137
1138    /// Returns the default chain.
1139    pub fn default_chain(&self) -> Option<ChainId> {
1140        self.load_wallet().ok()?.default_chain()
1141    }
1142
1143    /// Runs `linera assign`.
1144    pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1145        let _stdout = self
1146            .command()
1147            .await?
1148            .arg("assign")
1149            .args(["--owner", &owner.to_string()])
1150            .args(["--chain-id", &chain_id.to_string()])
1151            .spawn_and_wait_for_stdout()
1152            .await?;
1153        Ok(())
1154    }
1155
1156    /// Runs `linera set-preferred-owner` for `chain_id`.
1157    pub async fn set_preferred_owner(
1158        &self,
1159        chain_id: ChainId,
1160        owner: Option<AccountOwner>,
1161    ) -> Result<()> {
1162        let mut owner_arg = vec!["--owner".to_string()];
1163        if let Some(owner) = owner {
1164            owner_arg.push(owner.to_string());
1165        };
1166        self.command()
1167            .await?
1168            .arg("set-preferred-owner")
1169            .args(["--chain-id", &chain_id.to_string()])
1170            .args(owner_arg)
1171            .spawn_and_wait_for_stdout()
1172            .await?;
1173        Ok(())
1174    }
1175
1176    pub async fn build_application(
1177        &self,
1178        path: &Path,
1179        name: &str,
1180        is_workspace: bool,
1181    ) -> Result<(PathBuf, PathBuf)> {
1182        Command::new("cargo")
1183            .current_dir(self.path_provider.path())
1184            .arg("build")
1185            .arg("--release")
1186            .args(["--target", "wasm32-unknown-unknown"])
1187            .arg("--manifest-path")
1188            .arg(path.join("Cargo.toml"))
1189            .spawn_and_wait_for_stdout()
1190            .await?;
1191
1192        let release_dir = match is_workspace {
1193            true => path.join("../target/wasm32-unknown-unknown/release"),
1194            false => path.join("target/wasm32-unknown-unknown/release"),
1195        };
1196
1197        let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1198        let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1199
1200        let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1201        let service_size = fs_err::tokio::metadata(&service).await?.len();
1202        tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1203
1204        Ok((contract, service))
1205    }
1206}
1207
1208impl Drop for ClientWrapper {
1209    fn drop(&mut self) {
1210        use std::process::Command as SyncCommand;
1211
1212        if self.on_drop != OnClientDrop::CloseChains {
1213            return;
1214        }
1215
1216        let Ok(binary_path) = self.binary_path.lock() else {
1217            tracing::error!(
1218                "Failed to close chains because a thread panicked with a lock to `binary_path`"
1219            );
1220            return;
1221        };
1222
1223        let Some(binary_path) = binary_path.as_ref() else {
1224            tracing::warn!(
1225                "Assuming no chains need to be closed, because the command binary was never \
1226                resolved and therefore presumably never called"
1227            );
1228            return;
1229        };
1230
1231        let working_directory = self.path_provider.path();
1232        let mut wallet_show_command = SyncCommand::new(binary_path);
1233
1234        for argument in self.command_arguments() {
1235            wallet_show_command.arg(&*argument);
1236        }
1237
1238        let Ok(wallet_show_output) = wallet_show_command
1239            .current_dir(working_directory)
1240            .args(["wallet", "show", "--short", "--owned"])
1241            .output()
1242        else {
1243            tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1244            return;
1245        };
1246
1247        if !wallet_show_output.status.success() {
1248            tracing::warn!("Failed to list chains in the wallet to close them");
1249            return;
1250        }
1251
1252        let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1253            tracing::warn!(
1254                "Failed to close chains because `linera wallet show --short` \
1255                returned a non-UTF-8 output"
1256            );
1257            return;
1258        };
1259
1260        let chain_ids = chain_list_string
1261            .split('\n')
1262            .map(|line| line.trim())
1263            .filter(|line| !line.is_empty());
1264
1265        for chain_id in chain_ids {
1266            let mut close_chain_command = SyncCommand::new(binary_path);
1267
1268            for argument in self.command_arguments() {
1269                close_chain_command.arg(&*argument);
1270            }
1271
1272            close_chain_command.current_dir(working_directory);
1273
1274            match close_chain_command.args(["close-chain", chain_id]).status() {
1275                Ok(status) if status.success() => (),
1276                Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1277                Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1278            }
1279        }
1280    }
1281}
1282
1283#[cfg(with_testing)]
1284impl ClientWrapper {
1285    pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1286        self.build_application(Self::example_path(name)?.as_path(), name, true)
1287            .await
1288    }
1289
1290    pub fn example_path(name: &str) -> Result<PathBuf> {
1291        Ok(env::current_dir()?.join("../examples/").join(name))
1292    }
1293}
1294
1295fn truncate_query_output(input: &str) -> String {
1296    let max_len = 1000;
1297    if input.len() < max_len {
1298        input.to_string()
1299    } else {
1300        format!("{} ...", input.get(..max_len).unwrap())
1301    }
1302}
1303
1304fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1305    let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1306    let max_len = 200;
1307    if query.len() < max_len {
1308        query
1309    } else {
1310        format!("{} ...", query.get(..max_len).unwrap())
1311    }
1312}
1313
1314/// A running node service.
1315pub struct NodeService {
1316    port: u16,
1317    child: Child,
1318}
1319
1320impl NodeService {
1321    fn new(port: u16, child: Child) -> Self {
1322        Self { port, child }
1323    }
1324
1325    pub async fn terminate(mut self) -> Result<()> {
1326        self.child.kill().await.context("terminating node service")
1327    }
1328
1329    pub fn port(&self) -> u16 {
1330        self.port
1331    }
1332
1333    pub fn ensure_is_running(&mut self) -> Result<()> {
1334        self.child.ensure_is_running()
1335    }
1336
1337    pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1338        let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1339        let mut data = self.query_node(query).await?;
1340        Ok(serde_json::from_value(data["processInbox"].take())?)
1341    }
1342
1343    pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1344        let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1345        let mut data = self.query_node(query).await?;
1346        Ok(serde_json::from_value(data["sync"].take())?)
1347    }
1348
1349    pub async fn transfer(
1350        &self,
1351        chain_id: ChainId,
1352        owner: AccountOwner,
1353        recipient: Account,
1354        amount: Amount,
1355    ) -> Result<CryptoHash> {
1356        let json_owner = owner.to_value();
1357        let json_recipient = recipient.to_value();
1358        let query = format!(
1359            "mutation {{ transfer(\
1360                 chainId: \"{chain_id}\", \
1361                 owner: {json_owner}, \
1362                 recipient: {json_recipient}, \
1363                 amount: \"{amount}\") \
1364             }}"
1365        );
1366        let data = self.query_node(query).await?;
1367        serde_json::from_value(data["transfer"].clone())
1368            .context("missing transfer field in response")
1369    }
1370
1371    pub async fn balance(&self, account: &Account) -> Result<Amount> {
1372        let chain = account.chain_id;
1373        let owner = account.owner;
1374        if matches!(owner, AccountOwner::CHAIN) {
1375            let query = format!(
1376                "query {{ chain(chainId:\"{chain}\") {{
1377                    executionState {{ system {{ balance }} }}
1378                }} }}"
1379            );
1380            let response = self.query_node(query).await?;
1381            let balance = &response["chain"]["executionState"]["system"]["balance"]
1382                .as_str()
1383                .unwrap();
1384            return Ok(Amount::from_str(balance)?);
1385        }
1386        let query = format!(
1387            "query {{ chain(chainId:\"{chain}\") {{
1388                executionState {{ system {{ balances {{
1389                    entry(key:\"{owner}\") {{ value }}
1390                }} }} }}
1391            }} }}"
1392        );
1393        let response = self.query_node(query).await?;
1394        let balances = &response["chain"]["executionState"]["system"]["balances"];
1395        let balance = balances["entry"]["value"].as_str();
1396        match balance {
1397            None => Ok(Amount::ZERO),
1398            Some(amount) => Ok(Amount::from_str(amount)?),
1399        }
1400    }
1401
1402    pub fn make_application<A: ContractAbi>(
1403        &self,
1404        chain_id: &ChainId,
1405        application_id: &ApplicationId<A>,
1406    ) -> Result<ApplicationWrapper<A>> {
1407        let application_id = application_id.forget_abi().to_string();
1408        let link = format!(
1409            "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1410            self.port
1411        );
1412        Ok(ApplicationWrapper::from(link))
1413    }
1414
1415    pub async fn publish_data_blob(
1416        &self,
1417        chain_id: &ChainId,
1418        bytes: Vec<u8>,
1419    ) -> Result<CryptoHash> {
1420        let query = format!(
1421            "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1422            chain_id.to_value(),
1423            bytes.to_value(),
1424        );
1425        let data = self.query_node(query).await?;
1426        serde_json::from_value(data["publishDataBlob"].clone())
1427            .context("missing publishDataBlob field in response")
1428    }
1429
1430    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1431        &self,
1432        chain_id: &ChainId,
1433        contract: PathBuf,
1434        service: PathBuf,
1435        vm_runtime: VmRuntime,
1436    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1437        let contract_code = Bytecode::load_from_file(&contract)?;
1438        let service_code = Bytecode::load_from_file(&service)?;
1439        let query = format!(
1440            "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1441            chain_id.to_value(),
1442            contract_code.to_value(),
1443            service_code.to_value(),
1444            vm_runtime.to_value(),
1445        );
1446        let data = self.query_node(query).await?;
1447        let module_str = data["publishModule"]
1448            .as_str()
1449            .context("module ID not found")?;
1450        let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1451        Ok(module_id.with_abi())
1452    }
1453
1454    pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1455        let query = format!(
1456            "query {{ chain(chainId:\"{chain_id}\") {{
1457                executionState {{ system {{ committees }} }}
1458            }} }}"
1459        );
1460        let mut response = self.query_node(query).await?;
1461        let committees = response["chain"]["executionState"]["system"]["committees"].take();
1462        Ok(serde_json::from_value(committees)?)
1463    }
1464
1465    pub async fn events_from_index(
1466        &self,
1467        chain_id: &ChainId,
1468        stream_id: &StreamId,
1469        start_index: u32,
1470    ) -> Result<Vec<IndexAndEvent>> {
1471        let query = format!(
1472            "query {{
1473               eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1474               {{ index event }}
1475             }}",
1476            stream_id.to_value()
1477        );
1478        let mut response = self.query_node(query).await?;
1479        let response = response["eventsFromIndex"].take();
1480        Ok(serde_json::from_value(response)?)
1481    }
1482
1483    pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1484        let n_try = 5;
1485        let query = query.as_ref();
1486        for i in 0..n_try {
1487            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1488            let url = format!("http://localhost:{}/", self.port);
1489            let client = reqwest_client();
1490            let result = client
1491                .post(url)
1492                .json(&json!({ "query": query }))
1493                .send()
1494                .await;
1495            if matches!(result, Err(ref error) if error.is_timeout()) {
1496                tracing::warn!(
1497                    "Timeout when sending query {} to the node service",
1498                    truncate_query_output(query)
1499                );
1500                continue;
1501            }
1502            let response = result.with_context(|| {
1503                format!(
1504                    "query_node: failed to post query={}",
1505                    truncate_query_output(query)
1506                )
1507            })?;
1508            ensure!(
1509                response.status().is_success(),
1510                "Query \"{}\" failed: {}",
1511                truncate_query_output(query),
1512                response
1513                    .text()
1514                    .await
1515                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1516            );
1517            let value: Value = response.json().await.context("invalid JSON")?;
1518            if let Some(errors) = value.get("errors") {
1519                tracing::warn!(
1520                    "Query \"{}\" failed: {}",
1521                    truncate_query_output(query),
1522                    errors
1523                );
1524            } else {
1525                return Ok(value["data"].clone());
1526            }
1527        }
1528        bail!(
1529            "Query \"{}\" failed after {} retries.",
1530            truncate_query_output(query),
1531            n_try
1532        );
1533    }
1534
1535    pub async fn create_application<
1536        Abi: ContractAbi,
1537        Parameters: Serialize,
1538        InstantiationArgument: Serialize,
1539    >(
1540        &self,
1541        chain_id: &ChainId,
1542        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1543        parameters: &Parameters,
1544        argument: &InstantiationArgument,
1545        required_application_ids: &[ApplicationId],
1546    ) -> Result<ApplicationId<Abi>> {
1547        let module_id = module_id.forget_abi();
1548        let json_required_applications_ids = required_application_ids
1549            .iter()
1550            .map(ApplicationId::to_string)
1551            .collect::<Vec<_>>()
1552            .to_value();
1553        // Convert to `serde_json::Value` then `async_graphql::Value` via the trait `InputType`.
1554        let new_parameters = serde_json::to_value(parameters)
1555            .context("could not create parameters JSON")?
1556            .to_value();
1557        let new_argument = serde_json::to_value(argument)
1558            .context("could not create argument JSON")?
1559            .to_value();
1560        let query = format!(
1561            "mutation {{ createApplication(\
1562                 chainId: \"{chain_id}\",
1563                 moduleId: \"{module_id}\", \
1564                 parameters: {new_parameters}, \
1565                 instantiationArgument: {new_argument}, \
1566                 requiredApplicationIds: {json_required_applications_ids}) \
1567             }}"
1568        );
1569        let data = self.query_node(query).await?;
1570        let app_id_str = data["createApplication"]
1571            .as_str()
1572            .context("missing createApplication string in response")?
1573            .trim();
1574        Ok(app_id_str
1575            .parse::<ApplicationId>()
1576            .context("invalid application ID")?
1577            .with_abi())
1578    }
1579
1580    /// Obtains the hash and height of the `chain`'s tip block, as known by this node service.
1581    pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1582        let query = format!(
1583            r#"query {{ block(chainId: "{chain}") {{
1584                hash
1585                block {{ header {{ height }} }}
1586            }} }}"#
1587        );
1588
1589        let mut response = self.query_node(&query).await?;
1590
1591        match (
1592            mem::take(&mut response["block"]["hash"]),
1593            mem::take(&mut response["block"]["block"]["header"]["height"]),
1594        ) {
1595            (Value::Null, Value::Null) => Ok(None),
1596            (Value::String(hash), Value::Number(height)) => Ok(Some((
1597                hash.parse()
1598                    .context("Received an invalid hash {hash:?} for chain tip")?,
1599                BlockHeight(height.as_u64().unwrap()),
1600            ))),
1601            invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1602        }
1603    }
1604
1605    /// Subscribes to the node service and returns a stream of notifications about a chain.
1606    pub async fn notifications(
1607        &self,
1608        chain_id: ChainId,
1609    ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
1610        let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1611        let url = format!("ws://localhost:{}/ws", self.port);
1612        let mut request = url.into_client_request()?;
1613        request.headers_mut().insert(
1614            "Sec-WebSocket-Protocol",
1615            HeaderValue::from_str("graphql-transport-ws")?,
1616        );
1617        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1618        let init_json = json!({
1619          "type": "connection_init",
1620          "payload": {}
1621        });
1622        websocket.send(init_json.to_string().into()).await?;
1623        let text = websocket
1624            .next()
1625            .await
1626            .context("Failed to establish connection")??
1627            .into_text()?;
1628        ensure!(
1629            text == "{\"type\":\"connection_ack\"}",
1630            "Unexpected response: {text}"
1631        );
1632        let query_json = json!({
1633          "id": "1",
1634          "type": "start",
1635          "payload": {
1636            "query": query,
1637            "variables": {},
1638            "operationName": null
1639          }
1640        });
1641        websocket.send(query_json.to_string().into()).await?;
1642        Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1643            |message| async {
1644                let text = message.into_text()?;
1645                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1646                if let Some(errors) = value["payload"].get("errors") {
1647                    bail!("Notification subscription failed: {errors:?}");
1648                }
1649                serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1650                    .context("Failed to deserialize notification")
1651            },
1652        )))
1653    }
1654}
1655
1656/// A running faucet service.
1657pub struct FaucetService {
1658    port: u16,
1659    child: Child,
1660    _temp_dir: tempfile::TempDir,
1661}
1662
1663impl FaucetService {
1664    fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
1665        Self {
1666            port,
1667            child,
1668            _temp_dir: temp_dir,
1669        }
1670    }
1671
1672    pub async fn terminate(mut self) -> Result<()> {
1673        self.child
1674            .kill()
1675            .await
1676            .context("terminating faucet service")
1677    }
1678
1679    pub fn ensure_is_running(&mut self) -> Result<()> {
1680        self.child.ensure_is_running()
1681    }
1682
1683    pub fn instance(&self) -> Faucet {
1684        Faucet::new(format!("http://localhost:{}/", self.port))
1685    }
1686}
1687
1688/// A running `Application` to be queried in GraphQL.
1689pub struct ApplicationWrapper<A> {
1690    uri: String,
1691    _phantom: PhantomData<A>,
1692}
1693
1694impl<A> ApplicationWrapper<A> {
1695    pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1696        let query = query.as_ref();
1697        let value = self.run_json_query(json!({ "query": query })).await?;
1698        Ok(value["data"].clone())
1699    }
1700
1701    pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1702        const MAX_RETRIES: usize = 5;
1703
1704        for i in 0.. {
1705            let client = reqwest_client();
1706            let result = client.post(&self.uri).json(&query).send().await;
1707            let response = match result {
1708                Ok(response) => response,
1709                Err(error) if i < MAX_RETRIES => {
1710                    tracing::warn!(
1711                        "Failed to post query \"{}\": {error}; retrying",
1712                        truncate_query_output_serialize(&query),
1713                    );
1714                    continue;
1715                }
1716                Err(error) => {
1717                    let query = truncate_query_output_serialize(&query);
1718                    return Err(error)
1719                        .with_context(|| format!("run_json_query: failed to post query={query}"));
1720                }
1721            };
1722            ensure!(
1723                response.status().is_success(),
1724                "Query \"{}\" failed: {}",
1725                truncate_query_output_serialize(&query),
1726                response
1727                    .text()
1728                    .await
1729                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1730            );
1731            let value: Value = response.json().await.context("invalid JSON")?;
1732            if let Some(errors) = value.get("errors") {
1733                bail!(
1734                    "Query \"{}\" failed: {}",
1735                    truncate_query_output_serialize(&query),
1736                    errors
1737                );
1738            }
1739            return Ok(value);
1740        }
1741        unreachable!()
1742    }
1743
1744    pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
1745        let query = query.as_ref();
1746        self.run_graphql_query(&format!("query {{ {query} }}"))
1747            .await
1748    }
1749
1750    pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
1751        let query = query.as_ref().trim();
1752        let name = query
1753            .split_once(|ch: char| !ch.is_alphanumeric())
1754            .map_or(query, |(name, _)| name);
1755        let data = self.query(query).await?;
1756        serde_json::from_value(data[name].clone())
1757            .with_context(|| format!("{name} field missing in response"))
1758    }
1759
1760    pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
1761        let mutation = mutation.as_ref();
1762        self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
1763            .await
1764    }
1765
1766    pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
1767        let mut out = String::from("mutation {\n");
1768        for (index, mutation) in mutations.iter().enumerate() {
1769            out = format!("{}  u{}: {}\n", out, index, mutation);
1770        }
1771        out.push_str("}\n");
1772        self.run_graphql_query(&out).await
1773    }
1774}
1775
1776impl<A> From<String> for ApplicationWrapper<A> {
1777    fn from(uri: String) -> ApplicationWrapper<A> {
1778        ApplicationWrapper {
1779            uri,
1780            _phantom: PhantomData,
1781        }
1782    }
1783}
1784
1785/// Returns the timeout for tests that wait for notifications, either read from the env
1786/// variable `LINERA_TEST_NOTIFICATION_TIMEOUT_MS`, or the default value of 10 seconds.
1787#[cfg(with_testing)]
1788fn notification_timeout() -> Duration {
1789    const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
1790    const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;
1791
1792    match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
1793        Ok(var) => Duration::from_millis(var.parse().unwrap_or_else(|error| {
1794            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
1795        })),
1796        Err(env::VarError::NotPresent) => Duration::from_millis(NOTIFICATION_TIMEOUT_MS_DEFAULT),
1797        Err(env::VarError::NotUnicode(_)) => {
1798            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
1799        }
1800    }
1801}
1802
1803#[cfg(with_testing)]
1804pub trait NotificationsExt {
1805    /// Waits for a notification for which `f` returns `Some(t)`, and returns `t`.
1806    fn wait_for<T>(
1807        &mut self,
1808        f: impl FnMut(Notification) -> Option<T>,
1809    ) -> impl Future<Output = Result<T>>;
1810
1811    /// Waits for a `NewEvents` notification for the given block height. If no height is specified,
1812    /// any height is accepted.
1813    fn wait_for_events(
1814        &mut self,
1815        expected_height: impl Into<Option<BlockHeight>>,
1816    ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
1817        let expected_height = expected_height.into();
1818        self.wait_for(move |notification| {
1819            if let Reason::NewEvents {
1820                height,
1821                event_streams,
1822                ..
1823            } = notification.reason
1824            {
1825                if expected_height.is_none_or(|h| h == height) {
1826                    return Some(event_streams);
1827                }
1828            }
1829            None
1830        })
1831    }
1832
1833    /// Waits for a `NewBlock` notification for the given block height. If no height is specified,
1834    /// any height is accepted.
1835    fn wait_for_block(
1836        &mut self,
1837        expected_height: impl Into<Option<BlockHeight>>,
1838    ) -> impl Future<Output = Result<CryptoHash>> {
1839        let expected_height = expected_height.into();
1840        self.wait_for(move |notification| {
1841            if let Reason::NewBlock { height, hash, .. } = notification.reason {
1842                if expected_height.is_none_or(|h| h == height) {
1843                    return Some(hash);
1844                }
1845            }
1846            None
1847        })
1848    }
1849
1850    /// Waits for a `NewIncomingBundle` notification for the given sender chain and sender block
1851    /// height. If no height is specified, any height is accepted.
1852    fn wait_for_bundle(
1853        &mut self,
1854        expected_origin: ChainId,
1855        expected_height: impl Into<Option<BlockHeight>>,
1856    ) -> impl Future<Output = Result<()>> {
1857        let expected_height = expected_height.into();
1858        self.wait_for(move |notification| {
1859            if let Reason::NewIncomingBundle { height, origin } = notification.reason {
1860                if expected_height.is_none_or(|h| h == height) && origin == expected_origin {
1861                    return Some(());
1862                }
1863            }
1864            None
1865        })
1866    }
1867}
1868
1869#[cfg(with_testing)]
1870impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
1871    async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
1872        let mut timeout = Box::pin(linera_base::time::timer::sleep(notification_timeout())).fuse();
1873        loop {
1874            let notification = futures::select! {
1875                () = timeout => bail!("Timeout waiting for notification"),
1876                notification = self.next().fuse() => notification.context("Stream closed")??,
1877            };
1878            if let Some(t) = f(notification) {
1879                return Ok(t);
1880            }
1881        }
1882    }
1883}