Skip to main content

linera_service/cli_wrappers/
wallet.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Cow,
6    collections::BTreeMap,
7    env,
8    marker::PhantomData,
9    mem,
10    path::{Path, PathBuf},
11    pin::Pin,
12    process::Stdio,
13    str::FromStr,
14    sync,
15    time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24    abi::ContractAbi,
25    command::{resolve_binary, CommandExt},
26    crypto::{CryptoHash, InMemorySigner},
27    data_types::{Amount, ApplicationPermissions, BlockHeight, Bytecode, Epoch},
28    identifiers::{
29        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30    },
31    vm::VmRuntime,
32};
33use linera_client::client_options::ResourceControlPolicyConfig;
34use linera_core::worker::Notification;
35use linera_faucet_client::Faucet;
36use serde::{de::DeserializeOwned, ser::Serialize};
37use serde_command_opts::to_args;
38use serde_json::{json, Value};
39use tempfile::TempDir;
40use tokio::{
41    io::{AsyncBufReadExt, BufReader},
42    process::{Child, Command},
43    sync::oneshot,
44    task::JoinHandle,
45};
46#[cfg(with_testing)]
47use {
48    futures::FutureExt as _,
49    linera_core::worker::Reason,
50    std::{collections::BTreeSet, future::Future},
51};
52
53use crate::{
54    cli::command::{BenchmarkCommand, ResourceControlPolicyOverrides},
55    cli_wrappers::{
56        local_net::{PathProvider, ProcessInbox},
57        Network,
58    },
59    util::{self, ChildExt},
60    Wallet,
61};
62
/// The name of the environment variable that allows specifying additional arguments to be passed
/// to the node-service command of the client.
///
/// When the variable is set, its value is split on whitespace and every piece is appended as a
/// separate argument to the `linera service` invocation.
const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
66
67fn reqwest_client() -> reqwest::Client {
68    reqwest::ClientBuilder::new()
69        .timeout(Duration::from_secs(60))
70        .build()
71        .unwrap()
72}
73
/// Wrapper to run a Linera client command.
///
/// Each command is spawned with the directory of `path_provider` as its working directory and
/// receives the wallet, keystore and storage locations stored here as command-line arguments.
pub struct ClientWrapper {
    /// Path of the `linera` binary. It is either derived from an explicit binary directory at
    /// construction time, or resolved on first use and cached here.
    binary_path: sync::Mutex<Option<PathBuf>>,
    /// Seed forwarded as `--testing-prng-seed` to the commands that accept it.
    testing_prng_seed: Option<u64>,
    /// Value passed as `--storage` (a `rocksdb:` location inside the `path_provider` directory).
    storage: String,
    /// File name passed as `--wallet`.
    wallet: String,
    /// File name passed as `--keystore`.
    keystore: String,
    /// Value passed as `--max-pending-message-bundles` on regular (non-benchmark) commands.
    max_pending_message_bundles: usize,
    // NOTE(review): stored at construction; not read by any method visible in this part of the
    // file — presumably describes the transport used to reach validators. Confirm.
    network: Network,
    /// Provides the directory used as working directory and storage location.
    pub path_provider: PathProvider,
    /// Action to perform when the wrapper is dropped.
    on_drop: OnClientDrop,
    /// Additional arguments appended after the required arguments of every command.
    extra_args: Vec<String>,
}
87
/// Selects what happens to the wallet's chains once the [`ClientWrapper`] goes out of scope.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OnClientDrop {
    /// Every chain of the wallet gets closed.
    CloseChains,
    /// The chains are left untouched and stay active.
    LeakChains,
}
96
97impl ClientWrapper {
98    pub fn new(
99        path_provider: PathProvider,
100        network: Network,
101        testing_prng_seed: Option<u64>,
102        id: usize,
103        on_drop: OnClientDrop,
104    ) -> Self {
105        Self::new_with_extra_args(
106            path_provider,
107            network,
108            testing_prng_seed,
109            id,
110            on_drop,
111            vec!["--wait-for-outgoing-messages".to_string()],
112            None,
113        )
114    }
115
116    pub fn new_with_extra_args(
117        path_provider: PathProvider,
118        network: Network,
119        testing_prng_seed: Option<u64>,
120        id: usize,
121        on_drop: OnClientDrop,
122        extra_args: Vec<String>,
123        binary_dir: Option<PathBuf>,
124    ) -> Self {
125        let storage = format!(
126            "rocksdb:{}/client_{}.db",
127            path_provider.path().display(),
128            id
129        );
130        let wallet = format!("wallet_{id}.json");
131        let keystore = format!("keystore_{id}.json");
132        let binary_path = binary_dir.map(|dir| dir.join("linera"));
133        Self {
134            binary_path: sync::Mutex::new(binary_path),
135            testing_prng_seed,
136            storage,
137            wallet,
138            keystore,
139            max_pending_message_bundles: 10_000,
140            network,
141            path_provider,
142            on_drop,
143            extra_args,
144        }
145    }
146
147    /// Runs `linera project new`.
148    pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
149        let tmp = TempDir::new()?;
150        let mut command = self.command().await?;
151        command
152            .current_dir(tmp.path())
153            .arg("project")
154            .arg("new")
155            .arg(project_name)
156            .arg("--linera-root")
157            .arg(linera_root)
158            .spawn_and_wait_for_stdout()
159            .await?;
160        Ok(tmp)
161    }
162
163    /// Runs `linera project publish`.
164    pub async fn project_publish<T: Serialize>(
165        &self,
166        path: PathBuf,
167        required_application_ids: Vec<String>,
168        publisher: impl Into<Option<ChainId>>,
169        argument: &T,
170    ) -> Result<String> {
171        let json_parameters = serde_json::to_string(&())?;
172        let json_argument = serde_json::to_string(argument)?;
173        let mut command = self.command().await?;
174        command
175            .arg("project")
176            .arg("publish-and-create")
177            .arg(path)
178            .args(publisher.into().iter().map(ChainId::to_string))
179            .args(["--json-parameters", &json_parameters])
180            .args(["--json-argument", &json_argument]);
181        if !required_application_ids.is_empty() {
182            command.arg("--required-application-ids");
183            command.args(required_application_ids);
184        }
185        let stdout = command.spawn_and_wait_for_stdout().await?;
186        Ok(stdout.trim().to_string())
187    }
188
189    /// Runs `linera project test`.
190    pub async fn project_test(&self, path: &Path) -> Result<()> {
191        self.command()
192            .await
193            .context("failed to create project test command")?
194            .current_dir(path)
195            .arg("project")
196            .arg("test")
197            .spawn_and_wait_for_stdout()
198            .await?;
199        Ok(())
200    }
201
202    async fn command_with_envs_and_arguments(
203        &self,
204        envs: &[(&str, &str)],
205        arguments: impl IntoIterator<Item = Cow<'_, str>>,
206    ) -> Result<Command> {
207        let mut command = self.command_binary().await?;
208        command.current_dir(self.path_provider.path());
209        for (key, value) in envs {
210            command.env(key, value);
211        }
212        for argument in arguments {
213            command.arg(&*argument);
214        }
215        Ok(command)
216    }
217
218    async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
219        self.command_with_envs_and_arguments(envs, self.command_arguments())
220            .await
221    }
222
223    async fn command_with_arguments(
224        &self,
225        arguments: impl IntoIterator<Item = Cow<'_, str>>,
226    ) -> Result<Command> {
227        self.command_with_envs_and_arguments(
228            &[(
229                "RUST_LOG",
230                &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
231            )],
232            arguments,
233        )
234        .await
235    }
236
237    async fn command(&self) -> Result<Command> {
238        self.command_with_envs(&[(
239            "RUST_LOG",
240            &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
241        )])
242        .await
243    }
244
245    fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
246        [
247            "--wallet".into(),
248            self.wallet.as_str().into(),
249            "--keystore".into(),
250            self.keystore.as_str().into(),
251            "--storage".into(),
252            self.storage.as_str().into(),
253            "--send-timeout-ms".into(),
254            "500000".into(),
255            "--recv-timeout-ms".into(),
256            "500000".into(),
257        ]
258        .into_iter()
259        .chain(self.extra_args.iter().map(|s| s.as_str().into()))
260    }
261
262    /// Returns an iterator over the arguments that should be added to all command invocations.
263    fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
264        self.required_command_arguments().chain([
265            "--max-pending-message-bundles".into(),
266            self.max_pending_message_bundles.to_string().into(),
267        ])
268    }
269
270    /// Returns the [`Command`] instance configured to run the appropriate binary.
271    ///
272    /// The path is resolved once and cached inside `self` for subsequent usages.
273    async fn command_binary(&self) -> Result<Command> {
274        match self.command_with_cached_binary_path() {
275            Some(command) => Ok(command),
276            None => {
277                let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
278                let command = Command::new(&resolved_path);
279
280                self.set_cached_binary_path(resolved_path);
281
282                Ok(command)
283            }
284        }
285    }
286
287    /// Returns a [`Command`] instance configured with the cached `binary_path`, if available.
288    fn command_with_cached_binary_path(&self) -> Option<Command> {
289        let binary_path = self.binary_path.lock().unwrap();
290
291        binary_path.as_ref().map(Command::new)
292    }
293
294    /// Sets the cached `binary_path` with the `new_binary_path`.
295    ///
296    /// # Panics
297    ///
298    /// If the cache is already set to a different value. In theory the two threads calling
299    /// `command_binary` can race and resolve the binary path twice, but they should always be the
300    /// same path.
301    fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
302        let mut binary_path = self.binary_path.lock().unwrap();
303
304        if binary_path.is_none() {
305            *binary_path = Some(new_binary_path);
306        } else {
307            assert_eq!(*binary_path, Some(new_binary_path));
308        }
309    }
310
311    /// Runs `linera create-genesis-config`.
312    pub async fn create_genesis_config(
313        &self,
314        num_other_initial_chains: u32,
315        initial_funding: Amount,
316        policy_config: ResourceControlPolicyConfig,
317        http_allow_list: Option<Vec<String>>,
318    ) -> Result<()> {
319        let mut command = self.command().await?;
320        command
321            .args([
322                "create-genesis-config",
323                &num_other_initial_chains.to_string(),
324            ])
325            .args(["--initial-funding", &initial_funding.to_string()])
326            .args(["--committee", "committee.json"])
327            .args(["--genesis", "genesis.json"])
328            .args([
329                "--policy-config",
330                &policy_config.to_string().to_kebab_case(),
331            ]);
332        if let Some(allow_list) = http_allow_list {
333            command
334                .arg("--http-request-allow-list")
335                .arg(allow_list.join(","));
336        }
337        if let Some(seed) = self.testing_prng_seed {
338            command.arg("--testing-prng-seed").arg(seed.to_string());
339        }
340        command.spawn_and_wait_for_stdout().await?;
341        Ok(())
342    }
343
344    /// Runs `linera wallet init`. The genesis config is read from `genesis.json`, or from the
345    /// faucet if provided.
346    pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
347        let mut command = self.command().await?;
348        command.args(["wallet", "init"]);
349        match faucet {
350            None => command.args(["--genesis", "genesis.json"]),
351            Some(faucet) => command.args(["--faucet", faucet.url()]),
352        };
353        if let Some(seed) = self.testing_prng_seed {
354            command.arg("--testing-prng-seed").arg(seed.to_string());
355        }
356        command.spawn_and_wait_for_stdout().await?;
357        Ok(())
358    }
359
360    /// Runs `linera wallet request-chain`.
361    pub async fn request_chain(
362        &self,
363        faucet: &Faucet,
364        set_default: bool,
365    ) -> Result<(ChainId, AccountOwner)> {
366        let mut command = self.command().await?;
367        command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
368        if set_default {
369            command.arg("--set-default");
370        }
371        let stdout = command.spawn_and_wait_for_stdout().await?;
372        let mut lines = stdout.split_whitespace();
373        let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
374        let owner = lines.next().context("missing chain owner")?.parse()?;
375        Ok((chain_id, owner))
376    }
377
378    /// Runs `linera wallet publish-and-create`.
379    #[expect(clippy::too_many_arguments)]
380    pub async fn publish_and_create<
381        A: ContractAbi,
382        Parameters: Serialize,
383        InstantiationArgument: Serialize,
384    >(
385        &self,
386        contract: PathBuf,
387        service: PathBuf,
388        vm_runtime: VmRuntime,
389        parameters: &Parameters,
390        argument: &InstantiationArgument,
391        required_application_ids: &[ApplicationId],
392        publisher: impl Into<Option<ChainId>>,
393    ) -> Result<ApplicationId<A>> {
394        let json_parameters = serde_json::to_string(parameters)?;
395        let json_argument = serde_json::to_string(argument)?;
396        let mut command = self.command().await?;
397        let vm_runtime = format!("{vm_runtime}");
398        command
399            .arg("publish-and-create")
400            .args([contract, service])
401            .args(["--vm-runtime", &vm_runtime.to_lowercase()])
402            .args(publisher.into().iter().map(ChainId::to_string))
403            .args(["--json-parameters", &json_parameters])
404            .args(["--json-argument", &json_argument]);
405        if !required_application_ids.is_empty() {
406            command.arg("--required-application-ids");
407            command.args(
408                required_application_ids
409                    .iter()
410                    .map(ApplicationId::to_string),
411            );
412        }
413        let stdout = command.spawn_and_wait_for_stdout().await?;
414        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
415    }
416
417    /// Runs `linera publish-module`.
418    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
419        &self,
420        contract: PathBuf,
421        service: PathBuf,
422        vm_runtime: VmRuntime,
423        publisher: impl Into<Option<ChainId>>,
424    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
425        self.publish_module_with_formats(contract, service, None, vm_runtime, publisher)
426            .await
427    }
428
429    /// Runs `linera publish-module` with an optional `--formats` SNAP file path.
430    pub async fn publish_module_with_formats<Abi, Parameters, InstantiationArgument>(
431        &self,
432        contract: PathBuf,
433        service: PathBuf,
434        formats: Option<PathBuf>,
435        vm_runtime: VmRuntime,
436        publisher: impl Into<Option<ChainId>>,
437    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
438        let mut command = self.command().await?;
439        command
440            .arg("publish-module")
441            .args([contract, service])
442            .args(["--vm-runtime", &format!("{vm_runtime}").to_lowercase()]);
443        if let Some(formats_path) = formats {
444            command.arg("--formats").arg(formats_path);
445        }
446        let stdout = command
447            .args(publisher.into().iter().map(ChainId::to_string))
448            .spawn_and_wait_for_stdout()
449            .await?;
450        let module_id: ModuleId = stdout.trim().parse()?;
451        Ok(module_id.with_abi())
452    }
453
454    /// Runs `linera create-application`.
455    pub async fn create_application<
456        Abi: ContractAbi,
457        Parameters: Serialize,
458        InstantiationArgument: Serialize,
459    >(
460        &self,
461        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
462        parameters: &Parameters,
463        argument: &InstantiationArgument,
464        required_application_ids: &[ApplicationId],
465        creator: impl Into<Option<ChainId>>,
466    ) -> Result<ApplicationId<Abi>> {
467        let json_parameters = serde_json::to_string(parameters)?;
468        let json_argument = serde_json::to_string(argument)?;
469        let mut command = self.command().await?;
470        command
471            .arg("create-application")
472            .arg(module_id.forget_abi().to_string())
473            .args(["--json-parameters", &json_parameters])
474            .args(["--json-argument", &json_argument])
475            .args(creator.into().iter().map(ChainId::to_string));
476        if !required_application_ids.is_empty() {
477            command.arg("--required-application-ids");
478            command.args(
479                required_application_ids
480                    .iter()
481                    .map(ApplicationId::to_string),
482            );
483        }
484        let stdout = command.spawn_and_wait_for_stdout().await?;
485        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
486    }
487
488    /// Runs `linera service`.
489    pub async fn run_node_service(
490        &self,
491        port: impl Into<Option<u16>>,
492        process_inbox: ProcessInbox,
493    ) -> Result<NodeService> {
494        self.run_node_service_with_options(port, process_inbox, &[], &[], false)
495            .await
496    }
497
498    /// Runs `linera service` with optional task processor configuration.
499    pub async fn run_node_service_with_options(
500        &self,
501        port: impl Into<Option<u16>>,
502        process_inbox: ProcessInbox,
503        operator_application_ids: &[ApplicationId],
504        operators: &[(String, PathBuf)],
505        read_only: bool,
506    ) -> Result<NodeService> {
507        self.run_node_service_with_all_options(
508            port,
509            process_inbox,
510            operator_application_ids,
511            operators,
512            read_only,
513            &[],
514            &[],
515        )
516        .await
517    }
518
519    /// Runs `linera service` with all available options.
520    #[expect(clippy::too_many_arguments)]
521    pub async fn run_node_service_with_all_options(
522        &self,
523        port: impl Into<Option<u16>>,
524        process_inbox: ProcessInbox,
525        operator_application_ids: &[ApplicationId],
526        operators: &[(String, PathBuf)],
527        read_only: bool,
528        allowed_subscriptions: &[String],
529        subscription_ttls: &[(String, u64)],
530    ) -> Result<NodeService> {
531        let port = port.into().unwrap_or(8080);
532        let mut command = self.command().await?;
533        command.arg("service");
534        if let ProcessInbox::Skip = process_inbox {
535            command.arg("--listener-skip-process-inbox");
536        }
537        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
538            command.args(var.split_whitespace());
539        }
540        for app_id in operator_application_ids {
541            command.args(["--operator-application-ids", &app_id.to_string()]);
542        }
543        for (name, path) in operators {
544            command.args(["--operators", &format!("{}={}", name, path.display())]);
545        }
546        if read_only {
547            command.arg("--read-only");
548        }
549        for query in allowed_subscriptions {
550            command.args(["--allow-subscription", query]);
551        }
552        for (name, secs) in subscription_ttls {
553            command.args(["--subscription-ttl-secs", &format!("{name}={secs}")]);
554        }
555        let child = command
556            .args(["--port".to_string(), port.to_string()])
557            .spawn_into()?;
558        let client = reqwest_client();
559        for i in 0..10 {
560            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
561            let request = client.get(format!("http://localhost:{port}/")).send().await;
562            if request.is_ok() {
563                tracing::info!("Node service has started");
564                return Ok(NodeService::new(port, child));
565            } else {
566                tracing::warn!("Waiting for node service to start");
567            }
568        }
569        bail!("Failed to start node service");
570    }
571
572    /// Runs `linera service` with a controller application.
573    pub async fn run_node_service_with_controller(
574        &self,
575        port: impl Into<Option<u16>>,
576        process_inbox: ProcessInbox,
577        controller_id: &ApplicationId,
578        operators: &[(String, PathBuf)],
579    ) -> Result<NodeService> {
580        let port = port.into().unwrap_or(8080);
581        let mut command = self.command().await?;
582        command.arg("service");
583        if let ProcessInbox::Skip = process_inbox {
584            command.arg("--listener-skip-process-inbox");
585        }
586        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
587            command.args(var.split_whitespace());
588        }
589        command.args(["--controller-id", &controller_id.to_string()]);
590        for (name, path) in operators {
591            command.args(["--operators", &format!("{}={}", name, path.display())]);
592        }
593        let child = command
594            .args(["--port".to_string(), port.to_string()])
595            .spawn_into()?;
596        let client = reqwest_client();
597        for i in 0..10 {
598            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
599            let request = client.get(format!("http://localhost:{port}/")).send().await;
600            if request.is_ok() {
601                tracing::info!("Node service has started");
602                return Ok(NodeService::new(port, child));
603            } else {
604                tracing::warn!("Waiting for node service to start");
605            }
606        }
607        bail!("Failed to start node service");
608    }
609
610    /// Runs `linera validator query`
611    pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
612        let mut command = self.command().await?;
613        command.arg("validator").arg("query").arg(address);
614        let stdout = command.spawn_and_wait_for_stdout().await?;
615
616        // Parse the genesis config hash from the output.
617        // It's on a line like "Genesis config hash: <hash>"
618        let hash = stdout
619            .lines()
620            .find_map(|line| {
621                line.strip_prefix("Genesis config hash: ")
622                    .and_then(|hash_str| hash_str.trim().parse().ok())
623            })
624            .context("error while parsing the result of `linera validator query`")?;
625        Ok(hash)
626    }
627
628    /// Runs `linera validator list`.
629    pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
630        let mut command = self.command().await?;
631        command.arg("validator").arg("list");
632        if let Some(chain_id) = chain_id {
633            command.args(["--chain-id", &chain_id.to_string()]);
634        }
635        command.spawn_and_wait_for_stdout().await?;
636        Ok(())
637    }
638
639    /// Runs `linera sync-validator`.
640    pub async fn sync_validator(
641        &self,
642        chain_ids: impl IntoIterator<Item = &ChainId>,
643        validator_address: impl Into<String>,
644    ) -> Result<()> {
645        let mut command = self.command().await?;
646        command
647            .arg("validator")
648            .arg("sync")
649            .arg(validator_address.into());
650        let mut chain_ids = chain_ids.into_iter().peekable();
651        if chain_ids.peek().is_some() {
652            command
653                .arg("--chains")
654                .args(chain_ids.map(ChainId::to_string));
655        }
656        command.spawn_and_wait_for_stdout().await?;
657        Ok(())
658    }
659
660    /// Runs `linera validator benchmark` and returns its stdout.
661    pub async fn validator_benchmark(
662        &self,
663        address: impl Into<String>,
664        chains: impl IntoIterator<Item = &ChainId>,
665        extra_args: &[&str],
666    ) -> Result<String> {
667        let mut command = self.command().await?;
668        command
669            .arg("validator")
670            .arg("benchmark")
671            .arg(address.into());
672        for chain in chains {
673            command.args(["--chain", &chain.to_string()]);
674        }
675        command.args(extra_args);
676        command.spawn_and_wait_for_stdout().await
677    }
678
679    /// Runs `linera faucet`.
680    pub async fn run_faucet(
681        &self,
682        port: impl Into<Option<u16>>,
683        chain_id: Option<ChainId>,
684        amount: Amount,
685    ) -> Result<FaucetService> {
686        let port = port.into().unwrap_or(8080);
687        let temp_dir = tempfile::tempdir()
688            .context("Failed to create temporary directory for faucet storage")?;
689        let storage_path = temp_dir.path().join("faucet_storage.sqlite");
690        let mut command = self.command().await?;
691        let command = command
692            .arg("faucet")
693            .args(["--port".to_string(), port.to_string()])
694            .args(["--amount".to_string(), amount.to_string()])
695            .args([
696                "--storage-path".to_string(),
697                storage_path.to_string_lossy().to_string(),
698            ]);
699        if let Some(chain_id) = chain_id {
700            command.arg(chain_id.to_string());
701        }
702        let child = command.spawn_into()?;
703        let client = reqwest_client();
704        for i in 0..10 {
705            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
706            let request = client.get(format!("http://localhost:{port}/")).send().await;
707            if request.is_ok() {
708                tracing::info!("Faucet has started");
709                return Ok(FaucetService::new(port, child, temp_dir));
710            } else {
711                tracing::debug!("Waiting for faucet to start");
712            }
713        }
714        bail!("Failed to start faucet");
715    }
716
717    /// Runs `linera local-balance`.
718    pub async fn local_balance(&self, account: Account) -> Result<Amount> {
719        let stdout = self
720            .command()
721            .await?
722            .arg("local-balance")
723            .arg(account.to_string())
724            .spawn_and_wait_for_stdout()
725            .await?;
726        let amount = stdout
727            .trim()
728            .parse()
729            .context("error while parsing the result of `linera local-balance`")?;
730        Ok(amount)
731    }
732
733    /// Runs `linera query-balance`.
734    pub async fn query_balance(&self, account: Account) -> Result<Amount> {
735        let stdout = self
736            .command()
737            .await?
738            .arg("query-balance")
739            .arg(account.to_string())
740            .spawn_and_wait_for_stdout()
741            .await?;
742        let amount = stdout
743            .trim()
744            .parse()
745            .context("error while parsing the result of `linera query-balance`")?;
746        Ok(amount)
747    }
748
749    /// Runs `linera query-application` and parses the JSON result.
750    pub async fn query_application_json<T: DeserializeOwned>(
751        &self,
752        chain_id: ChainId,
753        application_id: ApplicationId,
754        query: impl AsRef<str>,
755    ) -> Result<T> {
756        let query = query.as_ref().trim();
757        let name = query
758            .split_once(|ch: char| !ch.is_alphanumeric())
759            .map_or(query, |(name, _)| name);
760        let stdout = self
761            .command()
762            .await?
763            .arg("query-application")
764            .arg("--chain-id")
765            .arg(chain_id.to_string())
766            .arg("--application-id")
767            .arg(application_id.to_string())
768            .arg(query)
769            .spawn_and_wait_for_stdout()
770            .await?;
771        let data: serde_json::Value =
772            serde_json::from_str(stdout.trim()).context("invalid JSON from query-application")?;
773        serde_json::from_value(data[name].clone())
774            .with_context(|| format!("{name} field missing in query-application response"))
775    }
776
777    /// Runs `linera sync`.
778    pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
779        self.command()
780            .await?
781            .arg("sync")
782            .arg(chain_id.to_string())
783            .spawn_and_wait_for_stdout()
784            .await?;
785        Ok(())
786    }
787
788    /// Runs `linera process-inbox`.
789    pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
790        self.command()
791            .await?
792            .arg("process-inbox")
793            .arg(chain_id.to_string())
794            .spawn_and_wait_for_stdout()
795            .await?;
796        Ok(())
797    }
798
799    /// Runs `linera transfer`.
800    pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
801        self.command()
802            .await?
803            .arg("transfer")
804            .arg(amount.to_string())
805            .args(["--from", &from.to_string()])
806            .args(["--to", &to.to_string()])
807            .spawn_and_wait_for_stdout()
808            .await?;
809        Ok(())
810    }
811
812    /// Runs `linera transfer` with no logging.
813    pub async fn transfer_with_silent_logs(
814        &self,
815        amount: Amount,
816        from: ChainId,
817        to: ChainId,
818    ) -> Result<()> {
819        self.command()
820            .await?
821            .env("RUST_LOG", "off")
822            .arg("transfer")
823            .arg(amount.to_string())
824            .args(["--from", &from.to_string()])
825            .args(["--to", &to.to_string()])
826            .spawn_and_wait_for_stdout()
827            .await?;
828        Ok(())
829    }
830
831    /// Runs `linera transfer` with owner accounts.
832    pub async fn transfer_with_accounts(
833        &self,
834        amount: Amount,
835        from: Account,
836        to: Account,
837    ) -> Result<()> {
838        self.command()
839            .await?
840            .arg("transfer")
841            .arg(amount.to_string())
842            .args(["--from", &from.to_string()])
843            .args(["--to", &to.to_string()])
844            .spawn_and_wait_for_stdout()
845            .await?;
846        Ok(())
847    }
848
849    fn benchmark_command_internal(command: &mut Command, args: &BenchmarkCommand) -> Result<()> {
850        let mut formatted_args = to_args(&args)?;
851        let subcommand = formatted_args.remove(0);
852        // The subcommand is followed by the flattened options, which are preceded by "options".
853        // So remove that as well.
854        formatted_args.remove(0);
855        let options = formatted_args
856            .chunks_exact(2)
857            .flat_map(|pair| {
858                let option = format!("--{}", pair[0]);
859                match pair[1].as_str() {
860                    "true" => vec![option],
861                    "false" => vec![],
862                    _ => vec![option, pair[1].clone()],
863                }
864            })
865            .collect::<Vec<_>>();
866        command
867            .args([
868                "--max-pending-message-bundles",
869                &args.transactions_per_block().to_string(),
870            ])
871            .arg("benchmark")
872            .arg(subcommand)
873            .args(options);
874        Ok(())
875    }
876
877    async fn benchmark_command_with_envs(
878        &self,
879        args: BenchmarkCommand,
880        envs: &[(&str, &str)],
881    ) -> Result<Command> {
882        let mut command = self
883            .command_with_envs_and_arguments(envs, self.required_command_arguments())
884            .await?;
885        Self::benchmark_command_internal(&mut command, &args)?;
886        Ok(command)
887    }
888
889    async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
890        let mut command = self
891            .command_with_arguments(self.required_command_arguments())
892            .await?;
893        Self::benchmark_command_internal(&mut command, &args)?;
894        Ok(command)
895    }
896
897    /// Runs `linera benchmark`.
898    pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
899        let mut command = self.benchmark_command(args).await?;
900        command.spawn_and_wait_for_stdout().await?;
901        Ok(())
902    }
903
904    /// Runs `linera benchmark`, but detached: don't wait for the command to finish, just spawn it
905    /// and return the child process, and the handles to the stdout and stderr.
906    pub async fn benchmark_detached(
907        &self,
908        args: BenchmarkCommand,
909        tx: oneshot::Sender<()>,
910    ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
911        let mut child = self
912            .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
913            .await?
914            .kill_on_drop(true)
915            .stdin(Stdio::piped())
916            .stdout(Stdio::piped())
917            .stderr(Stdio::piped())
918            .spawn()?;
919
920        let pid = child.id().expect("failed to get pid");
921        let stdout = child.stdout.take().expect("stdout not open");
922        let stdout_handle = tokio::spawn(async move {
923            let mut lines = BufReader::new(stdout).lines();
924            while let Ok(Some(line)) = lines.next_line().await {
925                println!("benchmark{{pid={pid}}} {line}");
926            }
927        });
928
929        let stderr = child.stderr.take().expect("stderr not open");
930        let stderr_handle = tokio::spawn(async move {
931            let mut lines = BufReader::new(stderr).lines();
932            let mut tx = Some(tx);
933            while let Ok(Some(line)) = lines.next_line().await {
934                if line.contains("Ready to start benchmark") {
935                    tx.take()
936                        .expect("Should only send signal once")
937                        .send(())
938                        .expect("failed to send ready signal to main thread");
939                } else {
940                    println!("benchmark{{pid={pid}}} {line}");
941                }
942            }
943        });
944        Ok((child, stdout_handle, stderr_handle))
945    }
946
947    async fn open_chain_internal(
948        &self,
949        from: ChainId,
950        owner: Option<AccountOwner>,
951        initial_balance: Amount,
952        super_owner: bool,
953    ) -> Result<(ChainId, AccountOwner)> {
954        let mut command = self.command().await?;
955        command
956            .arg("open-chain")
957            .args(["--from", &from.to_string()])
958            .args(["--initial-balance", &initial_balance.to_string()]);
959
960        if let Some(owner) = owner {
961            command.args(["--owner", &owner.to_string()]);
962        }
963
964        if super_owner {
965            command.arg("--super-owner");
966        }
967
968        let stdout = command.spawn_and_wait_for_stdout().await?;
969        let mut split = stdout.split('\n');
970        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
971        let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
972        if let Some(owner) = owner {
973            assert_eq!(owner, new_owner);
974        }
975        Ok((chain_id, new_owner))
976    }
977
978    /// Runs `linera open-chain --super-owner`.
979    pub async fn open_chain_super_owner(
980        &self,
981        from: ChainId,
982        owner: Option<AccountOwner>,
983        initial_balance: Amount,
984    ) -> Result<(ChainId, AccountOwner)> {
985        self.open_chain_internal(from, owner, initial_balance, true)
986            .await
987    }
988
989    /// Runs `linera open-chain`.
990    pub async fn open_chain(
991        &self,
992        from: ChainId,
993        owner: Option<AccountOwner>,
994        initial_balance: Amount,
995    ) -> Result<(ChainId, AccountOwner)> {
996        self.open_chain_internal(from, owner, initial_balance, false)
997            .await
998    }
999
1000    /// Runs `linera open-chain` then `linera assign`.
1001    pub async fn open_and_assign(
1002        &self,
1003        client: &ClientWrapper,
1004        initial_balance: Amount,
1005    ) -> Result<ChainId> {
1006        let our_chain = self
1007            .load_wallet()?
1008            .default_chain()
1009            .context("no default chain found")?;
1010        let owner = client.keygen().await?;
1011        let (new_chain, _) = self
1012            .open_chain(our_chain, Some(owner), initial_balance)
1013            .await?;
1014        client.assign(owner, new_chain).await?;
1015        Ok(new_chain)
1016    }
1017
1018    pub async fn open_multi_owner_chain(
1019        &self,
1020        from: ChainId,
1021        owners: BTreeMap<AccountOwner, u64>,
1022        multi_leader_rounds: u32,
1023        balance: Amount,
1024        base_timeout_ms: u64,
1025    ) -> Result<ChainId> {
1026        let mut command = self.command().await?;
1027        command
1028            .arg("open-multi-owner-chain")
1029            .args(["--from", &from.to_string()])
1030            .arg("--owners")
1031            .arg(serde_json::to_string(&owners)?)
1032            .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
1033        command
1034            .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
1035            .args(["--initial-balance", &balance.to_string()]);
1036
1037        let stdout = command.spawn_and_wait_for_stdout().await?;
1038        let mut split = stdout.split('\n');
1039        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
1040
1041        Ok(chain_id)
1042    }
1043
1044    pub async fn change_ownership(
1045        &self,
1046        chain_id: ChainId,
1047        super_owners: Vec<AccountOwner>,
1048        owners: Vec<AccountOwner>,
1049    ) -> Result<()> {
1050        let mut command = self.command().await?;
1051        command
1052            .arg("change-ownership")
1053            .args(["--chain-id", &chain_id.to_string()]);
1054        command
1055            .arg("--super-owners")
1056            .arg(serde_json::to_string(&super_owners)?);
1057        command.arg("--owners").arg(serde_json::to_string(
1058            &owners
1059                .into_iter()
1060                .zip(std::iter::repeat(100u64))
1061                .collect::<BTreeMap<_, _>>(),
1062        )?);
1063        command.spawn_and_wait_for_stdout().await?;
1064        Ok(())
1065    }
1066
1067    pub async fn change_application_permissions(
1068        &self,
1069        chain_id: ChainId,
1070        application_permissions: ApplicationPermissions,
1071    ) -> Result<()> {
1072        let mut command = self.command().await?;
1073        command
1074            .arg("change-application-permissions")
1075            .args(["--chain-id", &chain_id.to_string()]);
1076        command.arg("--manage-chain").arg(serde_json::to_string(
1077            &application_permissions.manage_chain,
1078        )?);
1079        // TODO: add other fields
1080        command.spawn_and_wait_for_stdout().await?;
1081        Ok(())
1082    }
1083
1084    /// Runs `linera wallet follow-chain CHAIN_ID`.
1085    pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
1086        let mut command = self.command().await?;
1087        command
1088            .args(["wallet", "follow-chain"])
1089            .arg(chain_id.to_string());
1090        if sync {
1091            command.arg("--sync");
1092        }
1093        command.spawn_and_wait_for_stdout().await?;
1094        Ok(())
1095    }
1096
1097    /// Runs `linera wallet forget-chain CHAIN_ID`.
1098    pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
1099        let mut command = self.command().await?;
1100        command
1101            .args(["wallet", "forget-chain"])
1102            .arg(chain_id.to_string());
1103        command.spawn_and_wait_for_stdout().await?;
1104        Ok(())
1105    }
1106
1107    /// Runs `linera wallet set-default CHAIN_ID`.
1108    pub async fn set_default_chain(&self, chain_id: ChainId) -> Result<()> {
1109        let mut command = self.command().await?;
1110        command
1111            .args(["wallet", "set-default"])
1112            .arg(chain_id.to_string());
1113        command.spawn_and_wait_for_stdout().await?;
1114        Ok(())
1115    }
1116
1117    pub async fn retry_pending_block(
1118        &self,
1119        chain_id: Option<ChainId>,
1120    ) -> Result<Option<CryptoHash>> {
1121        let mut command = self.command().await?;
1122        command.arg("retry-pending-block");
1123        if let Some(chain_id) = chain_id {
1124            command.arg(chain_id.to_string());
1125        }
1126        let stdout = command.spawn_and_wait_for_stdout().await?;
1127        let stdout = stdout.trim();
1128        if stdout.is_empty() {
1129            Ok(None)
1130        } else {
1131            Ok(Some(CryptoHash::from_str(stdout)?))
1132        }
1133    }
1134
1135    /// Runs `linera publish-data-blob`.
1136    pub async fn publish_data_blob(
1137        &self,
1138        path: &Path,
1139        chain_id: Option<ChainId>,
1140    ) -> Result<CryptoHash> {
1141        let mut command = self.command().await?;
1142        command.arg("publish-data-blob").arg(path);
1143        if let Some(chain_id) = chain_id {
1144            command.arg(chain_id.to_string());
1145        }
1146        let stdout = command.spawn_and_wait_for_stdout().await?;
1147        let stdout = stdout.trim();
1148        Ok(CryptoHash::from_str(stdout)?)
1149    }
1150
1151    /// Runs `linera read-data-blob`.
1152    pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
1153        let mut command = self.command().await?;
1154        command.arg("read-data-blob").arg(hash.to_string());
1155        if let Some(chain_id) = chain_id {
1156            command.arg(chain_id.to_string());
1157        }
1158        command.spawn_and_wait_for_stdout().await?;
1159        Ok(())
1160    }
1161
1162    pub fn load_wallet(&self) -> Result<Wallet> {
1163        Ok(Wallet::read(&self.wallet_path())?)
1164    }
1165
1166    pub fn load_keystore(&self) -> Result<InMemorySigner> {
1167        util::read_json(self.keystore_path())
1168    }
1169
1170    pub fn wallet_path(&self) -> PathBuf {
1171        self.path_provider.path().join(&self.wallet)
1172    }
1173
1174    pub fn keystore_path(&self) -> PathBuf {
1175        self.path_provider.path().join(&self.keystore)
1176    }
1177
1178    pub fn storage_path(&self) -> &str {
1179        &self.storage
1180    }
1181
1182    pub fn get_owner(&self) -> Option<AccountOwner> {
1183        let wallet = self.load_wallet().ok()?;
1184        wallet
1185            .get(wallet.default_chain()?)
1186            .expect("default chain must be in wallet")
1187            .owner
1188    }
1189
1190    pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1191        self.load_wallet()
1192            .ok()
1193            .is_some_and(|wallet| wallet.get(chain).is_some())
1194    }
1195
1196    pub async fn set_validator(
1197        &self,
1198        validator_key: &(String, String),
1199        port: usize,
1200        votes: usize,
1201    ) -> Result<()> {
1202        let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1203        self.command()
1204            .await?
1205            .arg("validator")
1206            .arg("add")
1207            .args(["--public-key", &validator_key.0])
1208            .args(["--account-key", &validator_key.1])
1209            .args(["--address", &address])
1210            .args(["--votes", &votes.to_string()])
1211            .spawn_and_wait_for_stdout()
1212            .await?;
1213        Ok(())
1214    }
1215
1216    pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1217        self.command()
1218            .await?
1219            .arg("validator")
1220            .arg("remove")
1221            .args(["--public-key", validator_key])
1222            .spawn_and_wait_for_stdout()
1223            .await?;
1224        Ok(())
1225    }
1226
1227    pub async fn change_validators(
1228        &self,
1229        add_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1230        modify_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1231        remove_validators: &[String],
1232    ) -> Result<()> {
1233        use std::str::FromStr;
1234
1235        use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1236
1237        // Build a map that will be serialized to JSON
1238        // Use the exact types that deserialization expects
1239        let mut changes = std::collections::HashMap::new();
1240
1241        // Add/modify validators
1242        for (public_key_str, account_key_str, port, votes) in
1243            add_validators.iter().chain(modify_validators.iter())
1244        {
1245            let public_key = ValidatorPublicKey::from_str(public_key_str)
1246                .with_context(|| format!("Invalid validator public key: {public_key_str}"))?;
1247
1248            let account_key = AccountPublicKey::from_str(account_key_str)
1249                .with_context(|| format!("Invalid account public key: {account_key_str}"))?;
1250
1251            let address = format!("{}:127.0.0.1:{}", self.network.short(), port)
1252                .parse()
1253                .unwrap();
1254
1255            // Create ValidatorChange struct
1256            let change = crate::cli::validator::Change {
1257                account_key,
1258                address,
1259                votes: crate::cli::validator::Votes(
1260                    std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1261                ),
1262            };
1263
1264            changes.insert(public_key, Some(change));
1265        }
1266
1267        // Remove validators (set to None)
1268        for validator_key_str in remove_validators {
1269            let public_key = ValidatorPublicKey::from_str(validator_key_str)
1270                .with_context(|| format!("Invalid validator public key: {validator_key_str}"))?;
1271            changes.insert(public_key, None);
1272        }
1273
1274        // Create temporary file with JSON
1275        let temp_file = tempfile::NamedTempFile::new()
1276            .context("Failed to create temporary file for validator changes")?;
1277        serde_json::to_writer(&temp_file, &changes)
1278            .context("Failed to write validator changes to file")?;
1279        let temp_path = temp_file.path();
1280
1281        self.command()
1282            .await?
1283            .arg("validator")
1284            .arg("update")
1285            .arg(temp_path)
1286            .arg("--yes") // Skip confirmation prompt
1287            .spawn_and_wait_for_stdout()
1288            .await?;
1289
1290        Ok(())
1291    }
1292
1293    pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1294        self.command()
1295            .await?
1296            .arg("revoke-epochs")
1297            .arg(epoch.to_string())
1298            .spawn_and_wait_for_stdout()
1299            .await?;
1300        Ok(())
1301    }
1302
1303    pub async fn set_resource_control_policy(
1304        &self,
1305        overrides: ResourceControlPolicyOverrides,
1306    ) -> Result<()> {
1307        let mut command = self.command().await?;
1308        command.arg("resource-control-policy");
1309        let ResourceControlPolicyOverrides {
1310            wasm_fuel_unit,
1311            evm_fuel_unit,
1312            read_operation,
1313            write_operation,
1314            byte_runtime,
1315            byte_read,
1316            byte_written,
1317            blob_read,
1318            blob_published,
1319            blob_byte_read,
1320            blob_byte_published,
1321            operation,
1322            operation_byte,
1323            message,
1324            message_byte,
1325            service_as_oracle_query,
1326            http_request,
1327            maximum_wasm_fuel_per_block,
1328            maximum_evm_fuel_per_block,
1329            maximum_service_oracle_execution_ms,
1330            maximum_block_size,
1331            maximum_blob_size,
1332            maximum_published_blobs,
1333            maximum_bytecode_size,
1334            maximum_block_proposal_size,
1335            maximum_bytes_read_per_block,
1336            maximum_bytes_written_per_block,
1337            maximum_oracle_response_bytes,
1338            maximum_http_response_bytes,
1339            http_request_timeout_ms,
1340            http_request_allow_list,
1341            free_application_ids,
1342            flags,
1343        } = overrides;
1344        if let Some(value) = wasm_fuel_unit {
1345            command.args(["--wasm-fuel-unit", &value.to_string()]);
1346        }
1347        if let Some(value) = evm_fuel_unit {
1348            command.args(["--evm-fuel-unit", &value.to_string()]);
1349        }
1350        if let Some(value) = read_operation {
1351            command.args(["--read-operation", &value.to_string()]);
1352        }
1353        if let Some(value) = write_operation {
1354            command.args(["--write-operation", &value.to_string()]);
1355        }
1356        if let Some(value) = byte_runtime {
1357            command.args(["--byte-runtime", &value.to_string()]);
1358        }
1359        if let Some(value) = byte_read {
1360            command.args(["--byte-read", &value.to_string()]);
1361        }
1362        if let Some(value) = byte_written {
1363            command.args(["--byte-written", &value.to_string()]);
1364        }
1365        if let Some(value) = blob_read {
1366            command.args(["--blob-read", &value.to_string()]);
1367        }
1368        if let Some(value) = blob_published {
1369            command.args(["--blob-published", &value.to_string()]);
1370        }
1371        if let Some(value) = blob_byte_read {
1372            command.args(["--blob-byte-read", &value.to_string()]);
1373        }
1374        if let Some(value) = blob_byte_published {
1375            command.args(["--blob-byte-published", &value.to_string()]);
1376        }
1377        if let Some(value) = operation {
1378            command.args(["--operation", &value.to_string()]);
1379        }
1380        if let Some(value) = operation_byte {
1381            command.args(["--operation-byte", &value.to_string()]);
1382        }
1383        if let Some(value) = message {
1384            command.args(["--message", &value.to_string()]);
1385        }
1386        if let Some(value) = message_byte {
1387            command.args(["--message-byte", &value.to_string()]);
1388        }
1389        if let Some(value) = service_as_oracle_query {
1390            command.args(["--service-as-oracle-query", &value.to_string()]);
1391        }
1392        if let Some(value) = http_request {
1393            command.args(["--http-request", &value.to_string()]);
1394        }
1395        if let Some(value) = maximum_wasm_fuel_per_block {
1396            command.args(["--maximum-wasm-fuel-per-block", &value.to_string()]);
1397        }
1398        if let Some(value) = maximum_evm_fuel_per_block {
1399            command.args(["--maximum-evm-fuel-per-block", &value.to_string()]);
1400        }
1401        if let Some(value) = maximum_service_oracle_execution_ms {
1402            command.args(["--maximum-service-oracle-execution-ms", &value.to_string()]);
1403        }
1404        if let Some(value) = maximum_block_size {
1405            command.args(["--maximum-block-size", &value.to_string()]);
1406        }
1407        if let Some(value) = maximum_blob_size {
1408            command.args(["--maximum-blob-size", &value.to_string()]);
1409        }
1410        if let Some(value) = maximum_published_blobs {
1411            command.args(["--maximum-published-blobs", &value.to_string()]);
1412        }
1413        if let Some(value) = maximum_bytecode_size {
1414            command.args(["--maximum-bytecode-size", &value.to_string()]);
1415        }
1416        if let Some(value) = maximum_block_proposal_size {
1417            command.args(["--maximum-block-proposal-size", &value.to_string()]);
1418        }
1419        if let Some(value) = maximum_bytes_read_per_block {
1420            command.args(["--maximum-bytes-read-per-block", &value.to_string()]);
1421        }
1422        if let Some(value) = maximum_bytes_written_per_block {
1423            command.args(["--maximum-bytes-written-per-block", &value.to_string()]);
1424        }
1425        if let Some(value) = maximum_oracle_response_bytes {
1426            command.args(["--maximum-oracle-response-bytes", &value.to_string()]);
1427        }
1428        if let Some(value) = maximum_http_response_bytes {
1429            command.args(["--maximum-http-response-bytes", &value.to_string()]);
1430        }
1431        if let Some(value) = http_request_timeout_ms {
1432            command.args(["--http-request-timeout-ms", &value.to_string()]);
1433        }
1434        if let Some(values) = http_request_allow_list {
1435            command.args(["--http-request-allow-list", &values.join(",")]);
1436        }
1437        if let Some(values) = free_application_ids {
1438            command.args(["--free-application-ids", &values.join(",")]);
1439        }
1440        if let Some(values) = flags {
1441            command.args(["--flags", &values.join(",")]);
1442        }
1443        command.spawn_and_wait_for_stdout().await?;
1444        Ok(())
1445    }
1446
1447    /// Runs `linera keygen`.
1448    pub async fn keygen(&self) -> Result<AccountOwner> {
1449        let stdout = self
1450            .command()
1451            .await?
1452            .arg("keygen")
1453            .spawn_and_wait_for_stdout()
1454            .await?;
1455        AccountOwner::from_str(stdout.as_str().trim())
1456    }
1457
1458    /// Returns the default chain.
1459    pub fn default_chain(&self) -> Option<ChainId> {
1460        self.load_wallet().ok()?.default_chain()
1461    }
1462
1463    /// Runs `linera assign`.
1464    pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1465        let _stdout = self
1466            .command()
1467            .await?
1468            .arg("assign")
1469            .args(["--owner", &owner.to_string()])
1470            .args(["--chain-id", &chain_id.to_string()])
1471            .spawn_and_wait_for_stdout()
1472            .await?;
1473        Ok(())
1474    }
1475
1476    /// Runs `linera set-preferred-owner` for `chain_id`.
1477    pub async fn set_preferred_owner(
1478        &self,
1479        chain_id: ChainId,
1480        owner: Option<AccountOwner>,
1481    ) -> Result<()> {
1482        let mut owner_arg = vec!["--owner".to_string()];
1483        if let Some(owner) = owner {
1484            owner_arg.push(owner.to_string());
1485        };
1486        self.command()
1487            .await?
1488            .arg("set-preferred-owner")
1489            .args(["--chain-id", &chain_id.to_string()])
1490            .args(owner_arg)
1491            .spawn_and_wait_for_stdout()
1492            .await?;
1493        Ok(())
1494    }
1495
1496    pub async fn build_application(
1497        &self,
1498        path: &Path,
1499        name: &str,
1500        is_workspace: bool,
1501    ) -> Result<(PathBuf, PathBuf)> {
1502        Command::new("cargo")
1503            .current_dir(self.path_provider.path())
1504            .arg("build")
1505            .arg("--release")
1506            .args(["--target", "wasm32-unknown-unknown"])
1507            .arg("--manifest-path")
1508            .arg(path.join("Cargo.toml"))
1509            .spawn_and_wait_for_stdout()
1510            .await?;
1511
1512        let release_dir = match is_workspace {
1513            true => path.join("../target/wasm32-unknown-unknown/release"),
1514            false => path.join("target/wasm32-unknown-unknown/release"),
1515        };
1516
1517        let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1518        let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1519
1520        let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1521        let service_size = fs_err::tokio::metadata(&service).await?.len();
1522        tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1523
1524        Ok((contract, service))
1525    }
1526}
1527
1528impl Drop for ClientWrapper {
1529    fn drop(&mut self) {
1530        use std::process::Command as SyncCommand;
1531
1532        if self.on_drop != OnClientDrop::CloseChains {
1533            return;
1534        }
1535
1536        let Ok(binary_path) = self.binary_path.lock() else {
1537            tracing::error!(
1538                "Failed to close chains because a thread panicked with a lock to `binary_path`"
1539            );
1540            return;
1541        };
1542
1543        let Some(binary_path) = binary_path.as_ref() else {
1544            tracing::warn!(
1545                "Assuming no chains need to be closed, because the command binary was never \
1546                resolved and therefore presumably never called"
1547            );
1548            return;
1549        };
1550
1551        let working_directory = self.path_provider.path();
1552        let mut wallet_show_command = SyncCommand::new(binary_path);
1553
1554        for argument in self.command_arguments() {
1555            wallet_show_command.arg(&*argument);
1556        }
1557
1558        let Ok(wallet_show_output) = wallet_show_command
1559            .current_dir(working_directory)
1560            .args(["wallet", "show", "--short", "--owned"])
1561            .output()
1562        else {
1563            tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1564            return;
1565        };
1566
1567        if !wallet_show_output.status.success() {
1568            tracing::warn!("Failed to list chains in the wallet to close them");
1569            return;
1570        }
1571
1572        let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1573            tracing::warn!(
1574                "Failed to close chains because `linera wallet show --short` \
1575                returned a non-UTF-8 output"
1576            );
1577            return;
1578        };
1579
1580        let chain_ids = chain_list_string
1581            .split('\n')
1582            .map(|line| line.trim())
1583            .filter(|line| !line.is_empty());
1584
1585        for chain_id in chain_ids {
1586            let mut close_chain_command = SyncCommand::new(binary_path);
1587
1588            for argument in self.command_arguments() {
1589                close_chain_command.arg(&*argument);
1590            }
1591
1592            close_chain_command.current_dir(working_directory);
1593
1594            match close_chain_command.args(["close-chain", chain_id]).status() {
1595                Ok(status) if status.success() => (),
1596                Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1597                Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1598            }
1599        }
1600    }
1601}
1602
#[cfg(with_testing)]
impl ClientWrapper {
    /// Builds the example application `name` found under `../examples/`.
    pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
        let example_dir = Self::example_path(name)?;
        self.build_application(&example_dir, name, true).await
    }

    /// Builds the test fixture application `name` found under
    /// `../linera-sdk/tests/fixtures/`.
    pub async fn build_test_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
        let fixture_dir = Self::test_example_path(name)?;
        self.build_application(&fixture_dir, name, true).await
    }

    /// Returns `../examples/<name>` relative to the current working directory.
    pub fn example_path(name: &str) -> Result<PathBuf> {
        let current_dir = env::current_dir()?;
        Ok(current_dir.join("../examples/").join(name))
    }

    /// Returns `../linera-sdk/tests/fixtures/<name>` relative to the current working
    /// directory.
    pub fn test_example_path(name: &str) -> Result<PathBuf> {
        let current_dir = env::current_dir()?;
        Ok(current_dir.join("../linera-sdk/tests/fixtures/").join(name))
    }
}
1625
/// Shortens `input` for logging.
///
/// Inputs shorter than 1000 bytes are returned unchanged. Longer inputs are cut to at most
/// 1000 bytes and suffixed with `" ..."`. The cut never splits a UTF-8 character: if byte
/// 1000 falls inside a multi-byte character, the cut moves back to the start of that
/// character.
fn truncate_query_output(input: &str) -> String {
    let max_len = 1000;
    if input.len() < max_len {
        return input.to_string();
    }
    // Slicing at a non-boundary index is invalid, so back off to the nearest boundary.
    // Index 0 is always a boundary, which guarantees termination.
    let mut end = max_len;
    while !input.is_char_boundary(end) {
        end -= 1;
    }
    format!("{} ...", &input[..end])
}
1634
1635fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1636    let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1637    let max_len = 200;
1638    if query.len() < max_len {
1639        query
1640    } else {
1641        format!("{} ...", query.get(..max_len).unwrap())
1642    }
1643}
1644
1645/// A running node service.
1646/// Logs a warning if a child process has already exited when its wrapper is dropped.
1647/// On Unix, includes the signal number if the process was killed (e.g. signal 9 = OOM).
1648fn log_unexpected_exit(child: &mut Child, service_kind: &str, port: u16) {
1649    match child.try_wait() {
1650        Ok(Some(status)) => {
1651            #[cfg(unix)]
1652            {
1653                use std::os::unix::process::ExitStatusExt as _;
1654                if let Some(signal) = status.signal() {
1655                    tracing::error!(
1656                        port,
1657                        signal,
1658                        "The {service_kind} service was killed by signal {signal}",
1659                    );
1660                    return;
1661                }
1662            }
1663            if !status.success() {
1664                tracing::error!(
1665                    port,
1666                    %status,
1667                    "The {service_kind} service exited unexpectedly with {status}",
1668                );
1669            }
1670        }
1671        Ok(None) => {} // Still running — normal case when terminate() was called.
1672        Err(error) => {
1673            tracing::warn!(
1674                port,
1675                %error,
1676                "Failed to check {service_kind} service status",
1677            );
1678        }
1679    }
1680}
1681
1682pub struct NodeService {
1683    port: u16,
1684    child: Child,
1685    terminated: bool,
1686}
1687
1688impl Drop for NodeService {
1689    fn drop(&mut self) {
1690        if !self.terminated {
1691            log_unexpected_exit(&mut self.child, "node", self.port);
1692        }
1693    }
1694}
1695
1696impl NodeService {
1697    fn new(port: u16, child: Child) -> Self {
1698        Self {
1699            port,
1700            child,
1701            terminated: false,
1702        }
1703    }
1704
1705    pub async fn terminate(mut self) -> Result<()> {
1706        self.terminated = true;
1707        self.child.kill().await.context("terminating node service")
1708    }
1709
    /// Returns the port this node service was created with.
    pub fn port(&self) -> u16 {
        self.port
    }
1713
    /// Checks the child process by delegating to `ensure_is_running` on the `Child` handle.
    ///
    /// NOTE(review): presumably this fails once the process has exited — confirm against the
    /// extension trait's implementation, which is not shown here.
    pub fn ensure_is_running(&mut self) -> Result<()> {
        self.child.ensure_is_running()
    }
1717
1718    pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1719        let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1720        let mut data = self.query_node(query).await?;
1721        Ok(serde_json::from_value(data["processInbox"].take())?)
1722    }
1723
1724    pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1725        let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1726        let mut data = self.query_node(query).await?;
1727        Ok(serde_json::from_value(data["sync"].take())?)
1728    }
1729
1730    pub async fn transfer(
1731        &self,
1732        chain_id: ChainId,
1733        owner: AccountOwner,
1734        recipient: Account,
1735        amount: Amount,
1736    ) -> Result<CryptoHash> {
1737        let json_owner = owner.to_value();
1738        let json_recipient = recipient.to_value();
1739        let query = format!(
1740            "mutation {{ transfer(\
1741                 chainId: \"{chain_id}\", \
1742                 owner: {json_owner}, \
1743                 recipient: {json_recipient}, \
1744                 amount: \"{amount}\") \
1745             }}"
1746        );
1747        let data = self.query_node(query).await?;
1748        serde_json::from_value(data["transfer"].clone())
1749            .context("missing transfer field in response")
1750    }
1751
1752    pub async fn balance(&self, account: &Account) -> Result<Amount> {
1753        let chain = account.chain_id;
1754        let owner = account.owner;
1755        if matches!(owner, AccountOwner::CHAIN) {
1756            let query = format!(
1757                "query {{ chain(chainId:\"{chain}\") {{
1758                    executionState {{ system {{ balance }} }}
1759                }} }}"
1760            );
1761            let response = self.query_node(query).await?;
1762            let balance = &response["chain"]["executionState"]["system"]["balance"]
1763                .as_str()
1764                .unwrap();
1765            return Ok(Amount::from_str(balance)?);
1766        }
1767        let query = format!(
1768            "query {{ chain(chainId:\"{chain}\") {{
1769                executionState {{ system {{ balances {{
1770                    entry(key:\"{owner}\") {{ value }}
1771                }} }} }}
1772            }} }}"
1773        );
1774        let response = self.query_node(query).await?;
1775        let balances = &response["chain"]["executionState"]["system"]["balances"];
1776        let balance = balances["entry"]["value"].as_str();
1777        match balance {
1778            None => Ok(Amount::ZERO),
1779            Some(amount) => Ok(Amount::from_str(amount)?),
1780        }
1781    }
1782
1783    pub fn make_application<A: ContractAbi>(
1784        &self,
1785        chain_id: &ChainId,
1786        application_id: &ApplicationId<A>,
1787    ) -> Result<ApplicationWrapper<A>> {
1788        let application_id = application_id.forget_abi().to_string();
1789        let link = format!(
1790            "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1791            self.port
1792        );
1793        Ok(ApplicationWrapper::from(link))
1794    }
1795
1796    pub async fn publish_data_blob(
1797        &self,
1798        chain_id: &ChainId,
1799        bytes: Vec<u8>,
1800    ) -> Result<CryptoHash> {
1801        let query = format!(
1802            "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1803            chain_id.to_value(),
1804            bytes.to_value(),
1805        );
1806        let data = self.query_node(query).await?;
1807        serde_json::from_value(data["publishDataBlob"].clone())
1808            .context("missing publishDataBlob field in response")
1809    }
1810
1811    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1812        &self,
1813        chain_id: &ChainId,
1814        contract: PathBuf,
1815        service: PathBuf,
1816        vm_runtime: VmRuntime,
1817    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1818        let contract_code = Bytecode::load_from_file(&contract).await?;
1819        let service_code = Bytecode::load_from_file(&service).await?;
1820        let query = format!(
1821            "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1822            chain_id.to_value(),
1823            contract_code.to_value(),
1824            service_code.to_value(),
1825            vm_runtime.to_value(),
1826        );
1827        let data = self.query_node(query).await?;
1828        let module_str = data["publishModule"]
1829            .as_str()
1830            .context("module ID not found")?;
1831        let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1832        Ok(module_id.with_abi())
1833    }
1834
1835    pub async fn query_committee_hash(&self, chain_id: &ChainId) -> Result<Option<CryptoHash>> {
1836        let query = format!(
1837            "query {{ chain(chainId:\"{chain_id}\") {{
1838                executionState {{ system {{ committeeHash }} }}
1839            }} }}"
1840        );
1841        let mut response = self.query_node(query).await?;
1842        let hash = response["chain"]["executionState"]["system"]["committeeHash"].take();
1843        Ok(serde_json::from_value(hash)?)
1844    }
1845
1846    pub async fn query_chain_epoch(&self, chain_id: &ChainId) -> Result<Epoch> {
1847        let query = format!(
1848            "query {{ chain(chainId:\"{chain_id}\") {{
1849                executionState {{ system {{ epoch }} }}
1850            }} }}"
1851        );
1852        let mut response = self.query_node(query).await?;
1853        let epoch = response["chain"]["executionState"]["system"]["epoch"].take();
1854        Ok(serde_json::from_value(epoch)?)
1855    }
1856
1857    pub async fn events_from_index(
1858        &self,
1859        chain_id: &ChainId,
1860        stream_id: &StreamId,
1861        start_index: u32,
1862    ) -> Result<Vec<IndexAndEvent>> {
1863        let query = format!(
1864            "query {{
1865               eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1866               {{ index event }}
1867             }}",
1868            stream_id.to_value()
1869        );
1870        let mut response = self.query_node(query).await?;
1871        let response = response["eventsFromIndex"].take();
1872        Ok(serde_json::from_value(response)?)
1873    }
1874
1875    pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1876        let n_try = 5;
1877        let query = query.as_ref();
1878        for i in 0..n_try {
1879            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1880            let url = format!("http://localhost:{}/", self.port);
1881            let client = reqwest_client();
1882            let result = client
1883                .post(url)
1884                .json(&json!({ "query": query }))
1885                .send()
1886                .await;
1887            if matches!(result, Err(ref error) if error.is_timeout()) {
1888                tracing::warn!(
1889                    "Timeout when sending query {} to the node service",
1890                    truncate_query_output(query)
1891                );
1892                continue;
1893            }
1894            let response = result.with_context(|| {
1895                format!(
1896                    "query_node: failed to post query={}",
1897                    truncate_query_output(query)
1898                )
1899            })?;
1900            ensure!(
1901                response.status().is_success(),
1902                "Query \"{}\" failed: {}",
1903                truncate_query_output(query),
1904                response
1905                    .text()
1906                    .await
1907                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1908            );
1909            let value: Value = response.json().await.context("invalid JSON")?;
1910            if let Some(errors) = value.get("errors") {
1911                tracing::warn!(
1912                    "Query \"{}\" failed: {}",
1913                    truncate_query_output(query),
1914                    errors
1915                );
1916            } else {
1917                return Ok(value["data"].clone());
1918            }
1919        }
1920        bail!(
1921            "Query \"{}\" failed after {} retries.",
1922            truncate_query_output(query),
1923            n_try
1924        );
1925    }
1926
1927    pub async fn create_application<
1928        Abi: ContractAbi,
1929        Parameters: Serialize,
1930        InstantiationArgument: Serialize,
1931    >(
1932        &self,
1933        chain_id: &ChainId,
1934        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1935        parameters: &Parameters,
1936        argument: &InstantiationArgument,
1937        required_application_ids: &[ApplicationId],
1938    ) -> Result<ApplicationId<Abi>> {
1939        let module_id = module_id.forget_abi();
1940        let json_required_applications_ids = required_application_ids
1941            .iter()
1942            .map(ApplicationId::to_string)
1943            .collect::<Vec<_>>()
1944            .to_value();
1945        // Convert to `serde_json::Value` then `async_graphql::Value` via the trait `InputType`.
1946        let new_parameters = serde_json::to_value(parameters)
1947            .context("could not create parameters JSON")?
1948            .to_value();
1949        let new_argument = serde_json::to_value(argument)
1950            .context("could not create argument JSON")?
1951            .to_value();
1952        let query = format!(
1953            "mutation {{ createApplication(\
1954                 chainId: \"{chain_id}\",
1955                 moduleId: \"{module_id}\", \
1956                 parameters: {new_parameters}, \
1957                 instantiationArgument: {new_argument}, \
1958                 requiredApplicationIds: {json_required_applications_ids}) \
1959             }}"
1960        );
1961        let data = self.query_node(query).await?;
1962        let app_id_str = data["createApplication"]
1963            .as_str()
1964            .context("missing createApplication string in response")?
1965            .trim();
1966        Ok(app_id_str
1967            .parse::<ApplicationId>()
1968            .context("invalid application ID")?
1969            .with_abi())
1970    }
1971
1972    /// Obtains the hash and height of the `chain`'s tip block, as known by this node service.
1973    pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1974        let query = format!(
1975            r#"query {{ block(chainId: "{chain}") {{
1976                hash
1977                block {{ header {{ height }} }}
1978            }} }}"#
1979        );
1980
1981        let mut response = self.query_node(&query).await?;
1982
1983        match (
1984            mem::take(&mut response["block"]["hash"]),
1985            mem::take(&mut response["block"]["block"]["header"]["height"]),
1986        ) {
1987            (Value::Null, Value::Null) => Ok(None),
1988            (Value::String(hash), Value::Number(height)) => Ok(Some((
1989                hash.parse()
1990                    .context("Received an invalid hash {hash:?} for chain tip")?,
1991                BlockHeight(height.as_u64().unwrap()),
1992            ))),
1993            invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1994        }
1995    }
1996
1997    /// Subscribes to the node service and returns a stream of notifications about a chain.
1998    pub async fn notifications(
1999        &self,
2000        chain_id: ChainId,
2001    ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
2002        let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
2003        let url = format!("ws://localhost:{}/ws", self.port);
2004        let mut request = url.into_client_request()?;
2005        request.headers_mut().insert(
2006            "Sec-WebSocket-Protocol",
2007            HeaderValue::from_str("graphql-transport-ws")?,
2008        );
2009        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
2010        let init_json = json!({
2011          "type": "connection_init",
2012          "payload": {}
2013        });
2014        websocket.send(init_json.to_string().into()).await?;
2015        let text = websocket
2016            .next()
2017            .await
2018            .context("Failed to establish connection")??
2019            .into_text()?;
2020        ensure!(
2021            text == "{\"type\":\"connection_ack\"}",
2022            "Unexpected response: {text}"
2023        );
2024        let query_json = json!({
2025          "id": "1",
2026          "type": "start",
2027          "payload": {
2028            "query": query,
2029            "variables": {},
2030            "operationName": null
2031          }
2032        });
2033        websocket.send(query_json.to_string().into()).await?;
2034        Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
2035            |message| async {
2036                let text = message.into_text()?;
2037                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
2038                if let Some(errors) = value["payload"].get("errors") {
2039                    bail!("Notification subscription failed: {errors:?}");
2040                }
2041                serde_json::from_value(value["payload"]["data"]["notifications"].clone())
2042                    .context("Failed to deserialize notification")
2043            },
2044        )))
2045    }
2046
2047    /// Subscribes to query results via the `queryResult` GraphQL subscription.
2048    pub async fn query_result(
2049        &self,
2050        name: &str,
2051        chain_id: ChainId,
2052        application_id: &ApplicationId,
2053    ) -> Result<Pin<Box<impl Stream<Item = Result<Value>>>>> {
2054        let query = format!(
2055            r#"subscription {{ queryResult(name: "{name}", chainId: "{chain_id}", applicationId: "{application_id}") }}"#,
2056        );
2057        let url = format!("ws://localhost:{}/ws", self.port);
2058        let mut request = url.into_client_request()?;
2059        request.headers_mut().insert(
2060            "Sec-WebSocket-Protocol",
2061            HeaderValue::from_str("graphql-transport-ws")?,
2062        );
2063        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
2064        let init_json = json!({
2065          "type": "connection_init",
2066          "payload": {}
2067        });
2068        websocket.send(init_json.to_string().into()).await?;
2069        let text = websocket
2070            .next()
2071            .await
2072            .context("Failed to establish connection")??
2073            .into_text()?;
2074        ensure!(
2075            text == "{\"type\":\"connection_ack\"}",
2076            "Unexpected response: {text}"
2077        );
2078        let query_json = json!({
2079          "id": "1",
2080          "type": "start",
2081          "payload": {
2082            "query": query,
2083            "variables": {},
2084            "operationName": null
2085          }
2086        });
2087        websocket.send(query_json.to_string().into()).await?;
2088        Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
2089            |message| async {
2090                let text = message.into_text()?;
2091                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
2092                if let Some(errors) = value["payload"].get("errors") {
2093                    bail!("Query result subscription failed: {errors:?}");
2094                }
2095                Ok(value["payload"]["data"]["queryResult"].clone())
2096            },
2097        )))
2098    }
2099}
2100
/// A running faucet service.
pub struct FaucetService {
    /// Local port used to build the faucet's `http://localhost:{port}/` URL.
    port: u16,
    /// Handle to the faucet's child process.
    child: Child,
    /// Temporary directory owned by the wrapper and never read through this field.
    /// NOTE(review): presumably kept so the directory outlives the process — confirm at the
    /// call site that creates it.
    _temp_dir: tempfile::TempDir,
    /// Set by `terminate()`; when `true`, `Drop` skips the unexpected-exit check.
    terminated: bool,
}
2108
2109impl Drop for FaucetService {
2110    fn drop(&mut self) {
2111        if !self.terminated {
2112            log_unexpected_exit(&mut self.child, "faucet", self.port);
2113        }
2114    }
2115}
2116
2117impl FaucetService {
2118    fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
2119        Self {
2120            port,
2121            child,
2122            _temp_dir: temp_dir,
2123            terminated: false,
2124        }
2125    }
2126
2127    pub async fn terminate(mut self) -> Result<()> {
2128        self.terminated = true;
2129        self.child
2130            .kill()
2131            .await
2132            .context("terminating faucet service")
2133    }
2134
2135    pub fn ensure_is_running(&mut self) -> Result<()> {
2136        self.child.ensure_is_running()
2137    }
2138
2139    pub fn instance(&self) -> Faucet {
2140        Faucet::new(format!("http://localhost:{}/", self.port))
2141    }
2142}
2143
/// A running `Application` to be queried in GraphQL.
pub struct ApplicationWrapper<A> {
    /// Endpoint that every query is POSTed to.
    uri: String,
    /// Carries the type parameter `A` without storing a value of it.
    _phantom: PhantomData<A>,
}
2149
2150impl<A> ApplicationWrapper<A> {
2151    pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
2152        let query = query.as_ref();
2153        let value = self.run_json_query(json!({ "query": query })).await?;
2154        Ok(value["data"].clone())
2155    }
2156
2157    pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
2158        const MAX_RETRIES: usize = 5;
2159
2160        for i in 0.. {
2161            let client = reqwest_client();
2162            let result = client.post(&self.uri).json(&query).send().await;
2163            let response = match result {
2164                Ok(response) => response,
2165                Err(error) if i < MAX_RETRIES => {
2166                    tracing::warn!(
2167                        "Failed to post query \"{}\": {error}; retrying",
2168                        truncate_query_output_serialize(&query),
2169                    );
2170                    continue;
2171                }
2172                Err(error) => {
2173                    let query = truncate_query_output_serialize(&query);
2174                    return Err(error)
2175                        .with_context(|| format!("run_json_query: failed to post query={query}"));
2176                }
2177            };
2178            ensure!(
2179                response.status().is_success(),
2180                "Query \"{}\" failed: {}",
2181                truncate_query_output_serialize(&query),
2182                response
2183                    .text()
2184                    .await
2185                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
2186            );
2187            let value: Value = response.json().await.context("invalid JSON")?;
2188            if let Some(errors) = value.get("errors") {
2189                bail!(
2190                    "Query \"{}\" failed: {}",
2191                    truncate_query_output_serialize(&query),
2192                    errors
2193                );
2194            }
2195            return Ok(value);
2196        }
2197        unreachable!()
2198    }
2199
2200    pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
2201        let query = query.as_ref();
2202        self.run_graphql_query(&format!("query {{ {query} }}"))
2203            .await
2204    }
2205
2206    pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
2207        let query = query.as_ref().trim();
2208        let name = query
2209            .split_once(|ch: char| !ch.is_alphanumeric())
2210            .map_or(query, |(name, _)| name);
2211        let data = self.query(query).await?;
2212        serde_json::from_value(data[name].clone())
2213            .with_context(|| format!("{name} field missing in response"))
2214    }
2215
2216    pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
2217        let mutation = mutation.as_ref();
2218        self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
2219            .await
2220    }
2221
2222    pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
2223        let mut out = String::from("mutation {\n");
2224        for (index, mutation) in mutations.iter().enumerate() {
2225            out = format!("{out}  u{index}: {mutation}\n");
2226        }
2227        out.push_str("}\n");
2228        self.run_graphql_query(&out).await
2229    }
2230}
2231
2232impl<A> From<String> for ApplicationWrapper<A> {
2233    fn from(uri: String) -> ApplicationWrapper<A> {
2234        ApplicationWrapper {
2235            uri,
2236            _phantom: PhantomData,
2237        }
2238    }
2239}
2240
2241/// Returns the timeout for tests that wait for notifications, either read from the env
2242/// variable `LINERA_TEST_NOTIFICATION_TIMEOUT_MS`, or the default value of 10 seconds.
2243#[cfg(with_testing)]
fn notification_timeout() -> Duration {
    const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
    const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;

    // Resolve the number of milliseconds first, then convert once at the end.
    let millis: u64 = match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
        Err(env::VarError::NotPresent) => NOTIFICATION_TIMEOUT_MS_DEFAULT,
        Err(env::VarError::NotUnicode(_)) => {
            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
        }
        Ok(raw) => match raw.parse() {
            Ok(millis) => millis,
            Err(error) => {
                panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
            }
        },
    };
    Duration::from_millis(millis)
}
2258
#[cfg(with_testing)]
pub trait NotificationsExt {
    /// Waits until `f` returns `Some(t)` for an incoming notification, then yields `t`.
    fn wait_for<T>(
        &mut self,
        f: impl FnMut(Notification) -> Option<T>,
    ) -> impl Future<Output = Result<T>>;

    /// Waits for a `NewEvents` notification and yields its event streams. When a height is
    /// given, only a notification for exactly that block height is accepted.
    fn wait_for_events(
        &mut self,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
        let expected_height = expected_height.into();
        self.wait_for(move |notification| match notification.reason {
            Reason::NewEvents {
                height,
                event_streams,
                ..
            } if expected_height.is_none_or(|expected| expected == height) => Some(event_streams),
            _ => None,
        })
    }

    /// Waits for a `NewBlock` notification and yields the block hash. When a height is given,
    /// only a notification for exactly that block height is accepted.
    fn wait_for_block(
        &mut self,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<CryptoHash>> {
        let expected_height = expected_height.into();
        self.wait_for(move |notification| match notification.reason {
            Reason::NewBlock { height, hash, .. }
                if expected_height.is_none_or(|expected| expected == height) =>
            {
                Some(hash)
            }
            _ => None,
        })
    }

    /// Waits for a `NewIncomingBundle` notification whose origin is `expected_origin`. When a
    /// height is given, only a notification for exactly that sender block height is accepted.
    fn wait_for_bundle(
        &mut self,
        expected_origin: ChainId,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<()>> {
        let expected_height = expected_height.into();
        self.wait_for(move |notification| match notification.reason {
            Reason::NewIncomingBundle { height, origin }
                if expected_height.is_none_or(|expected| expected == height)
                    && origin == expected_origin =>
            {
                Some(())
            }
            _ => None,
        })
    }
}
2324
#[cfg(with_testing)]
impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
    /// Pulls notifications from the stream until `f` returns `Some`, and returns that value.
    ///
    /// Fails if the stream yields an error, if the stream ends, or if `notification_timeout()`
    /// elapses first. The timer is created once before the loop, so the limit applies to the
    /// whole wait rather than to each individual notification.
    async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
        // One timer shared by every iteration of the loop below.
        let mut timeout = Box::pin(linera_base::time::timer::sleep(notification_timeout())).fuse();
        loop {
            let notification = futures::select! {
                () = timeout => bail!("Timeout waiting for notification"),
                // `None` (stream ended) becomes the "Stream closed" error; the second `?`
                // propagates an error item produced by the stream itself.
                notification = self.next().fuse() => notification.context("Stream closed")??,
            };
            // Notifications rejected by `f` are discarded and the wait continues.
            if let Some(t) = f(notification) {
                return Ok(t);
            }
        }
    }
}