Skip to main content

linera_service/cli_wrappers/
wallet.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Cow,
6    collections::BTreeMap,
7    env,
8    marker::PhantomData,
9    mem,
10    path::{Path, PathBuf},
11    pin::Pin,
12    process::Stdio,
13    str::FromStr,
14    sync,
15    time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24    abi::ContractAbi,
25    command::{resolve_binary, CommandExt},
26    crypto::{CryptoHash, InMemorySigner},
27    data_types::{Amount, ApplicationPermissions, BlockHeight, Bytecode, Epoch},
28    identifiers::{
29        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30    },
31    vm::VmRuntime,
32};
33use linera_client::client_options::ResourceControlPolicyConfig;
34use linera_core::worker::Notification;
35use linera_faucet_client::Faucet;
36use serde::{de::DeserializeOwned, ser::Serialize};
37use serde_command_opts::to_args;
38use serde_json::{json, Value};
39use tempfile::TempDir;
40use tokio::{
41    io::{AsyncBufReadExt, BufReader},
42    process::{Child, Command},
43    sync::oneshot,
44    task::JoinHandle,
45};
46#[cfg(with_testing)]
47use {
48    futures::FutureExt as _,
49    linera_core::worker::Reason,
50    std::{collections::BTreeSet, future::Future},
51};
52
53use crate::{
54    cli::command::{BenchmarkCommand, ResourceControlPolicyOverrides},
55    cli_wrappers::{
56        local_net::{PathProvider, ProcessInbox},
57        Network,
58    },
59    util::{self, ChildExt},
60    Wallet,
61};
62
/// The name of the environment variable that allows specifying additional arguments to be passed
/// to the node-service command of the client.
///
/// When the variable is set, its value is split on whitespace and every piece is appended as a
/// separate argument to the `service` subcommand.
const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
66
67fn reqwest_client() -> reqwest::Client {
68    reqwest::ClientBuilder::new()
69        .timeout(Duration::from_secs(60))
70        .build()
71        .unwrap()
72}
73
/// Wrapper to run a Linera client command.
pub struct ClientWrapper {
    /// Cached path to the `linera` binary; `None` until a path is supplied or resolved.
    binary_path: sync::Mutex<Option<PathBuf>>,
    /// Seed forwarded as `--testing-prng-seed` by the commands that accept it, if set.
    testing_prng_seed: Option<u64>,
    /// Value passed to `--storage`.
    storage: String,
    /// Value passed to `--wallet`.
    wallet: String,
    /// Value passed to `--keystore`.
    keystore: String,
    /// Value passed to `--max-pending-message-bundles` by the default command arguments.
    max_pending_message_bundles: usize,
    /// The network configuration this client was created with.
    network: Network,
    /// Provides the working directory for the client's files.
    pub path_provider: PathProvider,
    /// The action chosen at construction time; see [`OnClientDrop`].
    on_drop: OnClientDrop,
    /// Additional arguments appended after the required ones on every command.
    extra_args: Vec<String>,
}
88
/// Action to perform when the [`ClientWrapper`] is dropped.
///
/// The value is chosen by the caller when the wrapper is constructed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OnClientDrop {
    /// Close all the chains on the wallet.
    CloseChains,
    /// Do not close any chains, leaving them active.
    LeakChains,
}
97
98impl ClientWrapper {
99    /// Creates a new client wrapper with the default extra arguments.
100    pub fn new(
101        path_provider: PathProvider,
102        network: Network,
103        testing_prng_seed: Option<u64>,
104        id: usize,
105        on_drop: OnClientDrop,
106    ) -> Self {
107        Self::new_with_extra_args(
108            path_provider,
109            network,
110            testing_prng_seed,
111            id,
112            on_drop,
113            vec!["--wait-for-outgoing-messages".to_string()],
114            None,
115        )
116    }
117
118    /// Creates a new client wrapper with the given extra arguments and binary directory.
119    pub fn new_with_extra_args(
120        path_provider: PathProvider,
121        network: Network,
122        testing_prng_seed: Option<u64>,
123        id: usize,
124        on_drop: OnClientDrop,
125        extra_args: Vec<String>,
126        binary_dir: Option<PathBuf>,
127    ) -> Self {
128        let storage = format!(
129            "rocksdb:{}/client_{}.db",
130            path_provider.path().display(),
131            id
132        );
133        let wallet = format!("wallet_{id}.json");
134        let keystore = format!("keystore_{id}.json");
135        let binary_path = binary_dir.map(|dir| dir.join("linera"));
136        Self {
137            binary_path: sync::Mutex::new(binary_path),
138            testing_prng_seed,
139            storage,
140            wallet,
141            keystore,
142            max_pending_message_bundles: 10_000,
143            network,
144            path_provider,
145            on_drop,
146            extra_args,
147        }
148    }
149
150    /// Runs `linera project new`.
151    pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
152        let tmp = TempDir::new()?;
153        let mut command = self.command().await?;
154        command
155            .current_dir(tmp.path())
156            .arg("project")
157            .arg("new")
158            .arg(project_name)
159            .arg("--linera-root")
160            .arg(linera_root)
161            .spawn_and_wait_for_stdout()
162            .await?;
163        Ok(tmp)
164    }
165
166    /// Runs `linera project publish`.
167    pub async fn project_publish<T: Serialize>(
168        &self,
169        path: PathBuf,
170        required_application_ids: Vec<String>,
171        publisher: impl Into<Option<ChainId>>,
172        argument: &T,
173    ) -> Result<String> {
174        let json_parameters = serde_json::to_string(&())?;
175        let json_argument = serde_json::to_string(argument)?;
176        let mut command = self.command().await?;
177        command
178            .arg("project")
179            .arg("publish-and-create")
180            .arg(path)
181            .args(publisher.into().iter().map(ChainId::to_string))
182            .args(["--json-parameters", &json_parameters])
183            .args(["--json-argument", &json_argument]);
184        if !required_application_ids.is_empty() {
185            command.arg("--required-application-ids");
186            command.args(required_application_ids);
187        }
188        let stdout = command.spawn_and_wait_for_stdout().await?;
189        Ok(stdout.trim().to_string())
190    }
191
192    /// Runs `linera project test`.
193    pub async fn project_test(&self, path: &Path) -> Result<()> {
194        self.command()
195            .await
196            .context("failed to create project test command")?
197            .current_dir(path)
198            .arg("project")
199            .arg("test")
200            .spawn_and_wait_for_stdout()
201            .await?;
202        Ok(())
203    }
204
205    async fn command_with_envs_and_arguments(
206        &self,
207        envs: &[(&str, &str)],
208        arguments: impl IntoIterator<Item = Cow<'_, str>>,
209    ) -> Result<Command> {
210        let mut command = self.command_binary().await?;
211        command.current_dir(self.path_provider.path());
212        for (key, value) in envs {
213            command.env(key, value);
214        }
215        for argument in arguments {
216            command.arg(&*argument);
217        }
218        Ok(command)
219    }
220
221    async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
222        self.command_with_envs_and_arguments(envs, self.command_arguments())
223            .await
224    }
225
226    async fn command_with_arguments(
227        &self,
228        arguments: impl IntoIterator<Item = Cow<'_, str>>,
229    ) -> Result<Command> {
230        self.command_with_envs_and_arguments(
231            &[(
232                "RUST_LOG",
233                &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
234            )],
235            arguments,
236        )
237        .await
238    }
239
240    async fn command(&self) -> Result<Command> {
241        self.command_with_envs(&[(
242            "RUST_LOG",
243            &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
244        )])
245        .await
246    }
247
248    fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
249        [
250            "--wallet".into(),
251            self.wallet.as_str().into(),
252            "--keystore".into(),
253            self.keystore.as_str().into(),
254            "--storage".into(),
255            self.storage.as_str().into(),
256            "--send-timeout-ms".into(),
257            "500000".into(),
258            "--recv-timeout-ms".into(),
259            "500000".into(),
260        ]
261        .into_iter()
262        .chain(self.extra_args.iter().map(|s| s.as_str().into()))
263    }
264
265    /// Returns an iterator over the arguments that should be added to all command invocations.
266    fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
267        self.required_command_arguments().chain([
268            "--max-pending-message-bundles".into(),
269            self.max_pending_message_bundles.to_string().into(),
270        ])
271    }
272
273    /// Returns the [`Command`] instance configured to run the appropriate binary.
274    ///
275    /// The path is resolved once and cached inside `self` for subsequent usages.
276    async fn command_binary(&self) -> Result<Command> {
277        match self.command_with_cached_binary_path() {
278            Some(command) => Ok(command),
279            None => {
280                let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
281                let command = Command::new(&resolved_path);
282
283                self.set_cached_binary_path(resolved_path);
284
285                Ok(command)
286            }
287        }
288    }
289
290    /// Returns a [`Command`] instance configured with the cached `binary_path`, if available.
291    fn command_with_cached_binary_path(&self) -> Option<Command> {
292        let binary_path = self.binary_path.lock().unwrap();
293
294        binary_path.as_ref().map(Command::new)
295    }
296
297    /// Sets the cached `binary_path` with the `new_binary_path`.
298    ///
299    /// # Panics
300    ///
301    /// If the cache is already set to a different value. In theory the two threads calling
302    /// `command_binary` can race and resolve the binary path twice, but they should always be the
303    /// same path.
304    fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
305        let mut binary_path = self.binary_path.lock().unwrap();
306
307        if binary_path.is_none() {
308            *binary_path = Some(new_binary_path);
309        } else {
310            assert_eq!(*binary_path, Some(new_binary_path));
311        }
312    }
313
314    /// Runs `linera create-genesis-config`.
315    pub async fn create_genesis_config(
316        &self,
317        num_other_initial_chains: u32,
318        initial_funding: Amount,
319        policy_config: ResourceControlPolicyConfig,
320        http_allow_list: Option<Vec<String>>,
321    ) -> Result<()> {
322        let mut command = self.command().await?;
323        command
324            .args([
325                "create-genesis-config",
326                &num_other_initial_chains.to_string(),
327            ])
328            .args(["--initial-funding", &initial_funding.to_string()])
329            .args(["--committee", "committee.json"])
330            .args(["--genesis", "genesis.json"])
331            .args([
332                "--policy-config",
333                &policy_config.to_string().to_kebab_case(),
334            ]);
335        if let Some(allow_list) = http_allow_list {
336            command
337                .arg("--http-request-allow-list")
338                .arg(allow_list.join(","));
339        }
340        if let Some(seed) = self.testing_prng_seed {
341            command.arg("--testing-prng-seed").arg(seed.to_string());
342        }
343        command.spawn_and_wait_for_stdout().await?;
344        Ok(())
345    }
346
347    /// Runs `linera wallet init`. The genesis config is read from `genesis.json`, or from the
348    /// faucet if provided.
349    pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
350        let mut command = self.command().await?;
351        command.args(["wallet", "init"]);
352        match faucet {
353            None => command.args(["--genesis", "genesis.json"]),
354            Some(faucet) => command.args(["--faucet", faucet.url()]),
355        };
356        if let Some(seed) = self.testing_prng_seed {
357            command.arg("--testing-prng-seed").arg(seed.to_string());
358        }
359        command.spawn_and_wait_for_stdout().await?;
360        Ok(())
361    }
362
363    /// Runs `linera wallet request-chain`.
364    pub async fn request_chain(
365        &self,
366        faucet: &Faucet,
367        set_default: bool,
368    ) -> Result<(ChainId, AccountOwner)> {
369        let mut command = self.command().await?;
370        command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
371        if set_default {
372            command.arg("--set-default");
373        }
374        let stdout = command.spawn_and_wait_for_stdout().await?;
375        let mut lines = stdout.split_whitespace();
376        let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
377        let owner = lines.next().context("missing chain owner")?.parse()?;
378        Ok((chain_id, owner))
379    }
380
381    /// Runs `linera wallet publish-and-create`.
382    #[expect(clippy::too_many_arguments)]
383    pub async fn publish_and_create<
384        A: ContractAbi,
385        Parameters: Serialize,
386        InstantiationArgument: Serialize,
387    >(
388        &self,
389        contract: PathBuf,
390        service: PathBuf,
391        vm_runtime: VmRuntime,
392        parameters: &Parameters,
393        argument: &InstantiationArgument,
394        required_application_ids: &[ApplicationId],
395        publisher: impl Into<Option<ChainId>>,
396    ) -> Result<ApplicationId<A>> {
397        let json_parameters = serde_json::to_string(parameters)?;
398        let json_argument = serde_json::to_string(argument)?;
399        let mut command = self.command().await?;
400        let vm_runtime = format!("{vm_runtime}");
401        command
402            .arg("publish-and-create")
403            .args([contract, service])
404            .args(["--vm-runtime", &vm_runtime.to_lowercase()])
405            .args(publisher.into().iter().map(ChainId::to_string))
406            .args(["--json-parameters", &json_parameters])
407            .args(["--json-argument", &json_argument]);
408        if !required_application_ids.is_empty() {
409            command.arg("--required-application-ids");
410            command.args(
411                required_application_ids
412                    .iter()
413                    .map(ApplicationId::to_string),
414            );
415        }
416        let stdout = command.spawn_and_wait_for_stdout().await?;
417        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
418    }
419
420    /// Runs `linera publish-module`.
421    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
422        &self,
423        contract: PathBuf,
424        service: PathBuf,
425        vm_runtime: VmRuntime,
426        publisher: impl Into<Option<ChainId>>,
427    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
428        self.publish_module_with_formats(contract, service, None, vm_runtime, publisher)
429            .await
430    }
431
432    /// Runs `linera publish-module` with an optional `--formats` SNAP file path.
433    pub async fn publish_module_with_formats<Abi, Parameters, InstantiationArgument>(
434        &self,
435        contract: PathBuf,
436        service: PathBuf,
437        formats: Option<PathBuf>,
438        vm_runtime: VmRuntime,
439        publisher: impl Into<Option<ChainId>>,
440    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
441        let mut command = self.command().await?;
442        command
443            .arg("publish-module")
444            .args([contract, service])
445            .args(["--vm-runtime", &format!("{vm_runtime}").to_lowercase()]);
446        if let Some(formats_path) = formats {
447            command.arg("--formats").arg(formats_path);
448        }
449        let stdout = command
450            .args(publisher.into().iter().map(ChainId::to_string))
451            .spawn_and_wait_for_stdout()
452            .await?;
453        let module_id: ModuleId = stdout.trim().parse()?;
454        Ok(module_id.with_abi())
455    }
456
457    /// Runs `linera create-application`.
458    pub async fn create_application<
459        Abi: ContractAbi,
460        Parameters: Serialize,
461        InstantiationArgument: Serialize,
462    >(
463        &self,
464        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
465        parameters: &Parameters,
466        argument: &InstantiationArgument,
467        required_application_ids: &[ApplicationId],
468        creator: impl Into<Option<ChainId>>,
469    ) -> Result<ApplicationId<Abi>> {
470        let json_parameters = serde_json::to_string(parameters)?;
471        let json_argument = serde_json::to_string(argument)?;
472        let mut command = self.command().await?;
473        command
474            .arg("create-application")
475            .arg(module_id.forget_abi().to_string())
476            .args(["--json-parameters", &json_parameters])
477            .args(["--json-argument", &json_argument])
478            .args(creator.into().iter().map(ChainId::to_string));
479        if !required_application_ids.is_empty() {
480            command.arg("--required-application-ids");
481            command.args(
482                required_application_ids
483                    .iter()
484                    .map(ApplicationId::to_string),
485            );
486        }
487        let stdout = command.spawn_and_wait_for_stdout().await?;
488        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
489    }
490
491    /// Runs `linera service`.
492    pub async fn run_node_service(
493        &self,
494        port: impl Into<Option<u16>>,
495        process_inbox: ProcessInbox,
496    ) -> Result<NodeService> {
497        self.run_node_service_with_options(port, process_inbox, &[], &[], false)
498            .await
499    }
500
501    /// Runs `linera service` with optional task processor configuration.
502    pub async fn run_node_service_with_options(
503        &self,
504        port: impl Into<Option<u16>>,
505        process_inbox: ProcessInbox,
506        operator_application_ids: &[ApplicationId],
507        operators: &[(String, PathBuf)],
508        read_only: bool,
509    ) -> Result<NodeService> {
510        self.run_node_service_with_all_options(
511            port,
512            process_inbox,
513            operator_application_ids,
514            operators,
515            read_only,
516            &[],
517            &[],
518        )
519        .await
520    }
521
522    /// Runs `linera service` with all available options.
523    #[expect(clippy::too_many_arguments)]
524    pub async fn run_node_service_with_all_options(
525        &self,
526        port: impl Into<Option<u16>>,
527        process_inbox: ProcessInbox,
528        operator_application_ids: &[ApplicationId],
529        operators: &[(String, PathBuf)],
530        read_only: bool,
531        allowed_subscriptions: &[String],
532        subscription_ttls: &[(String, u64)],
533    ) -> Result<NodeService> {
534        let port = port.into().unwrap_or(8080);
535        let mut command = self.command().await?;
536        command.arg("service");
537        if let ProcessInbox::Skip = process_inbox {
538            command.arg("--listener-skip-process-inbox");
539        }
540        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
541            command.args(var.split_whitespace());
542        }
543        for app_id in operator_application_ids {
544            command.args(["--operator-application-ids", &app_id.to_string()]);
545        }
546        for (name, path) in operators {
547            command.args(["--operators", &format!("{}={}", name, path.display())]);
548        }
549        if read_only {
550            command.arg("--read-only");
551        }
552        for query in allowed_subscriptions {
553            command.args(["--allow-subscription", query]);
554        }
555        for (name, secs) in subscription_ttls {
556            command.args(["--subscription-ttl-secs", &format!("{name}={secs}")]);
557        }
558        let child = command
559            .args(["--port".to_string(), port.to_string()])
560            .spawn_into()?;
561        let client = reqwest_client();
562        for i in 0..10 {
563            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
564            let request = client.get(format!("http://localhost:{port}/")).send().await;
565            if request.is_ok() {
566                tracing::info!("Node service has started");
567                return Ok(NodeService::new(port, child));
568            } else {
569                tracing::warn!("Waiting for node service to start");
570            }
571        }
572        bail!("Failed to start node service");
573    }
574
575    /// Runs `linera service` with a controller application.
576    pub async fn run_node_service_with_controller(
577        &self,
578        port: impl Into<Option<u16>>,
579        process_inbox: ProcessInbox,
580        controller_id: &ApplicationId,
581        operators: &[(String, PathBuf)],
582    ) -> Result<NodeService> {
583        let port = port.into().unwrap_or(8080);
584        let mut command = self.command().await?;
585        command.arg("service");
586        if let ProcessInbox::Skip = process_inbox {
587            command.arg("--listener-skip-process-inbox");
588        }
589        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
590            command.args(var.split_whitespace());
591        }
592        command.args(["--controller-id", &controller_id.to_string()]);
593        for (name, path) in operators {
594            command.args(["--operators", &format!("{}={}", name, path.display())]);
595        }
596        let child = command
597            .args(["--port".to_string(), port.to_string()])
598            .spawn_into()?;
599        let client = reqwest_client();
600        for i in 0..10 {
601            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
602            let request = client.get(format!("http://localhost:{port}/")).send().await;
603            if request.is_ok() {
604                tracing::info!("Node service has started");
605                return Ok(NodeService::new(port, child));
606            } else {
607                tracing::warn!("Waiting for node service to start");
608            }
609        }
610        bail!("Failed to start node service");
611    }
612
613    /// Runs `linera validator query`
614    pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
615        let mut command = self.command().await?;
616        command.arg("validator").arg("query").arg(address);
617        let stdout = command.spawn_and_wait_for_stdout().await?;
618
619        // Parse the genesis config hash from the output.
620        // It's on a line like "Genesis config hash: <hash>"
621        let hash = stdout
622            .lines()
623            .find_map(|line| {
624                line.strip_prefix("Genesis config hash: ")
625                    .and_then(|hash_str| hash_str.trim().parse().ok())
626            })
627            .context("error while parsing the result of `linera validator query`")?;
628        Ok(hash)
629    }
630
631    /// Runs `linera validator list`.
632    pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
633        let mut command = self.command().await?;
634        command.arg("validator").arg("list");
635        if let Some(chain_id) = chain_id {
636            command.args(["--chain-id", &chain_id.to_string()]);
637        }
638        command.spawn_and_wait_for_stdout().await?;
639        Ok(())
640    }
641
642    /// Runs `linera sync-validator`.
643    pub async fn sync_validator(
644        &self,
645        chain_ids: impl IntoIterator<Item = &ChainId>,
646        validator_address: impl Into<String>,
647    ) -> Result<()> {
648        let mut command = self.command().await?;
649        command
650            .arg("validator")
651            .arg("sync")
652            .arg(validator_address.into());
653        let mut chain_ids = chain_ids.into_iter().peekable();
654        if chain_ids.peek().is_some() {
655            command
656                .arg("--chains")
657                .args(chain_ids.map(ChainId::to_string));
658        }
659        command.spawn_and_wait_for_stdout().await?;
660        Ok(())
661    }
662
663    /// Runs `linera validator benchmark` and returns its stdout.
664    pub async fn validator_benchmark(
665        &self,
666        address: impl Into<String>,
667        chains: impl IntoIterator<Item = &ChainId>,
668        extra_args: &[&str],
669    ) -> Result<String> {
670        let mut command = self.command().await?;
671        command
672            .arg("validator")
673            .arg("benchmark")
674            .arg(address.into());
675        for chain in chains {
676            command.args(["--chain", &chain.to_string()]);
677        }
678        command.args(extra_args);
679        command.spawn_and_wait_for_stdout().await
680    }
681
682    /// Runs `linera faucet`.
683    pub async fn run_faucet(
684        &self,
685        port: impl Into<Option<u16>>,
686        chain_id: Option<ChainId>,
687        amount: Amount,
688    ) -> Result<FaucetService> {
689        let port = port.into().unwrap_or(8080);
690        let temp_dir = tempfile::tempdir()
691            .context("Failed to create temporary directory for faucet storage")?;
692        let storage_path = temp_dir.path().join("faucet_storage.sqlite");
693        let mut command = self.command().await?;
694        let command = command
695            .arg("faucet")
696            .args(["--port".to_string(), port.to_string()])
697            .args(["--amount".to_string(), amount.to_string()])
698            .args([
699                "--storage-path".to_string(),
700                storage_path.to_string_lossy().to_string(),
701            ]);
702        if let Some(chain_id) = chain_id {
703            command.arg(chain_id.to_string());
704        }
705        let child = command.spawn_into()?;
706        let client = reqwest_client();
707        for i in 0..10 {
708            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
709            let request = client.get(format!("http://localhost:{port}/")).send().await;
710            if request.is_ok() {
711                tracing::info!("Faucet has started");
712                return Ok(FaucetService::new(port, child, temp_dir));
713            } else {
714                tracing::debug!("Waiting for faucet to start");
715            }
716        }
717        bail!("Failed to start faucet");
718    }
719
720    /// Runs `linera local-balance`.
721    pub async fn local_balance(&self, account: Account) -> Result<Amount> {
722        let stdout = self
723            .command()
724            .await?
725            .arg("local-balance")
726            .arg(account.to_string())
727            .spawn_and_wait_for_stdout()
728            .await?;
729        let amount = stdout
730            .trim()
731            .parse()
732            .context("error while parsing the result of `linera local-balance`")?;
733        Ok(amount)
734    }
735
736    /// Runs `linera query-balance`.
737    pub async fn query_balance(&self, account: Account) -> Result<Amount> {
738        let stdout = self
739            .command()
740            .await?
741            .arg("query-balance")
742            .arg(account.to_string())
743            .spawn_and_wait_for_stdout()
744            .await?;
745        let amount = stdout
746            .trim()
747            .parse()
748            .context("error while parsing the result of `linera query-balance`")?;
749        Ok(amount)
750    }
751
752    /// Runs `linera query-application` and parses the JSON result.
753    pub async fn query_application_json<T: DeserializeOwned>(
754        &self,
755        chain_id: ChainId,
756        application_id: ApplicationId,
757        query: impl AsRef<str>,
758    ) -> Result<T> {
759        let query = query.as_ref().trim();
760        let name = query
761            .split_once(|ch: char| !ch.is_alphanumeric())
762            .map_or(query, |(name, _)| name);
763        let stdout = self
764            .command()
765            .await?
766            .arg("query-application")
767            .arg("--chain-id")
768            .arg(chain_id.to_string())
769            .arg("--application-id")
770            .arg(application_id.to_string())
771            .arg(query)
772            .spawn_and_wait_for_stdout()
773            .await?;
774        let data: serde_json::Value =
775            serde_json::from_str(stdout.trim()).context("invalid JSON from query-application")?;
776        serde_json::from_value(data[name].clone())
777            .with_context(|| format!("{name} field missing in query-application response"))
778    }
779
780    /// Runs `linera sync`.
781    pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
782        self.command()
783            .await?
784            .arg("sync")
785            .arg(chain_id.to_string())
786            .spawn_and_wait_for_stdout()
787            .await?;
788        Ok(())
789    }
790
791    /// Runs `linera process-inbox`.
792    pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
793        self.command()
794            .await?
795            .arg("process-inbox")
796            .arg(chain_id.to_string())
797            .spawn_and_wait_for_stdout()
798            .await?;
799        Ok(())
800    }
801
802    /// Runs `linera transfer`.
803    pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
804        self.command()
805            .await?
806            .arg("transfer")
807            .arg(amount.to_string())
808            .args(["--from", &from.to_string()])
809            .args(["--to", &to.to_string()])
810            .spawn_and_wait_for_stdout()
811            .await?;
812        Ok(())
813    }
814
815    /// Runs `linera transfer` with no logging.
816    pub async fn transfer_with_silent_logs(
817        &self,
818        amount: Amount,
819        from: ChainId,
820        to: ChainId,
821    ) -> Result<()> {
822        self.command()
823            .await?
824            .env("RUST_LOG", "off")
825            .arg("transfer")
826            .arg(amount.to_string())
827            .args(["--from", &from.to_string()])
828            .args(["--to", &to.to_string()])
829            .spawn_and_wait_for_stdout()
830            .await?;
831        Ok(())
832    }
833
834    /// Runs `linera transfer` with owner accounts.
835    pub async fn transfer_with_accounts(
836        &self,
837        amount: Amount,
838        from: Account,
839        to: Account,
840    ) -> Result<()> {
841        self.command()
842            .await?
843            .arg("transfer")
844            .arg(amount.to_string())
845            .args(["--from", &from.to_string()])
846            .args(["--to", &to.to_string()])
847            .spawn_and_wait_for_stdout()
848            .await?;
849        Ok(())
850    }
851
852    fn benchmark_command_internal(command: &mut Command, args: &BenchmarkCommand) -> Result<()> {
853        let mut formatted_args = to_args(&args)?;
854        let subcommand = formatted_args.remove(0);
855        // The subcommand is followed by the flattened options, which are preceded by "options".
856        // So remove that as well.
857        formatted_args.remove(0);
858        let options = formatted_args
859            .chunks_exact(2)
860            .flat_map(|pair| {
861                let option = format!("--{}", pair[0]);
862                match pair[1].as_str() {
863                    "true" => vec![option],
864                    "false" => vec![],
865                    _ => vec![option, pair[1].clone()],
866                }
867            })
868            .collect::<Vec<_>>();
869        command
870            .args([
871                "--max-pending-message-bundles",
872                &args.transactions_per_block().to_string(),
873            ])
874            .arg("benchmark")
875            .arg(subcommand)
876            .args(options);
877        Ok(())
878    }
879
880    async fn benchmark_command_with_envs(
881        &self,
882        args: BenchmarkCommand,
883        envs: &[(&str, &str)],
884    ) -> Result<Command> {
885        let mut command = self
886            .command_with_envs_and_arguments(envs, self.required_command_arguments())
887            .await?;
888        Self::benchmark_command_internal(&mut command, &args)?;
889        Ok(command)
890    }
891
892    async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
893        let mut command = self
894            .command_with_arguments(self.required_command_arguments())
895            .await?;
896        Self::benchmark_command_internal(&mut command, &args)?;
897        Ok(command)
898    }
899
900    /// Runs `linera benchmark`.
901    pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
902        let mut command = self.benchmark_command(args).await?;
903        command.spawn_and_wait_for_stdout().await?;
904        Ok(())
905    }
906
907    /// Runs `linera benchmark`, but detached: don't wait for the command to finish, just spawn it
908    /// and return the child process, and the handles to the stdout and stderr.
909    pub async fn benchmark_detached(
910        &self,
911        args: BenchmarkCommand,
912        tx: oneshot::Sender<()>,
913    ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
914        let mut child = self
915            .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
916            .await?
917            .kill_on_drop(true)
918            .stdin(Stdio::piped())
919            .stdout(Stdio::piped())
920            .stderr(Stdio::piped())
921            .spawn()?;
922
923        let pid = child.id().expect("failed to get pid");
924        let stdout = child.stdout.take().expect("stdout not open");
925        let stdout_handle = tokio::spawn(async move {
926            let mut lines = BufReader::new(stdout).lines();
927            while let Ok(Some(line)) = lines.next_line().await {
928                println!("benchmark{{pid={pid}}} {line}");
929            }
930        });
931
932        let stderr = child.stderr.take().expect("stderr not open");
933        let stderr_handle = tokio::spawn(async move {
934            let mut lines = BufReader::new(stderr).lines();
935            let mut tx = Some(tx);
936            while let Ok(Some(line)) = lines.next_line().await {
937                if line.contains("Ready to start benchmark") {
938                    tx.take()
939                        .expect("Should only send signal once")
940                        .send(())
941                        .expect("failed to send ready signal to main thread");
942                } else {
943                    println!("benchmark{{pid={pid}}} {line}");
944                }
945            }
946        });
947        Ok((child, stdout_handle, stderr_handle))
948    }
949
950    async fn open_chain_internal(
951        &self,
952        from: ChainId,
953        owner: Option<AccountOwner>,
954        initial_balance: Amount,
955        super_owner: bool,
956    ) -> Result<(ChainId, AccountOwner)> {
957        let mut command = self.command().await?;
958        command
959            .arg("open-chain")
960            .args(["--from", &from.to_string()])
961            .args(["--initial-balance", &initial_balance.to_string()]);
962
963        if let Some(owner) = owner {
964            command.args(["--owner", &owner.to_string()]);
965        }
966
967        if super_owner {
968            command.arg("--super-owner");
969        }
970
971        let stdout = command.spawn_and_wait_for_stdout().await?;
972        let mut split = stdout.split('\n');
973        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
974        let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
975        if let Some(owner) = owner {
976            assert_eq!(owner, new_owner);
977        }
978        Ok((chain_id, new_owner))
979    }
980
981    /// Runs `linera open-chain --super-owner`.
982    pub async fn open_chain_super_owner(
983        &self,
984        from: ChainId,
985        owner: Option<AccountOwner>,
986        initial_balance: Amount,
987    ) -> Result<(ChainId, AccountOwner)> {
988        self.open_chain_internal(from, owner, initial_balance, true)
989            .await
990    }
991
992    /// Runs `linera open-chain`.
993    pub async fn open_chain(
994        &self,
995        from: ChainId,
996        owner: Option<AccountOwner>,
997        initial_balance: Amount,
998    ) -> Result<(ChainId, AccountOwner)> {
999        self.open_chain_internal(from, owner, initial_balance, false)
1000            .await
1001    }
1002
1003    /// Runs `linera open-chain` then `linera assign`.
1004    pub async fn open_and_assign(
1005        &self,
1006        client: &ClientWrapper,
1007        initial_balance: Amount,
1008    ) -> Result<ChainId> {
1009        let our_chain = self
1010            .load_wallet()?
1011            .default_chain()
1012            .context("no default chain found")?;
1013        let owner = client.keygen().await?;
1014        let (new_chain, _) = self
1015            .open_chain(our_chain, Some(owner), initial_balance)
1016            .await?;
1017        client.assign(owner, new_chain).await?;
1018        Ok(new_chain)
1019    }
1020
1021    /// Runs `linera open-multi-owner-chain` and returns the new chain's ID.
1022    pub async fn open_multi_owner_chain(
1023        &self,
1024        from: ChainId,
1025        owners: BTreeMap<AccountOwner, u64>,
1026        multi_leader_rounds: u32,
1027        balance: Amount,
1028        base_timeout_ms: u64,
1029    ) -> Result<ChainId> {
1030        let mut command = self.command().await?;
1031        command
1032            .arg("open-multi-owner-chain")
1033            .args(["--from", &from.to_string()])
1034            .arg("--owners")
1035            .arg(serde_json::to_string(&owners)?)
1036            .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
1037        command
1038            .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
1039            .args(["--initial-balance", &balance.to_string()]);
1040
1041        let stdout = command.spawn_and_wait_for_stdout().await?;
1042        let mut split = stdout.split('\n');
1043        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
1044
1045        Ok(chain_id)
1046    }
1047
1048    /// Runs `linera change-ownership` to set the super owners and owners of a chain.
1049    pub async fn change_ownership(
1050        &self,
1051        chain_id: ChainId,
1052        super_owners: Vec<AccountOwner>,
1053        owners: Vec<AccountOwner>,
1054    ) -> Result<()> {
1055        let mut command = self.command().await?;
1056        command
1057            .arg("change-ownership")
1058            .args(["--chain-id", &chain_id.to_string()]);
1059        command
1060            .arg("--super-owners")
1061            .arg(serde_json::to_string(&super_owners)?);
1062        command.arg("--owners").arg(serde_json::to_string(
1063            &owners
1064                .into_iter()
1065                .zip(std::iter::repeat(100u64))
1066                .collect::<BTreeMap<_, _>>(),
1067        )?);
1068        command.spawn_and_wait_for_stdout().await?;
1069        Ok(())
1070    }
1071
1072    /// Runs `linera change-application-permissions` for the given chain.
1073    pub async fn change_application_permissions(
1074        &self,
1075        chain_id: ChainId,
1076        application_permissions: ApplicationPermissions,
1077    ) -> Result<()> {
1078        let mut command = self.command().await?;
1079        command
1080            .arg("change-application-permissions")
1081            .args(["--chain-id", &chain_id.to_string()]);
1082        command.arg("--manage-chain").arg(serde_json::to_string(
1083            &application_permissions.manage_chain,
1084        )?);
1085        // TODO: add other fields
1086        command.spawn_and_wait_for_stdout().await?;
1087        Ok(())
1088    }
1089
1090    /// Runs `linera wallet follow-chain CHAIN_ID`.
1091    pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
1092        let mut command = self.command().await?;
1093        command
1094            .args(["wallet", "follow-chain"])
1095            .arg(chain_id.to_string());
1096        if sync {
1097            command.arg("--sync");
1098        }
1099        command.spawn_and_wait_for_stdout().await?;
1100        Ok(())
1101    }
1102
1103    /// Runs `linera wallet forget-chain CHAIN_ID`.
1104    pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
1105        let mut command = self.command().await?;
1106        command
1107            .args(["wallet", "forget-chain"])
1108            .arg(chain_id.to_string());
1109        command.spawn_and_wait_for_stdout().await?;
1110        Ok(())
1111    }
1112
1113    /// Runs `linera wallet set-default CHAIN_ID`.
1114    pub async fn set_default_chain(&self, chain_id: ChainId) -> Result<()> {
1115        let mut command = self.command().await?;
1116        command
1117            .args(["wallet", "set-default"])
1118            .arg(chain_id.to_string());
1119        command.spawn_and_wait_for_stdout().await?;
1120        Ok(())
1121    }
1122
1123    /// Runs `linera retry-pending-block` for the given chain.
1124    pub async fn retry_pending_block(
1125        &self,
1126        chain_id: Option<ChainId>,
1127    ) -> Result<Option<CryptoHash>> {
1128        let mut command = self.command().await?;
1129        command.arg("retry-pending-block");
1130        if let Some(chain_id) = chain_id {
1131            command.arg(chain_id.to_string());
1132        }
1133        let stdout = command.spawn_and_wait_for_stdout().await?;
1134        let stdout = stdout.trim();
1135        if stdout.is_empty() {
1136            Ok(None)
1137        } else {
1138            Ok(Some(CryptoHash::from_str(stdout)?))
1139        }
1140    }
1141
1142    /// Runs `linera publish-data-blob`.
1143    pub async fn publish_data_blob(
1144        &self,
1145        path: &Path,
1146        chain_id: Option<ChainId>,
1147    ) -> Result<CryptoHash> {
1148        let mut command = self.command().await?;
1149        command.arg("publish-data-blob").arg(path);
1150        if let Some(chain_id) = chain_id {
1151            command.arg(chain_id.to_string());
1152        }
1153        let stdout = command.spawn_and_wait_for_stdout().await?;
1154        let stdout = stdout.trim();
1155        Ok(CryptoHash::from_str(stdout)?)
1156    }
1157
1158    /// Runs `linera read-data-blob`.
1159    pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
1160        let mut command = self.command().await?;
1161        command.arg("read-data-blob").arg(hash.to_string());
1162        if let Some(chain_id) = chain_id {
1163            command.arg(chain_id.to_string());
1164        }
1165        command.spawn_and_wait_for_stdout().await?;
1166        Ok(())
1167    }
1168
1169    /// Reads the wallet from disk.
1170    pub fn load_wallet(&self) -> Result<Wallet> {
1171        Ok(Wallet::read(&self.wallet_path())?)
1172    }
1173
1174    /// Reads the keystore from disk.
1175    pub fn load_keystore(&self) -> Result<InMemorySigner> {
1176        util::read_json(self.keystore_path())
1177    }
1178
1179    /// Returns the path to the wallet file.
1180    pub fn wallet_path(&self) -> PathBuf {
1181        self.path_provider.path().join(&self.wallet)
1182    }
1183
1184    /// Returns the path to the keystore file.
1185    pub fn keystore_path(&self) -> PathBuf {
1186        self.path_provider.path().join(&self.keystore)
1187    }
1188
1189    /// Returns the storage configuration string.
1190    pub fn storage_path(&self) -> &str {
1191        &self.storage
1192    }
1193
1194    /// Returns the owner of the wallet's default chain, if any.
1195    pub fn get_owner(&self) -> Option<AccountOwner> {
1196        let wallet = self.load_wallet().ok()?;
1197        wallet
1198            .get(wallet.default_chain()?)
1199            .expect("default chain must be in wallet")
1200            .owner
1201    }
1202
1203    /// Returns whether the given chain is present in the wallet.
1204    pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1205        self.load_wallet()
1206            .ok()
1207            .is_some_and(|wallet| wallet.get(chain).is_some())
1208    }
1209
1210    /// Runs `linera validator add` for the given validator key, port, and number of votes.
1211    pub async fn set_validator(
1212        &self,
1213        validator_key: &(String, String),
1214        port: usize,
1215        votes: usize,
1216    ) -> Result<()> {
1217        let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1218        self.command()
1219            .await?
1220            .arg("validator")
1221            .arg("add")
1222            .args(["--public-key", &validator_key.0])
1223            .args(["--account-key", &validator_key.1])
1224            .args(["--address", &address])
1225            .args(["--votes", &votes.to_string()])
1226            .spawn_and_wait_for_stdout()
1227            .await?;
1228        Ok(())
1229    }
1230
1231    /// Runs `linera validator remove` for the given validator public key.
1232    pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1233        self.command()
1234            .await?
1235            .arg("validator")
1236            .arg("remove")
1237            .args(["--public-key", validator_key])
1238            .spawn_and_wait_for_stdout()
1239            .await?;
1240        Ok(())
1241    }
1242
1243    /// Runs `linera validator update` to add, modify, and remove validators in one batch.
1244    pub async fn change_validators(
1245        &self,
1246        add_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1247        modify_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1248        remove_validators: &[String],
1249    ) -> Result<()> {
1250        use std::str::FromStr;
1251
1252        use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1253
1254        // Build a map that will be serialized to JSON
1255        // Use the exact types that deserialization expects
1256        let mut changes = std::collections::HashMap::new();
1257
1258        // Add/modify validators
1259        for (public_key_str, account_key_str, port, votes) in
1260            add_validators.iter().chain(modify_validators.iter())
1261        {
1262            let public_key = ValidatorPublicKey::from_str(public_key_str)
1263                .with_context(|| format!("Invalid validator public key: {public_key_str}"))?;
1264
1265            let account_key = AccountPublicKey::from_str(account_key_str)
1266                .with_context(|| format!("Invalid account public key: {account_key_str}"))?;
1267
1268            let address = format!("{}:127.0.0.1:{}", self.network.short(), port)
1269                .parse()
1270                .unwrap();
1271
1272            // Create ValidatorChange struct
1273            let change = crate::cli::validator::Change {
1274                account_key,
1275                address,
1276                votes: crate::cli::validator::Votes(
1277                    std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1278                ),
1279            };
1280
1281            changes.insert(public_key, Some(change));
1282        }
1283
1284        // Remove validators (set to None)
1285        for validator_key_str in remove_validators {
1286            let public_key = ValidatorPublicKey::from_str(validator_key_str)
1287                .with_context(|| format!("Invalid validator public key: {validator_key_str}"))?;
1288            changes.insert(public_key, None);
1289        }
1290
1291        // Create temporary file with JSON
1292        let temp_file = tempfile::NamedTempFile::new()
1293            .context("Failed to create temporary file for validator changes")?;
1294        serde_json::to_writer(&temp_file, &changes)
1295            .context("Failed to write validator changes to file")?;
1296        let temp_path = temp_file.path();
1297
1298        self.command()
1299            .await?
1300            .arg("validator")
1301            .arg("update")
1302            .arg(temp_path)
1303            .arg("--yes") // Skip confirmation prompt
1304            .spawn_and_wait_for_stdout()
1305            .await?;
1306
1307        Ok(())
1308    }
1309
1310    /// Runs `linera revoke-epochs` up to the given epoch.
1311    pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1312        self.command()
1313            .await?
1314            .arg("revoke-epochs")
1315            .arg(epoch.to_string())
1316            .spawn_and_wait_for_stdout()
1317            .await?;
1318        Ok(())
1319    }
1320
1321    /// Runs `linera resource-control-policy` with the given overrides.
1322    pub async fn set_resource_control_policy(
1323        &self,
1324        overrides: ResourceControlPolicyOverrides,
1325    ) -> Result<()> {
1326        let mut command = self.command().await?;
1327        command.arg("resource-control-policy");
1328        let ResourceControlPolicyOverrides {
1329            wasm_fuel_unit,
1330            evm_fuel_unit,
1331            read_operation,
1332            write_operation,
1333            byte_runtime,
1334            byte_read,
1335            byte_written,
1336            blob_read,
1337            blob_published,
1338            blob_byte_read,
1339            blob_byte_published,
1340            operation,
1341            operation_byte,
1342            message,
1343            message_byte,
1344            service_as_oracle_query,
1345            http_request,
1346            maximum_wasm_fuel_per_block,
1347            maximum_evm_fuel_per_block,
1348            maximum_service_oracle_execution_ms,
1349            maximum_block_size,
1350            maximum_blob_size,
1351            maximum_published_blobs,
1352            maximum_bytecode_size,
1353            maximum_block_proposal_size,
1354            maximum_bytes_read_per_block,
1355            maximum_bytes_written_per_block,
1356            maximum_oracle_response_bytes,
1357            maximum_http_response_bytes,
1358            http_request_timeout_ms,
1359            http_request_allow_list,
1360            free_application_ids,
1361            flags,
1362        } = overrides;
1363        if let Some(value) = wasm_fuel_unit {
1364            command.args(["--wasm-fuel-unit", &value.to_string()]);
1365        }
1366        if let Some(value) = evm_fuel_unit {
1367            command.args(["--evm-fuel-unit", &value.to_string()]);
1368        }
1369        if let Some(value) = read_operation {
1370            command.args(["--read-operation", &value.to_string()]);
1371        }
1372        if let Some(value) = write_operation {
1373            command.args(["--write-operation", &value.to_string()]);
1374        }
1375        if let Some(value) = byte_runtime {
1376            command.args(["--byte-runtime", &value.to_string()]);
1377        }
1378        if let Some(value) = byte_read {
1379            command.args(["--byte-read", &value.to_string()]);
1380        }
1381        if let Some(value) = byte_written {
1382            command.args(["--byte-written", &value.to_string()]);
1383        }
1384        if let Some(value) = blob_read {
1385            command.args(["--blob-read", &value.to_string()]);
1386        }
1387        if let Some(value) = blob_published {
1388            command.args(["--blob-published", &value.to_string()]);
1389        }
1390        if let Some(value) = blob_byte_read {
1391            command.args(["--blob-byte-read", &value.to_string()]);
1392        }
1393        if let Some(value) = blob_byte_published {
1394            command.args(["--blob-byte-published", &value.to_string()]);
1395        }
1396        if let Some(value) = operation {
1397            command.args(["--operation", &value.to_string()]);
1398        }
1399        if let Some(value) = operation_byte {
1400            command.args(["--operation-byte", &value.to_string()]);
1401        }
1402        if let Some(value) = message {
1403            command.args(["--message", &value.to_string()]);
1404        }
1405        if let Some(value) = message_byte {
1406            command.args(["--message-byte", &value.to_string()]);
1407        }
1408        if let Some(value) = service_as_oracle_query {
1409            command.args(["--service-as-oracle-query", &value.to_string()]);
1410        }
1411        if let Some(value) = http_request {
1412            command.args(["--http-request", &value.to_string()]);
1413        }
1414        if let Some(value) = maximum_wasm_fuel_per_block {
1415            command.args(["--maximum-wasm-fuel-per-block", &value.to_string()]);
1416        }
1417        if let Some(value) = maximum_evm_fuel_per_block {
1418            command.args(["--maximum-evm-fuel-per-block", &value.to_string()]);
1419        }
1420        if let Some(value) = maximum_service_oracle_execution_ms {
1421            command.args(["--maximum-service-oracle-execution-ms", &value.to_string()]);
1422        }
1423        if let Some(value) = maximum_block_size {
1424            command.args(["--maximum-block-size", &value.to_string()]);
1425        }
1426        if let Some(value) = maximum_blob_size {
1427            command.args(["--maximum-blob-size", &value.to_string()]);
1428        }
1429        if let Some(value) = maximum_published_blobs {
1430            command.args(["--maximum-published-blobs", &value.to_string()]);
1431        }
1432        if let Some(value) = maximum_bytecode_size {
1433            command.args(["--maximum-bytecode-size", &value.to_string()]);
1434        }
1435        if let Some(value) = maximum_block_proposal_size {
1436            command.args(["--maximum-block-proposal-size", &value.to_string()]);
1437        }
1438        if let Some(value) = maximum_bytes_read_per_block {
1439            command.args(["--maximum-bytes-read-per-block", &value.to_string()]);
1440        }
1441        if let Some(value) = maximum_bytes_written_per_block {
1442            command.args(["--maximum-bytes-written-per-block", &value.to_string()]);
1443        }
1444        if let Some(value) = maximum_oracle_response_bytes {
1445            command.args(["--maximum-oracle-response-bytes", &value.to_string()]);
1446        }
1447        if let Some(value) = maximum_http_response_bytes {
1448            command.args(["--maximum-http-response-bytes", &value.to_string()]);
1449        }
1450        if let Some(value) = http_request_timeout_ms {
1451            command.args(["--http-request-timeout-ms", &value.to_string()]);
1452        }
1453        if let Some(values) = http_request_allow_list {
1454            command.args(["--http-request-allow-list", &values.join(",")]);
1455        }
1456        if let Some(values) = free_application_ids {
1457            command.args(["--free-application-ids", &values.join(",")]);
1458        }
1459        if let Some(values) = flags {
1460            command.args(["--flags", &values.join(",")]);
1461        }
1462        command.spawn_and_wait_for_stdout().await?;
1463        Ok(())
1464    }
1465
1466    /// Runs `linera keygen`.
1467    pub async fn keygen(&self) -> Result<AccountOwner> {
1468        let stdout = self
1469            .command()
1470            .await?
1471            .arg("keygen")
1472            .spawn_and_wait_for_stdout()
1473            .await?;
1474        AccountOwner::from_str(stdout.as_str().trim())
1475    }
1476
1477    /// Returns the default chain.
1478    pub fn default_chain(&self) -> Option<ChainId> {
1479        self.load_wallet().ok()?.default_chain()
1480    }
1481
1482    /// Runs `linera assign`.
1483    pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1484        let _stdout = self
1485            .command()
1486            .await?
1487            .arg("assign")
1488            .args(["--owner", &owner.to_string()])
1489            .args(["--chain-id", &chain_id.to_string()])
1490            .spawn_and_wait_for_stdout()
1491            .await?;
1492        Ok(())
1493    }
1494
1495    /// Runs `linera set-preferred-owner` for `chain_id`.
1496    pub async fn set_preferred_owner(
1497        &self,
1498        chain_id: ChainId,
1499        owner: Option<AccountOwner>,
1500    ) -> Result<()> {
1501        let mut owner_arg = vec!["--owner".to_string()];
1502        if let Some(owner) = owner {
1503            owner_arg.push(owner.to_string());
1504        };
1505        self.command()
1506            .await?
1507            .arg("set-preferred-owner")
1508            .args(["--chain-id", &chain_id.to_string()])
1509            .args(owner_arg)
1510            .spawn_and_wait_for_stdout()
1511            .await?;
1512        Ok(())
1513    }
1514
1515    /// Builds the application at the given path with `cargo` and returns the contract and service paths.
1516    pub async fn build_application(
1517        &self,
1518        path: &Path,
1519        name: &str,
1520        is_workspace: bool,
1521    ) -> Result<(PathBuf, PathBuf)> {
1522        Command::new("cargo")
1523            .current_dir(self.path_provider.path())
1524            .arg("build")
1525            .arg("--release")
1526            .args(["--target", "wasm32-unknown-unknown"])
1527            .arg("--manifest-path")
1528            .arg(path.join("Cargo.toml"))
1529            .spawn_and_wait_for_stdout()
1530            .await?;
1531
1532        let release_dir = match is_workspace {
1533            true => path.join("../target/wasm32-unknown-unknown/release"),
1534            false => path.join("target/wasm32-unknown-unknown/release"),
1535        };
1536
1537        let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1538        let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1539
1540        let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1541        let service_size = fs_err::tokio::metadata(&service).await?.len();
1542        tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1543
1544        Ok((contract, service))
1545    }
1546}
1547
1548impl Drop for ClientWrapper {
1549    fn drop(&mut self) {
1550        use std::process::Command as SyncCommand;
1551
1552        if self.on_drop != OnClientDrop::CloseChains {
1553            return;
1554        }
1555
1556        let Ok(binary_path) = self.binary_path.lock() else {
1557            tracing::error!(
1558                "Failed to close chains because a thread panicked with a lock to `binary_path`"
1559            );
1560            return;
1561        };
1562
1563        let Some(binary_path) = binary_path.as_ref() else {
1564            tracing::warn!(
1565                "Assuming no chains need to be closed, because the command binary was never \
1566                resolved and therefore presumably never called"
1567            );
1568            return;
1569        };
1570
1571        let working_directory = self.path_provider.path();
1572        let mut wallet_show_command = SyncCommand::new(binary_path);
1573
1574        for argument in self.command_arguments() {
1575            wallet_show_command.arg(&*argument);
1576        }
1577
1578        let Ok(wallet_show_output) = wallet_show_command
1579            .current_dir(working_directory)
1580            .args(["wallet", "show", "--short", "--owned"])
1581            .output()
1582        else {
1583            tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1584            return;
1585        };
1586
1587        if !wallet_show_output.status.success() {
1588            tracing::warn!("Failed to list chains in the wallet to close them");
1589            return;
1590        }
1591
1592        let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1593            tracing::warn!(
1594                "Failed to close chains because `linera wallet show --short` \
1595                returned a non-UTF-8 output"
1596            );
1597            return;
1598        };
1599
1600        let chain_ids = chain_list_string
1601            .split('\n')
1602            .map(|line| line.trim())
1603            .filter(|line| !line.is_empty());
1604
1605        for chain_id in chain_ids {
1606            let mut close_chain_command = SyncCommand::new(binary_path);
1607
1608            for argument in self.command_arguments() {
1609                close_chain_command.arg(&*argument);
1610            }
1611
1612            close_chain_command.current_dir(working_directory);
1613
1614            match close_chain_command.args(["close-chain", chain_id]).status() {
1615                Ok(status) if status.success() => (),
1616                Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1617                Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1618            }
1619        }
1620    }
1621}
1622
#[cfg(with_testing)]
impl ClientWrapper {
    /// Builds the example application called `name` located under `../examples/`,
    /// relative to the current directory, and returns the contract and service paths.
    pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
        let path = Self::example_path(name)?;
        self.build_application(&path, name, true).await
    }

    /// Builds the test fixture application called `name` located under
    /// `../linera-sdk/tests/fixtures/`, relative to the current directory, and returns
    /// the contract and service paths.
    pub async fn build_test_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
        let path = Self::test_example_path(name)?;
        self.build_application(&path, name, true).await
    }

    /// Returns the path to the example application with the given name.
    ///
    /// Fails if the current directory cannot be determined.
    pub fn example_path(name: &str) -> Result<PathBuf> {
        let current_dir = env::current_dir()?;
        let examples_dir = current_dir.join("../examples/");
        Ok(examples_dir.join(name))
    }

    /// Returns the path to the test example application with the given name.
    ///
    /// Fails if the current directory cannot be determined.
    pub fn test_example_path(name: &str) -> Result<PathBuf> {
        let current_dir = env::current_dir()?;
        let fixtures_dir = current_dir.join("../linera-sdk/tests/fixtures/");
        Ok(fixtures_dir.join(name))
    }
}
1649
/// Shortens `input` to at most 1000 bytes for logging.
///
/// Inputs of up to 1000 bytes are returned unchanged. Longer inputs are cut at the
/// last UTF-8 character boundary at or before byte 1000, and `" ..."` is appended to
/// mark the truncation.
fn truncate_query_output(input: &str) -> String {
    const MAX_LEN: usize = 1000;
    if input.len() <= MAX_LEN {
        return input.to_string();
    }
    // Slicing in the middle of a multi-byte character would panic, so back up to the
    // nearest character boundary. Index 0 is always a boundary, so this terminates.
    let mut end = MAX_LEN;
    while !input.is_char_boundary(end) {
        end -= 1;
    }
    format!("{} ...", &input[..end])
}
1658
1659fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1660    let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1661    let max_len = 200;
1662    if query.len() < max_len {
1663        query
1664    } else {
1665        format!("{} ...", query.get(..max_len).unwrap())
1666    }
1667}
1668
1669/// A running node service.
1670/// Logs a warning if a child process has already exited when its wrapper is dropped.
1671/// On Unix, includes the signal number if the process was killed (e.g. signal 9 = OOM).
1672fn log_unexpected_exit(child: &mut Child, service_kind: &str, port: u16) {
1673    match child.try_wait() {
1674        Ok(Some(status)) => {
1675            #[cfg(unix)]
1676            {
1677                use std::os::unix::process::ExitStatusExt as _;
1678                if let Some(signal) = status.signal() {
1679                    tracing::error!(
1680                        port,
1681                        signal,
1682                        "The {service_kind} service was killed by signal {signal}",
1683                    );
1684                    return;
1685                }
1686            }
1687            if !status.success() {
1688                tracing::error!(
1689                    port,
1690                    %status,
1691                    "The {service_kind} service exited unexpectedly with {status}",
1692                );
1693            }
1694        }
1695        Ok(None) => {} // Still running — normal case when terminate() was called.
1696        Err(error) => {
1697            tracing::warn!(
1698                port,
1699                %error,
1700                "Failed to check {service_kind} service status",
1701            );
1702        }
1703    }
1704}
1705
1706/// A running node service that can be queried over GraphQL.
1707pub struct NodeService {
1708    port: u16,
1709    child: Child,
1710    terminated: bool,
1711}
1712
1713impl Drop for NodeService {
1714    fn drop(&mut self) {
1715        if !self.terminated {
1716            log_unexpected_exit(&mut self.child, "node", self.port);
1717        }
1718    }
1719}
1720
1721impl NodeService {
1722    fn new(port: u16, child: Child) -> Self {
1723        Self {
1724            port,
1725            child,
1726            terminated: false,
1727        }
1728    }
1729
1730    /// Terminates the node service process.
1731    pub async fn terminate(mut self) -> Result<()> {
1732        self.terminated = true;
1733        self.child.kill().await.context("terminating node service")
1734    }
1735
1736    /// Returns the port the node service is listening on.
1737    pub fn port(&self) -> u16 {
1738        self.port
1739    }
1740
1741    /// Checks that the node service process is still running.
1742    pub fn ensure_is_running(&mut self) -> Result<()> {
1743        self.child.ensure_is_running()
1744    }
1745
1746    /// Runs the `processInbox` GraphQL mutation for the given chain.
1747    pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1748        let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1749        let mut data = self.query_node(query).await?;
1750        Ok(serde_json::from_value(data["processInbox"].take())?)
1751    }
1752
1753    /// Runs the `sync` GraphQL mutation for the given chain.
1754    pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1755        let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1756        let mut data = self.query_node(query).await?;
1757        Ok(serde_json::from_value(data["sync"].take())?)
1758    }
1759
1760    /// Transfers the given amount from an owner to a recipient via the `transfer` mutation.
1761    pub async fn transfer(
1762        &self,
1763        chain_id: ChainId,
1764        owner: AccountOwner,
1765        recipient: Account,
1766        amount: Amount,
1767    ) -> Result<CryptoHash> {
1768        let json_owner = owner.to_value();
1769        let json_recipient = recipient.to_value();
1770        let query = format!(
1771            "mutation {{ transfer(\
1772                 chainId: \"{chain_id}\", \
1773                 owner: {json_owner}, \
1774                 recipient: {json_recipient}, \
1775                 amount: \"{amount}\") \
1776             }}"
1777        );
1778        let data = self.query_node(query).await?;
1779        serde_json::from_value(data["transfer"].clone())
1780            .context("missing transfer field in response")
1781    }
1782
1783    /// Queries the balance of the given account.
1784    pub async fn balance(&self, account: &Account) -> Result<Amount> {
1785        let chain = account.chain_id;
1786        let owner = account.owner;
1787        if matches!(owner, AccountOwner::CHAIN) {
1788            let query = format!(
1789                "query {{ chain(chainId:\"{chain}\") {{
1790                    executionState {{ system {{ balance }} }}
1791                }} }}"
1792            );
1793            let response = self.query_node(query).await?;
1794            let balance = &response["chain"]["executionState"]["system"]["balance"]
1795                .as_str()
1796                .unwrap();
1797            return Ok(Amount::from_str(balance)?);
1798        }
1799        let query = format!(
1800            "query {{ chain(chainId:\"{chain}\") {{
1801                executionState {{ system {{ balances {{
1802                    entry(key:\"{owner}\") {{ value }}
1803                }} }} }}
1804            }} }}"
1805        );
1806        let response = self.query_node(query).await?;
1807        let balances = &response["chain"]["executionState"]["system"]["balances"];
1808        let balance = balances["entry"]["value"].as_str();
1809        match balance {
1810            None => Ok(Amount::ZERO),
1811            Some(amount) => Ok(Amount::from_str(amount)?),
1812        }
1813    }
1814
1815    /// Returns an [`ApplicationWrapper`] for querying the given application over GraphQL.
1816    pub fn make_application<A: ContractAbi>(
1817        &self,
1818        chain_id: &ChainId,
1819        application_id: &ApplicationId<A>,
1820    ) -> Result<ApplicationWrapper<A>> {
1821        let application_id = application_id.forget_abi().to_string();
1822        let link = format!(
1823            "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1824            self.port
1825        );
1826        Ok(ApplicationWrapper::from(link))
1827    }
1828
1829    /// Publishes a data blob to the given chain via the `publishDataBlob` mutation.
1830    pub async fn publish_data_blob(
1831        &self,
1832        chain_id: &ChainId,
1833        bytes: Vec<u8>,
1834    ) -> Result<CryptoHash> {
1835        let query = format!(
1836            "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1837            chain_id.to_value(),
1838            bytes.to_value(),
1839        );
1840        let data = self.query_node(query).await?;
1841        serde_json::from_value(data["publishDataBlob"].clone())
1842            .context("missing publishDataBlob field in response")
1843    }
1844
1845    /// Publishes a module to the given chain via the `publishModule` mutation.
1846    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1847        &self,
1848        chain_id: &ChainId,
1849        contract: PathBuf,
1850        service: PathBuf,
1851        vm_runtime: VmRuntime,
1852    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1853        let contract_code = Bytecode::load_from_file(&contract).await?;
1854        let service_code = Bytecode::load_from_file(&service).await?;
1855        let query = format!(
1856            "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1857            chain_id.to_value(),
1858            contract_code.to_value(),
1859            service_code.to_value(),
1860            vm_runtime.to_value(),
1861        );
1862        let data = self.query_node(query).await?;
1863        let module_str = data["publishModule"]
1864            .as_str()
1865            .context("module ID not found")?;
1866        let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1867        Ok(module_id.with_abi())
1868    }
1869
1870    /// Queries the committee hash of the given chain.
1871    pub async fn query_committee_hash(&self, chain_id: &ChainId) -> Result<Option<CryptoHash>> {
1872        let query = format!(
1873            "query {{ chain(chainId:\"{chain_id}\") {{
1874                executionState {{ system {{ committeeHash }} }}
1875            }} }}"
1876        );
1877        let mut response = self.query_node(query).await?;
1878        let hash = response["chain"]["executionState"]["system"]["committeeHash"].take();
1879        Ok(serde_json::from_value(hash)?)
1880    }
1881
1882    /// Queries the current epoch of the given chain.
1883    pub async fn query_chain_epoch(&self, chain_id: &ChainId) -> Result<Epoch> {
1884        let query = format!(
1885            "query {{ chain(chainId:\"{chain_id}\") {{
1886                executionState {{ system {{ epoch }} }}
1887            }} }}"
1888        );
1889        let mut response = self.query_node(query).await?;
1890        let epoch = response["chain"]["executionState"]["system"]["epoch"].take();
1891        Ok(serde_json::from_value(epoch)?)
1892    }
1893
1894    /// Queries the events of the given stream starting from the given index.
1895    pub async fn events_from_index(
1896        &self,
1897        chain_id: &ChainId,
1898        stream_id: &StreamId,
1899        start_index: u32,
1900    ) -> Result<Vec<IndexAndEvent>> {
1901        let query = format!(
1902            "query {{
1903               eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1904               {{ index event }}
1905             }}",
1906            stream_id.to_value()
1907        );
1908        let mut response = self.query_node(query).await?;
1909        let response = response["eventsFromIndex"].take();
1910        Ok(serde_json::from_value(response)?)
1911    }
1912
1913    /// Posts the given GraphQL query to the node service, retrying on transient errors.
1914    pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1915        let n_try = 5;
1916        let query = query.as_ref();
1917        for i in 0..n_try {
1918            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1919            let url = format!("http://localhost:{}/", self.port);
1920            let client = reqwest_client();
1921            let result = client
1922                .post(url)
1923                .json(&json!({ "query": query }))
1924                .send()
1925                .await;
1926            if matches!(result, Err(ref error) if error.is_timeout()) {
1927                tracing::warn!(
1928                    "Timeout when sending query {} to the node service",
1929                    truncate_query_output(query)
1930                );
1931                continue;
1932            }
1933            let response = result.with_context(|| {
1934                format!(
1935                    "query_node: failed to post query={}",
1936                    truncate_query_output(query)
1937                )
1938            })?;
1939            ensure!(
1940                response.status().is_success(),
1941                "Query \"{}\" failed: {}",
1942                truncate_query_output(query),
1943                response
1944                    .text()
1945                    .await
1946                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1947            );
1948            let value: Value = response.json().await.context("invalid JSON")?;
1949            if let Some(errors) = value.get("errors") {
1950                tracing::warn!(
1951                    "Query \"{}\" failed: {}",
1952                    truncate_query_output(query),
1953                    errors
1954                );
1955            } else {
1956                return Ok(value["data"].clone());
1957            }
1958        }
1959        bail!(
1960            "Query \"{}\" failed after {} retries.",
1961            truncate_query_output(query),
1962            n_try
1963        );
1964    }
1965
1966    /// Creates an application from a published module via the `createApplication` mutation.
1967    pub async fn create_application<
1968        Abi: ContractAbi,
1969        Parameters: Serialize,
1970        InstantiationArgument: Serialize,
1971    >(
1972        &self,
1973        chain_id: &ChainId,
1974        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1975        parameters: &Parameters,
1976        argument: &InstantiationArgument,
1977        required_application_ids: &[ApplicationId],
1978    ) -> Result<ApplicationId<Abi>> {
1979        let module_id = module_id.forget_abi();
1980        let json_required_applications_ids = required_application_ids
1981            .iter()
1982            .map(ApplicationId::to_string)
1983            .collect::<Vec<_>>()
1984            .to_value();
1985        // Convert to `serde_json::Value` then `async_graphql::Value` via the trait `InputType`.
1986        let new_parameters = serde_json::to_value(parameters)
1987            .context("could not create parameters JSON")?
1988            .to_value();
1989        let new_argument = serde_json::to_value(argument)
1990            .context("could not create argument JSON")?
1991            .to_value();
1992        let query = format!(
1993            "mutation {{ createApplication(\
1994                 chainId: \"{chain_id}\",
1995                 moduleId: \"{module_id}\", \
1996                 parameters: {new_parameters}, \
1997                 instantiationArgument: {new_argument}, \
1998                 requiredApplicationIds: {json_required_applications_ids}) \
1999             }}"
2000        );
2001        let data = self.query_node(query).await?;
2002        let app_id_str = data["createApplication"]
2003            .as_str()
2004            .context("missing createApplication string in response")?
2005            .trim();
2006        Ok(app_id_str
2007            .parse::<ApplicationId>()
2008            .context("invalid application ID")?
2009            .with_abi())
2010    }
2011
2012    /// Obtains the hash and height of the `chain`'s tip block, as known by this node service.
2013    pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
2014        let query = format!(
2015            r#"query {{ block(chainId: "{chain}") {{
2016                hash
2017                block {{ header {{ height }} }}
2018            }} }}"#
2019        );
2020
2021        let mut response = self.query_node(&query).await?;
2022
2023        match (
2024            mem::take(&mut response["block"]["hash"]),
2025            mem::take(&mut response["block"]["block"]["header"]["height"]),
2026        ) {
2027            (Value::Null, Value::Null) => Ok(None),
2028            (Value::String(hash), Value::Number(height)) => Ok(Some((
2029                hash.parse()
2030                    .context("Received an invalid hash {hash:?} for chain tip")?,
2031                BlockHeight(height.as_u64().unwrap()),
2032            ))),
2033            invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
2034        }
2035    }
2036
2037    /// Subscribes to the node service and returns a stream of notifications about a chain.
2038    pub async fn notifications(
2039        &self,
2040        chain_id: ChainId,
2041    ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
2042        let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
2043        let url = format!("ws://localhost:{}/ws", self.port);
2044        let mut request = url.into_client_request()?;
2045        request.headers_mut().insert(
2046            "Sec-WebSocket-Protocol",
2047            HeaderValue::from_str("graphql-transport-ws")?,
2048        );
2049        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
2050        let init_json = json!({
2051          "type": "connection_init",
2052          "payload": {}
2053        });
2054        websocket.send(init_json.to_string().into()).await?;
2055        let text = websocket
2056            .next()
2057            .await
2058            .context("Failed to establish connection")??
2059            .into_text()?;
2060        ensure!(
2061            text == "{\"type\":\"connection_ack\"}",
2062            "Unexpected response: {text}"
2063        );
2064        let query_json = json!({
2065          "id": "1",
2066          "type": "start",
2067          "payload": {
2068            "query": query,
2069            "variables": {},
2070            "operationName": null
2071          }
2072        });
2073        websocket.send(query_json.to_string().into()).await?;
2074        Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
2075            |message| async {
2076                let text = message.into_text()?;
2077                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
2078                if let Some(errors) = value["payload"].get("errors") {
2079                    bail!("Notification subscription failed: {errors:?}");
2080                }
2081                serde_json::from_value(value["payload"]["data"]["notifications"].clone())
2082                    .context("Failed to deserialize notification")
2083            },
2084        )))
2085    }
2086
2087    /// Subscribes to query results via the `queryResult` GraphQL subscription.
2088    pub async fn query_result(
2089        &self,
2090        name: &str,
2091        chain_id: ChainId,
2092        application_id: &ApplicationId,
2093    ) -> Result<Pin<Box<impl Stream<Item = Result<Value>>>>> {
2094        let query = format!(
2095            r#"subscription {{ queryResult(name: "{name}", chainId: "{chain_id}", applicationId: "{application_id}") }}"#,
2096        );
2097        let url = format!("ws://localhost:{}/ws", self.port);
2098        let mut request = url.into_client_request()?;
2099        request.headers_mut().insert(
2100            "Sec-WebSocket-Protocol",
2101            HeaderValue::from_str("graphql-transport-ws")?,
2102        );
2103        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
2104        let init_json = json!({
2105          "type": "connection_init",
2106          "payload": {}
2107        });
2108        websocket.send(init_json.to_string().into()).await?;
2109        let text = websocket
2110            .next()
2111            .await
2112            .context("Failed to establish connection")??
2113            .into_text()?;
2114        ensure!(
2115            text == "{\"type\":\"connection_ack\"}",
2116            "Unexpected response: {text}"
2117        );
2118        let query_json = json!({
2119          "id": "1",
2120          "type": "start",
2121          "payload": {
2122            "query": query,
2123            "variables": {},
2124            "operationName": null
2125          }
2126        });
2127        websocket.send(query_json.to_string().into()).await?;
2128        Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
2129            |message| async {
2130                let text = message.into_text()?;
2131                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
2132                if let Some(errors) = value["payload"].get("errors") {
2133                    bail!("Query result subscription failed: {errors:?}");
2134                }
2135                Ok(value["payload"]["data"]["queryResult"].clone())
2136            },
2137        )))
2138    }
2139}
2140
2141/// A running faucet service.
2142pub struct FaucetService {
2143    port: u16,
2144    child: Child,
2145    _temp_dir: tempfile::TempDir,
2146    terminated: bool,
2147}
2148
2149impl Drop for FaucetService {
2150    fn drop(&mut self) {
2151        if !self.terminated {
2152            log_unexpected_exit(&mut self.child, "faucet", self.port);
2153        }
2154    }
2155}
2156
2157impl FaucetService {
2158    fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
2159        Self {
2160            port,
2161            child,
2162            _temp_dir: temp_dir,
2163            terminated: false,
2164        }
2165    }
2166
2167    /// Terminates the faucet service process.
2168    pub async fn terminate(mut self) -> Result<()> {
2169        self.terminated = true;
2170        self.child
2171            .kill()
2172            .await
2173            .context("terminating faucet service")
2174    }
2175
2176    /// Checks that the faucet service process is still running.
2177    pub fn ensure_is_running(&mut self) -> Result<()> {
2178        self.child.ensure_is_running()
2179    }
2180
2181    /// Returns a [`Faucet`] client pointing at this service.
2182    pub fn instance(&self) -> Faucet {
2183        Faucet::new(format!("http://localhost:{}/", self.port))
2184    }
2185}
2186
2187/// A running `Application` to be queried in GraphQL.
2188pub struct ApplicationWrapper<A> {
2189    uri: String,
2190    _phantom: PhantomData<A>,
2191}
2192
2193impl<A> ApplicationWrapper<A> {
2194    /// Posts the given GraphQL query and returns the `data` field of the response.
2195    pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
2196        let query = query.as_ref();
2197        let value = self.run_json_query(json!({ "query": query })).await?;
2198        Ok(value["data"].clone())
2199    }
2200
2201    /// Posts the given JSON request to the application, retrying on transient errors.
2202    pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
2203        const MAX_RETRIES: usize = 5;
2204
2205        for i in 0.. {
2206            let client = reqwest_client();
2207            let result = client.post(&self.uri).json(&query).send().await;
2208            let response = match result {
2209                Ok(response) => response,
2210                Err(error) if i < MAX_RETRIES => {
2211                    tracing::warn!(
2212                        "Failed to post query \"{}\": {error}; retrying",
2213                        truncate_query_output_serialize(&query),
2214                    );
2215                    continue;
2216                }
2217                Err(error) => {
2218                    let query = truncate_query_output_serialize(&query);
2219                    return Err(error)
2220                        .with_context(|| format!("run_json_query: failed to post query={query}"));
2221                }
2222            };
2223            ensure!(
2224                response.status().is_success(),
2225                "Query \"{}\" failed: {}",
2226                truncate_query_output_serialize(&query),
2227                response
2228                    .text()
2229                    .await
2230                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
2231            );
2232            let value: Value = response.json().await.context("invalid JSON")?;
2233            if let Some(errors) = value.get("errors") {
2234                bail!(
2235                    "Query \"{}\" failed: {}",
2236                    truncate_query_output_serialize(&query),
2237                    errors
2238                );
2239            }
2240            return Ok(value);
2241        }
2242        unreachable!()
2243    }
2244
2245    /// Runs the given string as a GraphQL query, wrapping it in `query { ... }`.
2246    pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
2247        let query = query.as_ref();
2248        self.run_graphql_query(&format!("query {{ {query} }}"))
2249            .await
2250    }
2251
2252    /// Runs the given GraphQL query and deserializes the named field of the response.
2253    pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
2254        let query = query.as_ref().trim();
2255        let name = query
2256            .split_once(|ch: char| !ch.is_alphanumeric())
2257            .map_or(query, |(name, _)| name);
2258        let data = self.query(query).await?;
2259        serde_json::from_value(data[name].clone())
2260            .with_context(|| format!("{name} field missing in response"))
2261    }
2262
2263    /// Runs the given string as a GraphQL mutation, wrapping it in `mutation { ... }`.
2264    pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
2265        let mutation = mutation.as_ref();
2266        self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
2267            .await
2268    }
2269
2270    /// Runs several GraphQL mutations as aliased fields in a single `mutation` request.
2271    pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
2272        let mut out = String::from("mutation {\n");
2273        for (index, mutation) in mutations.iter().enumerate() {
2274            out = format!("{out}  u{index}: {mutation}\n");
2275        }
2276        out.push_str("}\n");
2277        self.run_graphql_query(&out).await
2278    }
2279}
2280
2281impl<A> From<String> for ApplicationWrapper<A> {
2282    fn from(uri: String) -> ApplicationWrapper<A> {
2283        ApplicationWrapper {
2284            uri,
2285            _phantom: PhantomData,
2286        }
2287    }
2288}
2289
/// Returns how long tests wait for a notification.
///
/// The value is read, in milliseconds, from the environment variable
/// `LINERA_TEST_NOTIFICATION_TIMEOUT_MS`; when the variable is not set, 10 seconds are used.
///
/// # Panics
///
/// Panics if the variable is set but is not valid Unicode or not a valid number.
#[cfg(with_testing)]
fn notification_timeout() -> Duration {
    const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
    const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;

    let millis: u64 = match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
        Err(env::VarError::NotPresent) => NOTIFICATION_TIMEOUT_MS_DEFAULT,
        Err(env::VarError::NotUnicode(_)) => {
            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
        }
        Ok(var) => var.parse().unwrap_or_else(|error| {
            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
        }),
    };
    Duration::from_millis(millis)
}
2307
/// Helpers for waiting until a particular notification shows up on a stream.
#[cfg(with_testing)]
pub trait NotificationsExt {
    /// Waits for a notification for which `f` returns `Some(t)`, and returns `t`.
    fn wait_for<T>(
        &mut self,
        f: impl FnMut(Notification) -> Option<T>,
    ) -> impl Future<Output = Result<T>>;

    /// Waits for a `NewEvents` notification and returns its event streams.
    ///
    /// When a height is given, only a notification for that block height is accepted;
    /// otherwise any height matches.
    fn wait_for_events(
        &mut self,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
        let expected_height = expected_height.into();
        self.wait_for(move |notification| match notification.reason {
            Reason::NewEvents {
                height,
                event_streams,
                ..
            } if expected_height.is_none_or(|expected| expected == height) => Some(event_streams),
            _ => None,
        })
    }

    /// Waits for a `NewBlock` notification and returns the hash of the block.
    ///
    /// When a height is given, only a notification for that block height is accepted;
    /// otherwise any height matches.
    fn wait_for_block(
        &mut self,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<CryptoHash>> {
        let expected_height = expected_height.into();
        self.wait_for(move |notification| match notification.reason {
            Reason::NewBlock { height, hash, .. }
                if expected_height.is_none_or(|expected| expected == height) =>
            {
                Some(hash)
            }
            _ => None,
        })
    }

    /// Waits for a `NewIncomingBundle` notification coming from the given sender chain.
    ///
    /// When a height is given, only a notification for that sender block height is
    /// accepted; otherwise any height matches.
    fn wait_for_bundle(
        &mut self,
        expected_origin: ChainId,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<()>> {
        let expected_height = expected_height.into();
        self.wait_for(move |notification| match notification.reason {
            Reason::NewIncomingBundle { height, origin }
                if expected_height.is_none_or(|expected| expected == height)
                    && origin == expected_origin =>
            {
                Some(())
            }
            _ => None,
        })
    }
}
2374
#[cfg(with_testing)]
impl<S> NotificationsExt for Pin<Box<S>>
where
    S: Stream<Item = Result<Notification>>,
{
    /// Pulls notifications from the stream until `f` accepts one.
    ///
    /// A single timer of `notification_timeout()` is started up front and covers the
    /// whole wait, not each notification separately. The call fails when that timer
    /// fires, when the stream ends, or when the stream yields an error.
    async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
        let deadline = linera_base::time::timer::sleep(notification_timeout());
        let mut timeout = Box::pin(deadline).fuse();
        loop {
            let received = futures::select! {
                () = timeout => bail!("Timeout waiting for notification"),
                item = self.next().fuse() => item.context("Stream closed")??,
            };
            match f(received) {
                Some(matched) => return Ok(matched),
                None => continue,
            }
        }
    }
}