linera_service/cli_wrappers/
wallet.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Cow,
6    collections::BTreeMap,
7    env,
8    marker::PhantomData,
9    mem,
10    path::{Path, PathBuf},
11    pin::Pin,
12    process::Stdio,
13    str::FromStr,
14    sync,
15    time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24    abi::ContractAbi,
25    command::{resolve_binary, CommandExt},
26    crypto::{CryptoHash, InMemorySigner},
27    data_types::{Amount, ApplicationPermissions, BlockHeight, Bytecode, Epoch},
28    identifiers::{
29        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30    },
31    vm::VmRuntime,
32};
33use linera_client::client_options::ResourceControlPolicyConfig;
34use linera_core::worker::Notification;
35use linera_faucet_client::Faucet;
36use serde::{de::DeserializeOwned, ser::Serialize};
37use serde_command_opts::to_args;
38use serde_json::{json, Value};
39use tempfile::TempDir;
40use tokio::{
41    io::{AsyncBufReadExt, BufReader},
42    process::{Child, Command},
43    sync::oneshot,
44    task::JoinHandle,
45};
46#[cfg(with_testing)]
47use {
48    futures::FutureExt as _,
49    linera_core::worker::Reason,
50    std::{collections::BTreeSet, future::Future},
51};
52
53use crate::{
54    cli::command::{BenchmarkCommand, ResourceControlPolicyOverrides},
55    cli_wrappers::{
56        local_net::{PathProvider, ProcessInbox},
57        Network,
58    },
59    util::{self, ChildExt},
60    Wallet,
61};
62
/// The name of the environment variable that allows specifying additional arguments to be passed
/// to the node-service command of the client.
///
/// When the variable is set, its value is split on whitespace and every piece is forwarded as a
/// separate argument to `linera service`.
const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
66
67fn reqwest_client() -> reqwest::Client {
68    reqwest::ClientBuilder::new()
69        .timeout(Duration::from_secs(60))
70        .build()
71        .unwrap()
72}
73
/// Wrapper to run a Linera client command.
///
/// Each instance is bound to its own wallet, keystore and storage, whose names are derived from
/// the numeric `id` given at construction time.
pub struct ClientWrapper {
    /// Cached path of the `linera` binary; `None` until it is resolved for the first time
    /// (unless a binary directory was supplied at construction).
    binary_path: sync::Mutex<Option<PathBuf>>,
    /// Seed forwarded as `--testing-prng-seed` by the commands that support it.
    testing_prng_seed: Option<u64>,
    /// Value passed to `--storage`.
    storage: String,
    /// Value passed to `--wallet`.
    wallet: String,
    /// Value passed to `--keystore`.
    keystore: String,
    /// Value passed to `--max-pending-message-bundles` on regular (non-benchmark) commands.
    max_pending_message_bundles: usize,
    // NOTE(review): not read in the visible part of this file; presumably describes how the
    // client reaches the validators — confirm against its users.
    network: Network,
    /// Provides the directory used as the default working directory of spawned commands and
    /// as the location of the client storage.
    pub path_provider: PathProvider,
    /// Action to perform when this wrapper is dropped.
    on_drop: OnClientDrop,
    /// Additional arguments appended after the required ones on every command invocation.
    extra_args: Vec<String>,
}
87
/// Action to perform when the [`ClientWrapper`] is dropped.
///
/// The action is chosen once, when the wrapper is constructed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OnClientDrop {
    /// Close all the chains on the wallet.
    CloseChains,
    /// Do not close any chains, leaving them active.
    LeakChains,
}
96
97impl ClientWrapper {
98    pub fn new(
99        path_provider: PathProvider,
100        network: Network,
101        testing_prng_seed: Option<u64>,
102        id: usize,
103        on_drop: OnClientDrop,
104    ) -> Self {
105        Self::new_with_extra_args(
106            path_provider,
107            network,
108            testing_prng_seed,
109            id,
110            on_drop,
111            vec!["--wait-for-outgoing-messages".to_string()],
112            None,
113        )
114    }
115
116    pub fn new_with_extra_args(
117        path_provider: PathProvider,
118        network: Network,
119        testing_prng_seed: Option<u64>,
120        id: usize,
121        on_drop: OnClientDrop,
122        extra_args: Vec<String>,
123        binary_dir: Option<PathBuf>,
124    ) -> Self {
125        let storage = format!(
126            "rocksdb:{}/client_{}.db",
127            path_provider.path().display(),
128            id
129        );
130        let wallet = format!("wallet_{id}.json");
131        let keystore = format!("keystore_{id}.json");
132        let binary_path = binary_dir.map(|dir| dir.join("linera"));
133        Self {
134            binary_path: sync::Mutex::new(binary_path),
135            testing_prng_seed,
136            storage,
137            wallet,
138            keystore,
139            max_pending_message_bundles: 10_000,
140            network,
141            path_provider,
142            on_drop,
143            extra_args,
144        }
145    }
146
147    /// Runs `linera project new`.
148    pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
149        let tmp = TempDir::new()?;
150        let mut command = self.command().await?;
151        command
152            .current_dir(tmp.path())
153            .arg("project")
154            .arg("new")
155            .arg(project_name)
156            .arg("--linera-root")
157            .arg(linera_root)
158            .spawn_and_wait_for_stdout()
159            .await?;
160        Ok(tmp)
161    }
162
163    /// Runs `linera project publish`.
164    pub async fn project_publish<T: Serialize>(
165        &self,
166        path: PathBuf,
167        required_application_ids: Vec<String>,
168        publisher: impl Into<Option<ChainId>>,
169        argument: &T,
170    ) -> Result<String> {
171        let json_parameters = serde_json::to_string(&())?;
172        let json_argument = serde_json::to_string(argument)?;
173        let mut command = self.command().await?;
174        command
175            .arg("project")
176            .arg("publish-and-create")
177            .arg(path)
178            .args(publisher.into().iter().map(ChainId::to_string))
179            .args(["--json-parameters", &json_parameters])
180            .args(["--json-argument", &json_argument]);
181        if !required_application_ids.is_empty() {
182            command.arg("--required-application-ids");
183            command.args(required_application_ids);
184        }
185        let stdout = command.spawn_and_wait_for_stdout().await?;
186        Ok(stdout.trim().to_string())
187    }
188
189    /// Runs `linera project test`.
190    pub async fn project_test(&self, path: &Path) -> Result<()> {
191        self.command()
192            .await
193            .context("failed to create project test command")?
194            .current_dir(path)
195            .arg("project")
196            .arg("test")
197            .spawn_and_wait_for_stdout()
198            .await?;
199        Ok(())
200    }
201
202    async fn command_with_envs_and_arguments(
203        &self,
204        envs: &[(&str, &str)],
205        arguments: impl IntoIterator<Item = Cow<'_, str>>,
206    ) -> Result<Command> {
207        let mut command = self.command_binary().await?;
208        command.current_dir(self.path_provider.path());
209        for (key, value) in envs {
210            command.env(key, value);
211        }
212        for argument in arguments {
213            command.arg(&*argument);
214        }
215        Ok(command)
216    }
217
218    async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
219        self.command_with_envs_and_arguments(envs, self.command_arguments())
220            .await
221    }
222
223    async fn command_with_arguments(
224        &self,
225        arguments: impl IntoIterator<Item = Cow<'_, str>>,
226    ) -> Result<Command> {
227        self.command_with_envs_and_arguments(
228            &[(
229                "RUST_LOG",
230                &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
231            )],
232            arguments,
233        )
234        .await
235    }
236
237    async fn command(&self) -> Result<Command> {
238        self.command_with_envs(&[(
239            "RUST_LOG",
240            &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
241        )])
242        .await
243    }
244
245    fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
246        [
247            "--wallet".into(),
248            self.wallet.as_str().into(),
249            "--keystore".into(),
250            self.keystore.as_str().into(),
251            "--storage".into(),
252            self.storage.as_str().into(),
253            "--send-timeout-ms".into(),
254            "500000".into(),
255            "--recv-timeout-ms".into(),
256            "500000".into(),
257        ]
258        .into_iter()
259        .chain(self.extra_args.iter().map(|s| s.as_str().into()))
260    }
261
262    /// Returns an iterator over the arguments that should be added to all command invocations.
263    fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
264        self.required_command_arguments().chain([
265            "--max-pending-message-bundles".into(),
266            self.max_pending_message_bundles.to_string().into(),
267        ])
268    }
269
270    /// Returns the [`Command`] instance configured to run the appropriate binary.
271    ///
272    /// The path is resolved once and cached inside `self` for subsequent usages.
273    async fn command_binary(&self) -> Result<Command> {
274        match self.command_with_cached_binary_path() {
275            Some(command) => Ok(command),
276            None => {
277                let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
278                let command = Command::new(&resolved_path);
279
280                self.set_cached_binary_path(resolved_path);
281
282                Ok(command)
283            }
284        }
285    }
286
287    /// Returns a [`Command`] instance configured with the cached `binary_path`, if available.
288    fn command_with_cached_binary_path(&self) -> Option<Command> {
289        let binary_path = self.binary_path.lock().unwrap();
290
291        binary_path.as_ref().map(Command::new)
292    }
293
294    /// Sets the cached `binary_path` with the `new_binary_path`.
295    ///
296    /// # Panics
297    ///
298    /// If the cache is already set to a different value. In theory the two threads calling
299    /// `command_binary` can race and resolve the binary path twice, but they should always be the
300    /// same path.
301    fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
302        let mut binary_path = self.binary_path.lock().unwrap();
303
304        if binary_path.is_none() {
305            *binary_path = Some(new_binary_path);
306        } else {
307            assert_eq!(*binary_path, Some(new_binary_path));
308        }
309    }
310
311    /// Runs `linera create-genesis-config`.
312    pub async fn create_genesis_config(
313        &self,
314        num_other_initial_chains: u32,
315        initial_funding: Amount,
316        policy_config: ResourceControlPolicyConfig,
317        http_allow_list: Option<Vec<String>>,
318    ) -> Result<()> {
319        let mut command = self.command().await?;
320        command
321            .args([
322                "create-genesis-config",
323                &num_other_initial_chains.to_string(),
324            ])
325            .args(["--initial-funding", &initial_funding.to_string()])
326            .args(["--committee", "committee.json"])
327            .args(["--genesis", "genesis.json"])
328            .args([
329                "--policy-config",
330                &policy_config.to_string().to_kebab_case(),
331            ]);
332        if let Some(allow_list) = http_allow_list {
333            command
334                .arg("--http-request-allow-list")
335                .arg(allow_list.join(","));
336        }
337        if let Some(seed) = self.testing_prng_seed {
338            command.arg("--testing-prng-seed").arg(seed.to_string());
339        }
340        command.spawn_and_wait_for_stdout().await?;
341        Ok(())
342    }
343
344    /// Runs `linera wallet init`. The genesis config is read from `genesis.json`, or from the
345    /// faucet if provided.
346    pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
347        let mut command = self.command().await?;
348        command.args(["wallet", "init"]);
349        match faucet {
350            None => command.args(["--genesis", "genesis.json"]),
351            Some(faucet) => command.args(["--faucet", faucet.url()]),
352        };
353        if let Some(seed) = self.testing_prng_seed {
354            command.arg("--testing-prng-seed").arg(seed.to_string());
355        }
356        command.spawn_and_wait_for_stdout().await?;
357        Ok(())
358    }
359
360    /// Runs `linera wallet request-chain`.
361    pub async fn request_chain(
362        &self,
363        faucet: &Faucet,
364        set_default: bool,
365    ) -> Result<(ChainId, AccountOwner)> {
366        let mut command = self.command().await?;
367        command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
368        if set_default {
369            command.arg("--set-default");
370        }
371        let stdout = command.spawn_and_wait_for_stdout().await?;
372        let mut lines = stdout.split_whitespace();
373        let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
374        let owner = lines.next().context("missing chain owner")?.parse()?;
375        Ok((chain_id, owner))
376    }
377
378    /// Runs `linera wallet publish-and-create`.
379    #[expect(clippy::too_many_arguments)]
380    pub async fn publish_and_create<
381        A: ContractAbi,
382        Parameters: Serialize,
383        InstantiationArgument: Serialize,
384    >(
385        &self,
386        contract: PathBuf,
387        service: PathBuf,
388        vm_runtime: VmRuntime,
389        parameters: &Parameters,
390        argument: &InstantiationArgument,
391        required_application_ids: &[ApplicationId],
392        publisher: impl Into<Option<ChainId>>,
393    ) -> Result<ApplicationId<A>> {
394        let json_parameters = serde_json::to_string(parameters)?;
395        let json_argument = serde_json::to_string(argument)?;
396        let mut command = self.command().await?;
397        let vm_runtime = format!("{vm_runtime}");
398        command
399            .arg("publish-and-create")
400            .args([contract, service])
401            .args(["--vm-runtime", &vm_runtime.to_lowercase()])
402            .args(publisher.into().iter().map(ChainId::to_string))
403            .args(["--json-parameters", &json_parameters])
404            .args(["--json-argument", &json_argument]);
405        if !required_application_ids.is_empty() {
406            command.arg("--required-application-ids");
407            command.args(
408                required_application_ids
409                    .iter()
410                    .map(ApplicationId::to_string),
411            );
412        }
413        let stdout = command.spawn_and_wait_for_stdout().await?;
414        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
415    }
416
417    /// Runs `linera publish-module`.
418    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
419        &self,
420        contract: PathBuf,
421        service: PathBuf,
422        vm_runtime: VmRuntime,
423        publisher: impl Into<Option<ChainId>>,
424    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
425        self.publish_module_with_formats(contract, service, None, vm_runtime, publisher)
426            .await
427    }
428
429    /// Runs `linera publish-module` with an optional `--formats` SNAP file path.
430    pub async fn publish_module_with_formats<Abi, Parameters, InstantiationArgument>(
431        &self,
432        contract: PathBuf,
433        service: PathBuf,
434        formats: Option<PathBuf>,
435        vm_runtime: VmRuntime,
436        publisher: impl Into<Option<ChainId>>,
437    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
438        let mut command = self.command().await?;
439        command
440            .arg("publish-module")
441            .args([contract, service])
442            .args(["--vm-runtime", &format!("{vm_runtime}").to_lowercase()]);
443        if let Some(formats_path) = formats {
444            command.arg("--formats").arg(formats_path);
445        }
446        let stdout = command
447            .args(publisher.into().iter().map(ChainId::to_string))
448            .spawn_and_wait_for_stdout()
449            .await?;
450        let module_id: ModuleId = stdout.trim().parse()?;
451        Ok(module_id.with_abi())
452    }
453
454    /// Runs `linera create-application`.
455    pub async fn create_application<
456        Abi: ContractAbi,
457        Parameters: Serialize,
458        InstantiationArgument: Serialize,
459    >(
460        &self,
461        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
462        parameters: &Parameters,
463        argument: &InstantiationArgument,
464        required_application_ids: &[ApplicationId],
465        creator: impl Into<Option<ChainId>>,
466    ) -> Result<ApplicationId<Abi>> {
467        let json_parameters = serde_json::to_string(parameters)?;
468        let json_argument = serde_json::to_string(argument)?;
469        let mut command = self.command().await?;
470        command
471            .arg("create-application")
472            .arg(module_id.forget_abi().to_string())
473            .args(["--json-parameters", &json_parameters])
474            .args(["--json-argument", &json_argument])
475            .args(creator.into().iter().map(ChainId::to_string));
476        if !required_application_ids.is_empty() {
477            command.arg("--required-application-ids");
478            command.args(
479                required_application_ids
480                    .iter()
481                    .map(ApplicationId::to_string),
482            );
483        }
484        let stdout = command.spawn_and_wait_for_stdout().await?;
485        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
486    }
487
488    /// Runs `linera service`.
489    pub async fn run_node_service(
490        &self,
491        port: impl Into<Option<u16>>,
492        process_inbox: ProcessInbox,
493    ) -> Result<NodeService> {
494        self.run_node_service_with_options(port, process_inbox, &[], &[], false)
495            .await
496    }
497
498    /// Runs `linera service` with optional task processor configuration.
499    pub async fn run_node_service_with_options(
500        &self,
501        port: impl Into<Option<u16>>,
502        process_inbox: ProcessInbox,
503        operator_application_ids: &[ApplicationId],
504        operators: &[(String, PathBuf)],
505        read_only: bool,
506    ) -> Result<NodeService> {
507        self.run_node_service_with_all_options(
508            port,
509            process_inbox,
510            operator_application_ids,
511            operators,
512            read_only,
513            &[],
514            &[],
515        )
516        .await
517    }
518
519    /// Runs `linera service` with all available options.
520    #[expect(clippy::too_many_arguments)]
521    pub async fn run_node_service_with_all_options(
522        &self,
523        port: impl Into<Option<u16>>,
524        process_inbox: ProcessInbox,
525        operator_application_ids: &[ApplicationId],
526        operators: &[(String, PathBuf)],
527        read_only: bool,
528        allowed_subscriptions: &[String],
529        subscription_ttls: &[(String, u64)],
530    ) -> Result<NodeService> {
531        let port = port.into().unwrap_or(8080);
532        let mut command = self.command().await?;
533        command.arg("service");
534        if let ProcessInbox::Skip = process_inbox {
535            command.arg("--listener-skip-process-inbox");
536        }
537        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
538            command.args(var.split_whitespace());
539        }
540        for app_id in operator_application_ids {
541            command.args(["--operator-application-ids", &app_id.to_string()]);
542        }
543        for (name, path) in operators {
544            command.args(["--operators", &format!("{}={}", name, path.display())]);
545        }
546        if read_only {
547            command.arg("--read-only");
548        }
549        for query in allowed_subscriptions {
550            command.args(["--allow-subscription", query]);
551        }
552        for (name, secs) in subscription_ttls {
553            command.args(["--subscription-ttl-secs", &format!("{name}={secs}")]);
554        }
555        let child = command
556            .args(["--port".to_string(), port.to_string()])
557            .spawn_into()?;
558        let client = reqwest_client();
559        for i in 0..10 {
560            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
561            let request = client.get(format!("http://localhost:{port}/")).send().await;
562            if request.is_ok() {
563                tracing::info!("Node service has started");
564                return Ok(NodeService::new(port, child));
565            } else {
566                tracing::warn!("Waiting for node service to start");
567            }
568        }
569        bail!("Failed to start node service");
570    }
571
572    /// Runs `linera service` with a controller application.
573    pub async fn run_node_service_with_controller(
574        &self,
575        port: impl Into<Option<u16>>,
576        process_inbox: ProcessInbox,
577        controller_id: &ApplicationId,
578        operators: &[(String, PathBuf)],
579    ) -> Result<NodeService> {
580        let port = port.into().unwrap_or(8080);
581        let mut command = self.command().await?;
582        command.arg("service");
583        if let ProcessInbox::Skip = process_inbox {
584            command.arg("--listener-skip-process-inbox");
585        }
586        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
587            command.args(var.split_whitespace());
588        }
589        command.args(["--controller-id", &controller_id.to_string()]);
590        for (name, path) in operators {
591            command.args(["--operators", &format!("{}={}", name, path.display())]);
592        }
593        let child = command
594            .args(["--port".to_string(), port.to_string()])
595            .spawn_into()?;
596        let client = reqwest_client();
597        for i in 0..10 {
598            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
599            let request = client.get(format!("http://localhost:{port}/")).send().await;
600            if request.is_ok() {
601                tracing::info!("Node service has started");
602                return Ok(NodeService::new(port, child));
603            } else {
604                tracing::warn!("Waiting for node service to start");
605            }
606        }
607        bail!("Failed to start node service");
608    }
609
610    /// Runs `linera validator query`
611    pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
612        let mut command = self.command().await?;
613        command.arg("validator").arg("query").arg(address);
614        let stdout = command.spawn_and_wait_for_stdout().await?;
615
616        // Parse the genesis config hash from the output.
617        // It's on a line like "Genesis config hash: <hash>"
618        let hash = stdout
619            .lines()
620            .find_map(|line| {
621                line.strip_prefix("Genesis config hash: ")
622                    .and_then(|hash_str| hash_str.trim().parse().ok())
623            })
624            .context("error while parsing the result of `linera validator query`")?;
625        Ok(hash)
626    }
627
628    /// Runs `linera validator list`.
629    pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
630        let mut command = self.command().await?;
631        command.arg("validator").arg("list");
632        if let Some(chain_id) = chain_id {
633            command.args(["--chain-id", &chain_id.to_string()]);
634        }
635        command.spawn_and_wait_for_stdout().await?;
636        Ok(())
637    }
638
639    /// Runs `linera sync-validator`.
640    pub async fn sync_validator(
641        &self,
642        chain_ids: impl IntoIterator<Item = &ChainId>,
643        validator_address: impl Into<String>,
644    ) -> Result<()> {
645        let mut command = self.command().await?;
646        command
647            .arg("validator")
648            .arg("sync")
649            .arg(validator_address.into());
650        let mut chain_ids = chain_ids.into_iter().peekable();
651        if chain_ids.peek().is_some() {
652            command
653                .arg("--chains")
654                .args(chain_ids.map(ChainId::to_string));
655        }
656        command.spawn_and_wait_for_stdout().await?;
657        Ok(())
658    }
659
660    /// Runs `linera faucet`.
661    pub async fn run_faucet(
662        &self,
663        port: impl Into<Option<u16>>,
664        chain_id: Option<ChainId>,
665        amount: Amount,
666    ) -> Result<FaucetService> {
667        let port = port.into().unwrap_or(8080);
668        let temp_dir = tempfile::tempdir()
669            .context("Failed to create temporary directory for faucet storage")?;
670        let storage_path = temp_dir.path().join("faucet_storage.sqlite");
671        let mut command = self.command().await?;
672        let command = command
673            .arg("faucet")
674            .args(["--port".to_string(), port.to_string()])
675            .args(["--amount".to_string(), amount.to_string()])
676            .args([
677                "--storage-path".to_string(),
678                storage_path.to_string_lossy().to_string(),
679            ]);
680        if let Some(chain_id) = chain_id {
681            command.arg(chain_id.to_string());
682        }
683        let child = command.spawn_into()?;
684        let client = reqwest_client();
685        for i in 0..10 {
686            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
687            let request = client.get(format!("http://localhost:{port}/")).send().await;
688            if request.is_ok() {
689                tracing::info!("Faucet has started");
690                return Ok(FaucetService::new(port, child, temp_dir));
691            } else {
692                tracing::debug!("Waiting for faucet to start");
693            }
694        }
695        bail!("Failed to start faucet");
696    }
697
698    /// Runs `linera local-balance`.
699    pub async fn local_balance(&self, account: Account) -> Result<Amount> {
700        let stdout = self
701            .command()
702            .await?
703            .arg("local-balance")
704            .arg(account.to_string())
705            .spawn_and_wait_for_stdout()
706            .await?;
707        let amount = stdout
708            .trim()
709            .parse()
710            .context("error while parsing the result of `linera local-balance`")?;
711        Ok(amount)
712    }
713
714    /// Runs `linera query-balance`.
715    pub async fn query_balance(&self, account: Account) -> Result<Amount> {
716        let stdout = self
717            .command()
718            .await?
719            .arg("query-balance")
720            .arg(account.to_string())
721            .spawn_and_wait_for_stdout()
722            .await?;
723        let amount = stdout
724            .trim()
725            .parse()
726            .context("error while parsing the result of `linera query-balance`")?;
727        Ok(amount)
728    }
729
730    /// Runs `linera query-application` and parses the JSON result.
731    pub async fn query_application_json<T: DeserializeOwned>(
732        &self,
733        chain_id: ChainId,
734        application_id: ApplicationId,
735        query: impl AsRef<str>,
736    ) -> Result<T> {
737        let query = query.as_ref().trim();
738        let name = query
739            .split_once(|ch: char| !ch.is_alphanumeric())
740            .map_or(query, |(name, _)| name);
741        let stdout = self
742            .command()
743            .await?
744            .arg("query-application")
745            .arg("--chain-id")
746            .arg(chain_id.to_string())
747            .arg("--application-id")
748            .arg(application_id.to_string())
749            .arg(query)
750            .spawn_and_wait_for_stdout()
751            .await?;
752        let data: serde_json::Value =
753            serde_json::from_str(stdout.trim()).context("invalid JSON from query-application")?;
754        serde_json::from_value(data[name].clone())
755            .with_context(|| format!("{name} field missing in query-application response"))
756    }
757
758    /// Runs `linera sync`.
759    pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
760        self.command()
761            .await?
762            .arg("sync")
763            .arg(chain_id.to_string())
764            .spawn_and_wait_for_stdout()
765            .await?;
766        Ok(())
767    }
768
769    /// Runs `linera process-inbox`.
770    pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
771        self.command()
772            .await?
773            .arg("process-inbox")
774            .arg(chain_id.to_string())
775            .spawn_and_wait_for_stdout()
776            .await?;
777        Ok(())
778    }
779
780    /// Runs `linera transfer`.
781    pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
782        self.command()
783            .await?
784            .arg("transfer")
785            .arg(amount.to_string())
786            .args(["--from", &from.to_string()])
787            .args(["--to", &to.to_string()])
788            .spawn_and_wait_for_stdout()
789            .await?;
790        Ok(())
791    }
792
793    /// Runs `linera transfer` with no logging.
794    pub async fn transfer_with_silent_logs(
795        &self,
796        amount: Amount,
797        from: ChainId,
798        to: ChainId,
799    ) -> Result<()> {
800        self.command()
801            .await?
802            .env("RUST_LOG", "off")
803            .arg("transfer")
804            .arg(amount.to_string())
805            .args(["--from", &from.to_string()])
806            .args(["--to", &to.to_string()])
807            .spawn_and_wait_for_stdout()
808            .await?;
809        Ok(())
810    }
811
812    /// Runs `linera transfer` with owner accounts.
813    pub async fn transfer_with_accounts(
814        &self,
815        amount: Amount,
816        from: Account,
817        to: Account,
818    ) -> Result<()> {
819        self.command()
820            .await?
821            .arg("transfer")
822            .arg(amount.to_string())
823            .args(["--from", &from.to_string()])
824            .args(["--to", &to.to_string()])
825            .spawn_and_wait_for_stdout()
826            .await?;
827        Ok(())
828    }
829
830    fn benchmark_command_internal(command: &mut Command, args: &BenchmarkCommand) -> Result<()> {
831        let mut formatted_args = to_args(&args)?;
832        let subcommand = formatted_args.remove(0);
833        // The subcommand is followed by the flattened options, which are preceded by "options".
834        // So remove that as well.
835        formatted_args.remove(0);
836        let options = formatted_args
837            .chunks_exact(2)
838            .flat_map(|pair| {
839                let option = format!("--{}", pair[0]);
840                match pair[1].as_str() {
841                    "true" => vec![option],
842                    "false" => vec![],
843                    _ => vec![option, pair[1].clone()],
844                }
845            })
846            .collect::<Vec<_>>();
847        command
848            .args([
849                "--max-pending-message-bundles",
850                &args.transactions_per_block().to_string(),
851            ])
852            .arg("benchmark")
853            .arg(subcommand)
854            .args(options);
855        Ok(())
856    }
857
858    async fn benchmark_command_with_envs(
859        &self,
860        args: BenchmarkCommand,
861        envs: &[(&str, &str)],
862    ) -> Result<Command> {
863        let mut command = self
864            .command_with_envs_and_arguments(envs, self.required_command_arguments())
865            .await?;
866        Self::benchmark_command_internal(&mut command, &args)?;
867        Ok(command)
868    }
869
870    async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
871        let mut command = self
872            .command_with_arguments(self.required_command_arguments())
873            .await?;
874        Self::benchmark_command_internal(&mut command, &args)?;
875        Ok(command)
876    }
877
878    /// Runs `linera benchmark`.
879    pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
880        let mut command = self.benchmark_command(args).await?;
881        command.spawn_and_wait_for_stdout().await?;
882        Ok(())
883    }
884
885    /// Runs `linera benchmark`, but detached: don't wait for the command to finish, just spawn it
886    /// and return the child process, and the handles to the stdout and stderr.
887    pub async fn benchmark_detached(
888        &self,
889        args: BenchmarkCommand,
890        tx: oneshot::Sender<()>,
891    ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
892        let mut child = self
893            .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
894            .await?
895            .kill_on_drop(true)
896            .stdin(Stdio::piped())
897            .stdout(Stdio::piped())
898            .stderr(Stdio::piped())
899            .spawn()?;
900
901        let pid = child.id().expect("failed to get pid");
902        let stdout = child.stdout.take().expect("stdout not open");
903        let stdout_handle = tokio::spawn(async move {
904            let mut lines = BufReader::new(stdout).lines();
905            while let Ok(Some(line)) = lines.next_line().await {
906                println!("benchmark{{pid={pid}}} {line}");
907            }
908        });
909
910        let stderr = child.stderr.take().expect("stderr not open");
911        let stderr_handle = tokio::spawn(async move {
912            let mut lines = BufReader::new(stderr).lines();
913            let mut tx = Some(tx);
914            while let Ok(Some(line)) = lines.next_line().await {
915                if line.contains("Ready to start benchmark") {
916                    tx.take()
917                        .expect("Should only send signal once")
918                        .send(())
919                        .expect("failed to send ready signal to main thread");
920                } else {
921                    println!("benchmark{{pid={pid}}} {line}");
922                }
923            }
924        });
925        Ok((child, stdout_handle, stderr_handle))
926    }
927
928    async fn open_chain_internal(
929        &self,
930        from: ChainId,
931        owner: Option<AccountOwner>,
932        initial_balance: Amount,
933        super_owner: bool,
934    ) -> Result<(ChainId, AccountOwner)> {
935        let mut command = self.command().await?;
936        command
937            .arg("open-chain")
938            .args(["--from", &from.to_string()])
939            .args(["--initial-balance", &initial_balance.to_string()]);
940
941        if let Some(owner) = owner {
942            command.args(["--owner", &owner.to_string()]);
943        }
944
945        if super_owner {
946            command.arg("--super-owner");
947        }
948
949        let stdout = command.spawn_and_wait_for_stdout().await?;
950        let mut split = stdout.split('\n');
951        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
952        let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
953        if let Some(owner) = owner {
954            assert_eq!(owner, new_owner);
955        }
956        Ok((chain_id, new_owner))
957    }
958
959    /// Runs `linera open-chain --super-owner`.
960    pub async fn open_chain_super_owner(
961        &self,
962        from: ChainId,
963        owner: Option<AccountOwner>,
964        initial_balance: Amount,
965    ) -> Result<(ChainId, AccountOwner)> {
966        self.open_chain_internal(from, owner, initial_balance, true)
967            .await
968    }
969
970    /// Runs `linera open-chain`.
971    pub async fn open_chain(
972        &self,
973        from: ChainId,
974        owner: Option<AccountOwner>,
975        initial_balance: Amount,
976    ) -> Result<(ChainId, AccountOwner)> {
977        self.open_chain_internal(from, owner, initial_balance, false)
978            .await
979    }
980
981    /// Runs `linera open-chain` then `linera assign`.
982    pub async fn open_and_assign(
983        &self,
984        client: &ClientWrapper,
985        initial_balance: Amount,
986    ) -> Result<ChainId> {
987        let our_chain = self
988            .load_wallet()?
989            .default_chain()
990            .context("no default chain found")?;
991        let owner = client.keygen().await?;
992        let (new_chain, _) = self
993            .open_chain(our_chain, Some(owner), initial_balance)
994            .await?;
995        client.assign(owner, new_chain).await?;
996        Ok(new_chain)
997    }
998
999    pub async fn open_multi_owner_chain(
1000        &self,
1001        from: ChainId,
1002        owners: BTreeMap<AccountOwner, u64>,
1003        multi_leader_rounds: u32,
1004        balance: Amount,
1005        base_timeout_ms: u64,
1006    ) -> Result<ChainId> {
1007        let mut command = self.command().await?;
1008        command
1009            .arg("open-multi-owner-chain")
1010            .args(["--from", &from.to_string()])
1011            .arg("--owners")
1012            .arg(serde_json::to_string(&owners)?)
1013            .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
1014        command
1015            .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
1016            .args(["--initial-balance", &balance.to_string()]);
1017
1018        let stdout = command.spawn_and_wait_for_stdout().await?;
1019        let mut split = stdout.split('\n');
1020        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
1021
1022        Ok(chain_id)
1023    }
1024
1025    pub async fn change_ownership(
1026        &self,
1027        chain_id: ChainId,
1028        super_owners: Vec<AccountOwner>,
1029        owners: Vec<AccountOwner>,
1030    ) -> Result<()> {
1031        let mut command = self.command().await?;
1032        command
1033            .arg("change-ownership")
1034            .args(["--chain-id", &chain_id.to_string()]);
1035        command
1036            .arg("--super-owners")
1037            .arg(serde_json::to_string(&super_owners)?);
1038        command.arg("--owners").arg(serde_json::to_string(
1039            &owners
1040                .into_iter()
1041                .zip(std::iter::repeat(100u64))
1042                .collect::<BTreeMap<_, _>>(),
1043        )?);
1044        command.spawn_and_wait_for_stdout().await?;
1045        Ok(())
1046    }
1047
1048    pub async fn change_application_permissions(
1049        &self,
1050        chain_id: ChainId,
1051        application_permissions: ApplicationPermissions,
1052    ) -> Result<()> {
1053        let mut command = self.command().await?;
1054        command
1055            .arg("change-application-permissions")
1056            .args(["--chain-id", &chain_id.to_string()]);
1057        command.arg("--manage-chain").arg(serde_json::to_string(
1058            &application_permissions.manage_chain,
1059        )?);
1060        // TODO: add other fields
1061        command.spawn_and_wait_for_stdout().await?;
1062        Ok(())
1063    }
1064
1065    /// Runs `linera wallet follow-chain CHAIN_ID`.
1066    pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
1067        let mut command = self.command().await?;
1068        command
1069            .args(["wallet", "follow-chain"])
1070            .arg(chain_id.to_string());
1071        if sync {
1072            command.arg("--sync");
1073        }
1074        command.spawn_and_wait_for_stdout().await?;
1075        Ok(())
1076    }
1077
1078    /// Runs `linera wallet forget-chain CHAIN_ID`.
1079    pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
1080        let mut command = self.command().await?;
1081        command
1082            .args(["wallet", "forget-chain"])
1083            .arg(chain_id.to_string());
1084        command.spawn_and_wait_for_stdout().await?;
1085        Ok(())
1086    }
1087
1088    /// Runs `linera wallet set-default CHAIN_ID`.
1089    pub async fn set_default_chain(&self, chain_id: ChainId) -> Result<()> {
1090        let mut command = self.command().await?;
1091        command
1092            .args(["wallet", "set-default"])
1093            .arg(chain_id.to_string());
1094        command.spawn_and_wait_for_stdout().await?;
1095        Ok(())
1096    }
1097
1098    pub async fn retry_pending_block(
1099        &self,
1100        chain_id: Option<ChainId>,
1101    ) -> Result<Option<CryptoHash>> {
1102        let mut command = self.command().await?;
1103        command.arg("retry-pending-block");
1104        if let Some(chain_id) = chain_id {
1105            command.arg(chain_id.to_string());
1106        }
1107        let stdout = command.spawn_and_wait_for_stdout().await?;
1108        let stdout = stdout.trim();
1109        if stdout.is_empty() {
1110            Ok(None)
1111        } else {
1112            Ok(Some(CryptoHash::from_str(stdout)?))
1113        }
1114    }
1115
1116    /// Runs `linera publish-data-blob`.
1117    pub async fn publish_data_blob(
1118        &self,
1119        path: &Path,
1120        chain_id: Option<ChainId>,
1121    ) -> Result<CryptoHash> {
1122        let mut command = self.command().await?;
1123        command.arg("publish-data-blob").arg(path);
1124        if let Some(chain_id) = chain_id {
1125            command.arg(chain_id.to_string());
1126        }
1127        let stdout = command.spawn_and_wait_for_stdout().await?;
1128        let stdout = stdout.trim();
1129        Ok(CryptoHash::from_str(stdout)?)
1130    }
1131
1132    /// Runs `linera read-data-blob`.
1133    pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
1134        let mut command = self.command().await?;
1135        command.arg("read-data-blob").arg(hash.to_string());
1136        if let Some(chain_id) = chain_id {
1137            command.arg(chain_id.to_string());
1138        }
1139        command.spawn_and_wait_for_stdout().await?;
1140        Ok(())
1141    }
1142
1143    pub fn load_wallet(&self) -> Result<Wallet> {
1144        Ok(Wallet::read(&self.wallet_path())?)
1145    }
1146
1147    pub fn load_keystore(&self) -> Result<InMemorySigner> {
1148        util::read_json(self.keystore_path())
1149    }
1150
    /// Returns the wallet file location: `self.wallet` joined onto the path provider's
    /// directory.
    pub fn wallet_path(&self) -> PathBuf {
        self.path_provider.path().join(&self.wallet)
    }
1154
    /// Returns the keystore file location: `self.keystore` joined onto the path provider's
    /// directory.
    pub fn keystore_path(&self) -> PathBuf {
        self.path_provider.path().join(&self.keystore)
    }
1158
    /// Returns the `storage` string this wrapper was configured with.
    pub fn storage_path(&self) -> &str {
        &self.storage
    }
1162
1163    pub fn get_owner(&self) -> Option<AccountOwner> {
1164        let wallet = self.load_wallet().ok()?;
1165        wallet
1166            .get(wallet.default_chain()?)
1167            .expect("default chain must be in wallet")
1168            .owner
1169    }
1170
1171    pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1172        self.load_wallet()
1173            .ok()
1174            .is_some_and(|wallet| wallet.get(chain).is_some())
1175    }
1176
1177    pub async fn set_validator(
1178        &self,
1179        validator_key: &(String, String),
1180        port: usize,
1181        votes: usize,
1182    ) -> Result<()> {
1183        let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1184        self.command()
1185            .await?
1186            .arg("validator")
1187            .arg("add")
1188            .args(["--public-key", &validator_key.0])
1189            .args(["--account-key", &validator_key.1])
1190            .args(["--address", &address])
1191            .args(["--votes", &votes.to_string()])
1192            .spawn_and_wait_for_stdout()
1193            .await?;
1194        Ok(())
1195    }
1196
1197    pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1198        self.command()
1199            .await?
1200            .arg("validator")
1201            .arg("remove")
1202            .args(["--public-key", validator_key])
1203            .spawn_and_wait_for_stdout()
1204            .await?;
1205        Ok(())
1206    }
1207
1208    pub async fn change_validators(
1209        &self,
1210        add_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1211        modify_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1212        remove_validators: &[String],
1213    ) -> Result<()> {
1214        use std::str::FromStr;
1215
1216        use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1217
1218        // Build a map that will be serialized to JSON
1219        // Use the exact types that deserialization expects
1220        let mut changes = std::collections::HashMap::new();
1221
1222        // Add/modify validators
1223        for (public_key_str, account_key_str, port, votes) in
1224            add_validators.iter().chain(modify_validators.iter())
1225        {
1226            let public_key = ValidatorPublicKey::from_str(public_key_str)
1227                .with_context(|| format!("Invalid validator public key: {public_key_str}"))?;
1228
1229            let account_key = AccountPublicKey::from_str(account_key_str)
1230                .with_context(|| format!("Invalid account public key: {account_key_str}"))?;
1231
1232            let address = format!("{}:127.0.0.1:{}", self.network.short(), port)
1233                .parse()
1234                .unwrap();
1235
1236            // Create ValidatorChange struct
1237            let change = crate::cli::validator::Change {
1238                account_key,
1239                address,
1240                votes: crate::cli::validator::Votes(
1241                    std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1242                ),
1243            };
1244
1245            changes.insert(public_key, Some(change));
1246        }
1247
1248        // Remove validators (set to None)
1249        for validator_key_str in remove_validators {
1250            let public_key = ValidatorPublicKey::from_str(validator_key_str)
1251                .with_context(|| format!("Invalid validator public key: {validator_key_str}"))?;
1252            changes.insert(public_key, None);
1253        }
1254
1255        // Create temporary file with JSON
1256        let temp_file = tempfile::NamedTempFile::new()
1257            .context("Failed to create temporary file for validator changes")?;
1258        serde_json::to_writer(&temp_file, &changes)
1259            .context("Failed to write validator changes to file")?;
1260        let temp_path = temp_file.path();
1261
1262        self.command()
1263            .await?
1264            .arg("validator")
1265            .arg("update")
1266            .arg(temp_path)
1267            .arg("--yes") // Skip confirmation prompt
1268            .spawn_and_wait_for_stdout()
1269            .await?;
1270
1271        Ok(())
1272    }
1273
1274    pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1275        self.command()
1276            .await?
1277            .arg("revoke-epochs")
1278            .arg(epoch.to_string())
1279            .spawn_and_wait_for_stdout()
1280            .await?;
1281        Ok(())
1282    }
1283
1284    pub async fn set_resource_control_policy(
1285        &self,
1286        overrides: ResourceControlPolicyOverrides,
1287    ) -> Result<()> {
1288        let mut command = self.command().await?;
1289        command.arg("resource-control-policy");
1290        let ResourceControlPolicyOverrides {
1291            wasm_fuel_unit,
1292            evm_fuel_unit,
1293            read_operation,
1294            write_operation,
1295            byte_runtime,
1296            byte_read,
1297            byte_written,
1298            blob_read,
1299            blob_published,
1300            blob_byte_read,
1301            blob_byte_published,
1302            operation,
1303            operation_byte,
1304            message,
1305            message_byte,
1306            service_as_oracle_query,
1307            http_request,
1308            maximum_wasm_fuel_per_block,
1309            maximum_evm_fuel_per_block,
1310            maximum_service_oracle_execution_ms,
1311            maximum_block_size,
1312            maximum_blob_size,
1313            maximum_published_blobs,
1314            maximum_bytecode_size,
1315            maximum_block_proposal_size,
1316            maximum_bytes_read_per_block,
1317            maximum_bytes_written_per_block,
1318            maximum_oracle_response_bytes,
1319            maximum_http_response_bytes,
1320            http_request_timeout_ms,
1321            http_request_allow_list,
1322            free_application_ids,
1323            flags,
1324        } = overrides;
1325        if let Some(value) = wasm_fuel_unit {
1326            command.args(["--wasm-fuel-unit", &value.to_string()]);
1327        }
1328        if let Some(value) = evm_fuel_unit {
1329            command.args(["--evm-fuel-unit", &value.to_string()]);
1330        }
1331        if let Some(value) = read_operation {
1332            command.args(["--read-operation", &value.to_string()]);
1333        }
1334        if let Some(value) = write_operation {
1335            command.args(["--write-operation", &value.to_string()]);
1336        }
1337        if let Some(value) = byte_runtime {
1338            command.args(["--byte-runtime", &value.to_string()]);
1339        }
1340        if let Some(value) = byte_read {
1341            command.args(["--byte-read", &value.to_string()]);
1342        }
1343        if let Some(value) = byte_written {
1344            command.args(["--byte-written", &value.to_string()]);
1345        }
1346        if let Some(value) = blob_read {
1347            command.args(["--blob-read", &value.to_string()]);
1348        }
1349        if let Some(value) = blob_published {
1350            command.args(["--blob-published", &value.to_string()]);
1351        }
1352        if let Some(value) = blob_byte_read {
1353            command.args(["--blob-byte-read", &value.to_string()]);
1354        }
1355        if let Some(value) = blob_byte_published {
1356            command.args(["--blob-byte-published", &value.to_string()]);
1357        }
1358        if let Some(value) = operation {
1359            command.args(["--operation", &value.to_string()]);
1360        }
1361        if let Some(value) = operation_byte {
1362            command.args(["--operation-byte", &value.to_string()]);
1363        }
1364        if let Some(value) = message {
1365            command.args(["--message", &value.to_string()]);
1366        }
1367        if let Some(value) = message_byte {
1368            command.args(["--message-byte", &value.to_string()]);
1369        }
1370        if let Some(value) = service_as_oracle_query {
1371            command.args(["--service-as-oracle-query", &value.to_string()]);
1372        }
1373        if let Some(value) = http_request {
1374            command.args(["--http-request", &value.to_string()]);
1375        }
1376        if let Some(value) = maximum_wasm_fuel_per_block {
1377            command.args(["--maximum-wasm-fuel-per-block", &value.to_string()]);
1378        }
1379        if let Some(value) = maximum_evm_fuel_per_block {
1380            command.args(["--maximum-evm-fuel-per-block", &value.to_string()]);
1381        }
1382        if let Some(value) = maximum_service_oracle_execution_ms {
1383            command.args(["--maximum-service-oracle-execution-ms", &value.to_string()]);
1384        }
1385        if let Some(value) = maximum_block_size {
1386            command.args(["--maximum-block-size", &value.to_string()]);
1387        }
1388        if let Some(value) = maximum_blob_size {
1389            command.args(["--maximum-blob-size", &value.to_string()]);
1390        }
1391        if let Some(value) = maximum_published_blobs {
1392            command.args(["--maximum-published-blobs", &value.to_string()]);
1393        }
1394        if let Some(value) = maximum_bytecode_size {
1395            command.args(["--maximum-bytecode-size", &value.to_string()]);
1396        }
1397        if let Some(value) = maximum_block_proposal_size {
1398            command.args(["--maximum-block-proposal-size", &value.to_string()]);
1399        }
1400        if let Some(value) = maximum_bytes_read_per_block {
1401            command.args(["--maximum-bytes-read-per-block", &value.to_string()]);
1402        }
1403        if let Some(value) = maximum_bytes_written_per_block {
1404            command.args(["--maximum-bytes-written-per-block", &value.to_string()]);
1405        }
1406        if let Some(value) = maximum_oracle_response_bytes {
1407            command.args(["--maximum-oracle-response-bytes", &value.to_string()]);
1408        }
1409        if let Some(value) = maximum_http_response_bytes {
1410            command.args(["--maximum-http-response-bytes", &value.to_string()]);
1411        }
1412        if let Some(value) = http_request_timeout_ms {
1413            command.args(["--http-request-timeout-ms", &value.to_string()]);
1414        }
1415        if let Some(values) = http_request_allow_list {
1416            command.args(["--http-request-allow-list", &values.join(",")]);
1417        }
1418        if let Some(values) = free_application_ids {
1419            command.args(["--free-application-ids", &values.join(",")]);
1420        }
1421        if let Some(values) = flags {
1422            command.args(["--flags", &values.join(",")]);
1423        }
1424        command.spawn_and_wait_for_stdout().await?;
1425        Ok(())
1426    }
1427
1428    /// Runs `linera keygen`.
1429    pub async fn keygen(&self) -> Result<AccountOwner> {
1430        let stdout = self
1431            .command()
1432            .await?
1433            .arg("keygen")
1434            .spawn_and_wait_for_stdout()
1435            .await?;
1436        AccountOwner::from_str(stdout.as_str().trim())
1437    }
1438
1439    /// Returns the default chain.
1440    pub fn default_chain(&self) -> Option<ChainId> {
1441        self.load_wallet().ok()?.default_chain()
1442    }
1443
1444    /// Runs `linera assign`.
1445    pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1446        let _stdout = self
1447            .command()
1448            .await?
1449            .arg("assign")
1450            .args(["--owner", &owner.to_string()])
1451            .args(["--chain-id", &chain_id.to_string()])
1452            .spawn_and_wait_for_stdout()
1453            .await?;
1454        Ok(())
1455    }
1456
1457    /// Runs `linera set-preferred-owner` for `chain_id`.
1458    pub async fn set_preferred_owner(
1459        &self,
1460        chain_id: ChainId,
1461        owner: Option<AccountOwner>,
1462    ) -> Result<()> {
1463        let mut owner_arg = vec!["--owner".to_string()];
1464        if let Some(owner) = owner {
1465            owner_arg.push(owner.to_string());
1466        };
1467        self.command()
1468            .await?
1469            .arg("set-preferred-owner")
1470            .args(["--chain-id", &chain_id.to_string()])
1471            .args(owner_arg)
1472            .spawn_and_wait_for_stdout()
1473            .await?;
1474        Ok(())
1475    }
1476
1477    pub async fn build_application(
1478        &self,
1479        path: &Path,
1480        name: &str,
1481        is_workspace: bool,
1482    ) -> Result<(PathBuf, PathBuf)> {
1483        Command::new("cargo")
1484            .current_dir(self.path_provider.path())
1485            .arg("build")
1486            .arg("--release")
1487            .args(["--target", "wasm32-unknown-unknown"])
1488            .arg("--manifest-path")
1489            .arg(path.join("Cargo.toml"))
1490            .spawn_and_wait_for_stdout()
1491            .await?;
1492
1493        let release_dir = match is_workspace {
1494            true => path.join("../target/wasm32-unknown-unknown/release"),
1495            false => path.join("target/wasm32-unknown-unknown/release"),
1496        };
1497
1498        let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1499        let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1500
1501        let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1502        let service_size = fs_err::tokio::metadata(&service).await?.len();
1503        tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1504
1505        Ok((contract, service))
1506    }
1507}
1508
1509impl Drop for ClientWrapper {
1510    fn drop(&mut self) {
1511        use std::process::Command as SyncCommand;
1512
1513        if self.on_drop != OnClientDrop::CloseChains {
1514            return;
1515        }
1516
1517        let Ok(binary_path) = self.binary_path.lock() else {
1518            tracing::error!(
1519                "Failed to close chains because a thread panicked with a lock to `binary_path`"
1520            );
1521            return;
1522        };
1523
1524        let Some(binary_path) = binary_path.as_ref() else {
1525            tracing::warn!(
1526                "Assuming no chains need to be closed, because the command binary was never \
1527                resolved and therefore presumably never called"
1528            );
1529            return;
1530        };
1531
1532        let working_directory = self.path_provider.path();
1533        let mut wallet_show_command = SyncCommand::new(binary_path);
1534
1535        for argument in self.command_arguments() {
1536            wallet_show_command.arg(&*argument);
1537        }
1538
1539        let Ok(wallet_show_output) = wallet_show_command
1540            .current_dir(working_directory)
1541            .args(["wallet", "show", "--short", "--owned"])
1542            .output()
1543        else {
1544            tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1545            return;
1546        };
1547
1548        if !wallet_show_output.status.success() {
1549            tracing::warn!("Failed to list chains in the wallet to close them");
1550            return;
1551        }
1552
1553        let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1554            tracing::warn!(
1555                "Failed to close chains because `linera wallet show --short` \
1556                returned a non-UTF-8 output"
1557            );
1558            return;
1559        };
1560
1561        let chain_ids = chain_list_string
1562            .split('\n')
1563            .map(|line| line.trim())
1564            .filter(|line| !line.is_empty());
1565
1566        for chain_id in chain_ids {
1567            let mut close_chain_command = SyncCommand::new(binary_path);
1568
1569            for argument in self.command_arguments() {
1570                close_chain_command.arg(&*argument);
1571            }
1572
1573            close_chain_command.current_dir(working_directory);
1574
1575            match close_chain_command.args(["close-chain", chain_id]).status() {
1576                Ok(status) if status.success() => (),
1577                Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1578                Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1579            }
1580        }
1581    }
1582}
1583
#[cfg(with_testing)]
impl ClientWrapper {
    /// Builds the example application `name` found at [`Self::example_path`], treating it as a
    /// workspace member.
    pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
        let example_dir = Self::example_path(name)?;
        self.build_application(&example_dir, name, true).await
    }

    /// Builds the test fixture application `name` found at [`Self::test_example_path`],
    /// treating it as a workspace member.
    pub async fn build_test_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
        let fixture_dir = Self::test_example_path(name)?;
        self.build_application(&fixture_dir, name, true).await
    }

    /// Returns `<current dir>/../examples/<name>`.
    pub fn example_path(name: &str) -> Result<PathBuf> {
        let current_dir = env::current_dir()?;
        Ok(current_dir.join("../examples/").join(name))
    }

    /// Returns `<current dir>/../linera-sdk/tests/fixtures/<name>`.
    pub fn test_example_path(name: &str) -> Result<PathBuf> {
        let current_dir = env::current_dir()?;
        Ok(current_dir
            .join("../linera-sdk/tests/fixtures/")
            .join(name))
    }
}
1606
/// Shortens `input` to at most 1000 bytes for logging.
///
/// Inputs of up to 1000 bytes are returned unchanged. Longer inputs are cut at the last
/// character boundary at or before byte 1000 and get `" ..."` appended to mark the truncation.
fn truncate_query_output(input: &str) -> String {
    const MAX_LEN: usize = 1000;
    if input.len() <= MAX_LEN {
        return input.to_string();
    }
    // Slicing a `str` in the middle of a multi-byte character is invalid, so back off to the
    // nearest boundary. Index 0 is always a boundary, which guarantees termination.
    let mut end = MAX_LEN;
    while !input.is_char_boundary(end) {
        end -= 1;
    }
    format!("{} ...", &input[..end])
}
1615
1616fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1617    let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1618    let max_len = 200;
1619    if query.len() < max_len {
1620        query
1621    } else {
1622        format!("{} ...", query.get(..max_len).unwrap())
1623    }
1624}
1625
1626/// A running node service.
1627/// Logs a warning if a child process has already exited when its wrapper is dropped.
1628/// On Unix, includes the signal number if the process was killed (e.g. signal 9 = OOM).
1629fn log_unexpected_exit(child: &mut Child, service_kind: &str, port: u16) {
1630    match child.try_wait() {
1631        Ok(Some(status)) => {
1632            #[cfg(unix)]
1633            {
1634                use std::os::unix::process::ExitStatusExt as _;
1635                if let Some(signal) = status.signal() {
1636                    tracing::error!(
1637                        port,
1638                        signal,
1639                        "The {service_kind} service was killed by signal {signal}",
1640                    );
1641                    return;
1642                }
1643            }
1644            if !status.success() {
1645                tracing::error!(
1646                    port,
1647                    %status,
1648                    "The {service_kind} service exited unexpectedly with {status}",
1649                );
1650            }
1651        }
1652        Ok(None) => {} // Still running — normal case when terminate() was called.
1653        Err(error) => {
1654            tracing::warn!(
1655                port,
1656                %error,
1657                "Failed to check {service_kind} service status",
1658            );
1659        }
1660    }
1661}
1662
/// A running node service.
pub struct NodeService {
    // Port associated with the service (presumably the one it listens on — confirm at the
    // construction site); returned by `port()` and attached to exit logs.
    port: u16,
    // Handle to the service's child process.
    child: Child,
    // Set by `terminate()`; when set, dropping the wrapper skips the unexpected-exit check.
    terminated: bool,
}
1668
1669impl Drop for NodeService {
1670    fn drop(&mut self) {
1671        if !self.terminated {
1672            log_unexpected_exit(&mut self.child, "node", self.port);
1673        }
1674    }
1675}
1676
1677impl NodeService {
    /// Wraps the already-spawned `child` process together with its `port`, initially marked as
    /// not terminated.
    fn new(port: u16, child: Child) -> Self {
        Self {
            port,
            child,
            terminated: false,
        }
    }
1685
1686    pub async fn terminate(mut self) -> Result<()> {
1687        self.terminated = true;
1688        self.child.kill().await.context("terminating node service")
1689    }
1690
    /// Returns the port this service was created with.
    pub fn port(&self) -> u16 {
        self.port
    }
1694
    /// Delegates to the child process's `ensure_is_running` check and returns its result.
    pub fn ensure_is_running(&mut self) -> Result<()> {
        self.child.ensure_is_running()
    }
1698
1699    pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1700        let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1701        let mut data = self.query_node(query).await?;
1702        Ok(serde_json::from_value(data["processInbox"].take())?)
1703    }
1704
1705    pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1706        let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1707        let mut data = self.query_node(query).await?;
1708        Ok(serde_json::from_value(data["sync"].take())?)
1709    }
1710
1711    pub async fn transfer(
1712        &self,
1713        chain_id: ChainId,
1714        owner: AccountOwner,
1715        recipient: Account,
1716        amount: Amount,
1717    ) -> Result<CryptoHash> {
1718        let json_owner = owner.to_value();
1719        let json_recipient = recipient.to_value();
1720        let query = format!(
1721            "mutation {{ transfer(\
1722                 chainId: \"{chain_id}\", \
1723                 owner: {json_owner}, \
1724                 recipient: {json_recipient}, \
1725                 amount: \"{amount}\") \
1726             }}"
1727        );
1728        let data = self.query_node(query).await?;
1729        serde_json::from_value(data["transfer"].clone())
1730            .context("missing transfer field in response")
1731    }
1732
1733    pub async fn balance(&self, account: &Account) -> Result<Amount> {
1734        let chain = account.chain_id;
1735        let owner = account.owner;
1736        if matches!(owner, AccountOwner::CHAIN) {
1737            let query = format!(
1738                "query {{ chain(chainId:\"{chain}\") {{
1739                    executionState {{ system {{ balance }} }}
1740                }} }}"
1741            );
1742            let response = self.query_node(query).await?;
1743            let balance = &response["chain"]["executionState"]["system"]["balance"]
1744                .as_str()
1745                .unwrap();
1746            return Ok(Amount::from_str(balance)?);
1747        }
1748        let query = format!(
1749            "query {{ chain(chainId:\"{chain}\") {{
1750                executionState {{ system {{ balances {{
1751                    entry(key:\"{owner}\") {{ value }}
1752                }} }} }}
1753            }} }}"
1754        );
1755        let response = self.query_node(query).await?;
1756        let balances = &response["chain"]["executionState"]["system"]["balances"];
1757        let balance = balances["entry"]["value"].as_str();
1758        match balance {
1759            None => Ok(Amount::ZERO),
1760            Some(amount) => Ok(Amount::from_str(amount)?),
1761        }
1762    }
1763
1764    pub fn make_application<A: ContractAbi>(
1765        &self,
1766        chain_id: &ChainId,
1767        application_id: &ApplicationId<A>,
1768    ) -> Result<ApplicationWrapper<A>> {
1769        let application_id = application_id.forget_abi().to_string();
1770        let link = format!(
1771            "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1772            self.port
1773        );
1774        Ok(ApplicationWrapper::from(link))
1775    }
1776
1777    pub async fn publish_data_blob(
1778        &self,
1779        chain_id: &ChainId,
1780        bytes: Vec<u8>,
1781    ) -> Result<CryptoHash> {
1782        let query = format!(
1783            "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1784            chain_id.to_value(),
1785            bytes.to_value(),
1786        );
1787        let data = self.query_node(query).await?;
1788        serde_json::from_value(data["publishDataBlob"].clone())
1789            .context("missing publishDataBlob field in response")
1790    }
1791
1792    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1793        &self,
1794        chain_id: &ChainId,
1795        contract: PathBuf,
1796        service: PathBuf,
1797        vm_runtime: VmRuntime,
1798    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1799        let contract_code = Bytecode::load_from_file(&contract).await?;
1800        let service_code = Bytecode::load_from_file(&service).await?;
1801        let query = format!(
1802            "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1803            chain_id.to_value(),
1804            contract_code.to_value(),
1805            service_code.to_value(),
1806            vm_runtime.to_value(),
1807        );
1808        let data = self.query_node(query).await?;
1809        let module_str = data["publishModule"]
1810            .as_str()
1811            .context("module ID not found")?;
1812        let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1813        Ok(module_id.with_abi())
1814    }
1815
1816    pub async fn query_committee_hash(&self, chain_id: &ChainId) -> Result<Option<CryptoHash>> {
1817        let query = format!(
1818            "query {{ chain(chainId:\"{chain_id}\") {{
1819                executionState {{ system {{ committeeHash }} }}
1820            }} }}"
1821        );
1822        let mut response = self.query_node(query).await?;
1823        let hash = response["chain"]["executionState"]["system"]["committeeHash"].take();
1824        Ok(serde_json::from_value(hash)?)
1825    }
1826
1827    pub async fn query_chain_epoch(&self, chain_id: &ChainId) -> Result<Epoch> {
1828        let query = format!(
1829            "query {{ chain(chainId:\"{chain_id}\") {{
1830                executionState {{ system {{ epoch }} }}
1831            }} }}"
1832        );
1833        let mut response = self.query_node(query).await?;
1834        let epoch = response["chain"]["executionState"]["system"]["epoch"].take();
1835        Ok(serde_json::from_value(epoch)?)
1836    }
1837
1838    pub async fn events_from_index(
1839        &self,
1840        chain_id: &ChainId,
1841        stream_id: &StreamId,
1842        start_index: u32,
1843    ) -> Result<Vec<IndexAndEvent>> {
1844        let query = format!(
1845            "query {{
1846               eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1847               {{ index event }}
1848             }}",
1849            stream_id.to_value()
1850        );
1851        let mut response = self.query_node(query).await?;
1852        let response = response["eventsFromIndex"].take();
1853        Ok(serde_json::from_value(response)?)
1854    }
1855
1856    pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1857        let n_try = 5;
1858        let query = query.as_ref();
1859        for i in 0..n_try {
1860            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1861            let url = format!("http://localhost:{}/", self.port);
1862            let client = reqwest_client();
1863            let result = client
1864                .post(url)
1865                .json(&json!({ "query": query }))
1866                .send()
1867                .await;
1868            if matches!(result, Err(ref error) if error.is_timeout()) {
1869                tracing::warn!(
1870                    "Timeout when sending query {} to the node service",
1871                    truncate_query_output(query)
1872                );
1873                continue;
1874            }
1875            let response = result.with_context(|| {
1876                format!(
1877                    "query_node: failed to post query={}",
1878                    truncate_query_output(query)
1879                )
1880            })?;
1881            ensure!(
1882                response.status().is_success(),
1883                "Query \"{}\" failed: {}",
1884                truncate_query_output(query),
1885                response
1886                    .text()
1887                    .await
1888                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1889            );
1890            let value: Value = response.json().await.context("invalid JSON")?;
1891            if let Some(errors) = value.get("errors") {
1892                tracing::warn!(
1893                    "Query \"{}\" failed: {}",
1894                    truncate_query_output(query),
1895                    errors
1896                );
1897            } else {
1898                return Ok(value["data"].clone());
1899            }
1900        }
1901        bail!(
1902            "Query \"{}\" failed after {} retries.",
1903            truncate_query_output(query),
1904            n_try
1905        );
1906    }
1907
1908    pub async fn create_application<
1909        Abi: ContractAbi,
1910        Parameters: Serialize,
1911        InstantiationArgument: Serialize,
1912    >(
1913        &self,
1914        chain_id: &ChainId,
1915        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1916        parameters: &Parameters,
1917        argument: &InstantiationArgument,
1918        required_application_ids: &[ApplicationId],
1919    ) -> Result<ApplicationId<Abi>> {
1920        let module_id = module_id.forget_abi();
1921        let json_required_applications_ids = required_application_ids
1922            .iter()
1923            .map(ApplicationId::to_string)
1924            .collect::<Vec<_>>()
1925            .to_value();
1926        // Convert to `serde_json::Value` then `async_graphql::Value` via the trait `InputType`.
1927        let new_parameters = serde_json::to_value(parameters)
1928            .context("could not create parameters JSON")?
1929            .to_value();
1930        let new_argument = serde_json::to_value(argument)
1931            .context("could not create argument JSON")?
1932            .to_value();
1933        let query = format!(
1934            "mutation {{ createApplication(\
1935                 chainId: \"{chain_id}\",
1936                 moduleId: \"{module_id}\", \
1937                 parameters: {new_parameters}, \
1938                 instantiationArgument: {new_argument}, \
1939                 requiredApplicationIds: {json_required_applications_ids}) \
1940             }}"
1941        );
1942        let data = self.query_node(query).await?;
1943        let app_id_str = data["createApplication"]
1944            .as_str()
1945            .context("missing createApplication string in response")?
1946            .trim();
1947        Ok(app_id_str
1948            .parse::<ApplicationId>()
1949            .context("invalid application ID")?
1950            .with_abi())
1951    }
1952
1953    /// Obtains the hash and height of the `chain`'s tip block, as known by this node service.
1954    pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1955        let query = format!(
1956            r#"query {{ block(chainId: "{chain}") {{
1957                hash
1958                block {{ header {{ height }} }}
1959            }} }}"#
1960        );
1961
1962        let mut response = self.query_node(&query).await?;
1963
1964        match (
1965            mem::take(&mut response["block"]["hash"]),
1966            mem::take(&mut response["block"]["block"]["header"]["height"]),
1967        ) {
1968            (Value::Null, Value::Null) => Ok(None),
1969            (Value::String(hash), Value::Number(height)) => Ok(Some((
1970                hash.parse()
1971                    .context("Received an invalid hash {hash:?} for chain tip")?,
1972                BlockHeight(height.as_u64().unwrap()),
1973            ))),
1974            invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1975        }
1976    }
1977
1978    /// Subscribes to the node service and returns a stream of notifications about a chain.
1979    pub async fn notifications(
1980        &self,
1981        chain_id: ChainId,
1982    ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
1983        let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1984        let url = format!("ws://localhost:{}/ws", self.port);
1985        let mut request = url.into_client_request()?;
1986        request.headers_mut().insert(
1987            "Sec-WebSocket-Protocol",
1988            HeaderValue::from_str("graphql-transport-ws")?,
1989        );
1990        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1991        let init_json = json!({
1992          "type": "connection_init",
1993          "payload": {}
1994        });
1995        websocket.send(init_json.to_string().into()).await?;
1996        let text = websocket
1997            .next()
1998            .await
1999            .context("Failed to establish connection")??
2000            .into_text()?;
2001        ensure!(
2002            text == "{\"type\":\"connection_ack\"}",
2003            "Unexpected response: {text}"
2004        );
2005        let query_json = json!({
2006          "id": "1",
2007          "type": "start",
2008          "payload": {
2009            "query": query,
2010            "variables": {},
2011            "operationName": null
2012          }
2013        });
2014        websocket.send(query_json.to_string().into()).await?;
2015        Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
2016            |message| async {
2017                let text = message.into_text()?;
2018                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
2019                if let Some(errors) = value["payload"].get("errors") {
2020                    bail!("Notification subscription failed: {errors:?}");
2021                }
2022                serde_json::from_value(value["payload"]["data"]["notifications"].clone())
2023                    .context("Failed to deserialize notification")
2024            },
2025        )))
2026    }
2027
2028    /// Subscribes to query results via the `queryResult` GraphQL subscription.
2029    pub async fn query_result(
2030        &self,
2031        name: &str,
2032        chain_id: ChainId,
2033        application_id: &ApplicationId,
2034    ) -> Result<Pin<Box<impl Stream<Item = Result<Value>>>>> {
2035        let query = format!(
2036            r#"subscription {{ queryResult(name: "{name}", chainId: "{chain_id}", applicationId: "{application_id}") }}"#,
2037        );
2038        let url = format!("ws://localhost:{}/ws", self.port);
2039        let mut request = url.into_client_request()?;
2040        request.headers_mut().insert(
2041            "Sec-WebSocket-Protocol",
2042            HeaderValue::from_str("graphql-transport-ws")?,
2043        );
2044        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
2045        let init_json = json!({
2046          "type": "connection_init",
2047          "payload": {}
2048        });
2049        websocket.send(init_json.to_string().into()).await?;
2050        let text = websocket
2051            .next()
2052            .await
2053            .context("Failed to establish connection")??
2054            .into_text()?;
2055        ensure!(
2056            text == "{\"type\":\"connection_ack\"}",
2057            "Unexpected response: {text}"
2058        );
2059        let query_json = json!({
2060          "id": "1",
2061          "type": "start",
2062          "payload": {
2063            "query": query,
2064            "variables": {},
2065            "operationName": null
2066          }
2067        });
2068        websocket.send(query_json.to_string().into()).await?;
2069        Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
2070            |message| async {
2071                let text = message.into_text()?;
2072                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
2073                if let Some(errors) = value["payload"].get("errors") {
2074                    bail!("Query result subscription failed: {errors:?}");
2075                }
2076                Ok(value["payload"]["data"]["queryResult"].clone())
2077            },
2078        )))
2079    }
2080}
2081
2082/// A running faucet service.
2083pub struct FaucetService {
2084    port: u16,
2085    child: Child,
2086    _temp_dir: tempfile::TempDir,
2087    terminated: bool,
2088}
2089
2090impl Drop for FaucetService {
2091    fn drop(&mut self) {
2092        if !self.terminated {
2093            log_unexpected_exit(&mut self.child, "faucet", self.port);
2094        }
2095    }
2096}
2097
2098impl FaucetService {
    /// Builds a `FaucetService` from its port, child process handle and temporary
    /// directory.
    ///
    /// The service starts out not marked as terminated.
    fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
        Self {
            port,
            child,
            _temp_dir: temp_dir,
            terminated: false,
        }
    }
2107
2108    pub async fn terminate(mut self) -> Result<()> {
2109        self.terminated = true;
2110        self.child
2111            .kill()
2112            .await
2113            .context("terminating faucet service")
2114    }
2115
    /// Delegates to `ensure_is_running` on the child process handle and returns its result.
    ///
    /// NOTE(review): presumably the `ChildExt` helper imported from `crate::util`; its exact
    /// failure conditions are not visible here.
    pub fn ensure_is_running(&mut self) -> Result<()> {
        self.child.ensure_is_running()
    }
2119
2120    pub fn instance(&self) -> Faucet {
2121        Faucet::new(format!("http://localhost:{}/", self.port))
2122    }
2123}
2124
/// A running `Application` to be queried in GraphQL.
pub struct ApplicationWrapper<A> {
    /// URL that queries are POSTed to.
    uri: String,
    /// Carries the type parameter `A` without storing a value of it.
    _phantom: PhantomData<A>,
}
2130
2131impl<A> ApplicationWrapper<A> {
2132    pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
2133        let query = query.as_ref();
2134        let value = self.run_json_query(json!({ "query": query })).await?;
2135        Ok(value["data"].clone())
2136    }
2137
2138    pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
2139        const MAX_RETRIES: usize = 5;
2140
2141        for i in 0.. {
2142            let client = reqwest_client();
2143            let result = client.post(&self.uri).json(&query).send().await;
2144            let response = match result {
2145                Ok(response) => response,
2146                Err(error) if i < MAX_RETRIES => {
2147                    tracing::warn!(
2148                        "Failed to post query \"{}\": {error}; retrying",
2149                        truncate_query_output_serialize(&query),
2150                    );
2151                    continue;
2152                }
2153                Err(error) => {
2154                    let query = truncate_query_output_serialize(&query);
2155                    return Err(error)
2156                        .with_context(|| format!("run_json_query: failed to post query={query}"));
2157                }
2158            };
2159            ensure!(
2160                response.status().is_success(),
2161                "Query \"{}\" failed: {}",
2162                truncate_query_output_serialize(&query),
2163                response
2164                    .text()
2165                    .await
2166                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
2167            );
2168            let value: Value = response.json().await.context("invalid JSON")?;
2169            if let Some(errors) = value.get("errors") {
2170                bail!(
2171                    "Query \"{}\" failed: {}",
2172                    truncate_query_output_serialize(&query),
2173                    errors
2174                );
2175            }
2176            return Ok(value);
2177        }
2178        unreachable!()
2179    }
2180
2181    pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
2182        let query = query.as_ref();
2183        self.run_graphql_query(&format!("query {{ {query} }}"))
2184            .await
2185    }
2186
2187    pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
2188        let query = query.as_ref().trim();
2189        let name = query
2190            .split_once(|ch: char| !ch.is_alphanumeric())
2191            .map_or(query, |(name, _)| name);
2192        let data = self.query(query).await?;
2193        serde_json::from_value(data[name].clone())
2194            .with_context(|| format!("{name} field missing in response"))
2195    }
2196
2197    pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
2198        let mutation = mutation.as_ref();
2199        self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
2200            .await
2201    }
2202
2203    pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
2204        let mut out = String::from("mutation {\n");
2205        for (index, mutation) in mutations.iter().enumerate() {
2206            out = format!("{out}  u{index}: {mutation}\n");
2207        }
2208        out.push_str("}\n");
2209        self.run_graphql_query(&out).await
2210    }
2211}
2212
2213impl<A> From<String> for ApplicationWrapper<A> {
2214    fn from(uri: String) -> ApplicationWrapper<A> {
2215        ApplicationWrapper {
2216            uri,
2217            _phantom: PhantomData,
2218        }
2219    }
2220}
2221
2222/// Returns the timeout for tests that wait for notifications, either read from the env
2223/// variable `LINERA_TEST_NOTIFICATION_TIMEOUT_MS`, or the default value of 10 seconds.
2224#[cfg(with_testing)]
fn notification_timeout() -> Duration {
    const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
    const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;

    // Resolve the number of milliseconds first, then build the `Duration` once.
    let millis = match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(millis) => millis,
            // A set but malformed variable is a configuration mistake: panic.
            Err(error) => {
                panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
            }
        },
        Err(env::VarError::NotPresent) => NOTIFICATION_TIMEOUT_MS_DEFAULT,
        Err(env::VarError::NotUnicode(_)) => {
            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
        }
    };
    Duration::from_millis(millis)
}
2239
#[cfg(with_testing)]
pub trait NotificationsExt {
    /// Waits for a notification for which `f` returns `Some(t)`, and returns `t`.
    fn wait_for<T>(
        &mut self,
        f: impl FnMut(Notification) -> Option<T>,
    ) -> impl Future<Output = Result<T>>;

    /// Waits for a `NewEvents` notification and returns its event streams. If
    /// `expected_height` is given, only a notification for that block height is accepted.
    fn wait_for_events(
        &mut self,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
        let expected_height = expected_height.into();
        self.wait_for(move |notification| match notification.reason {
            Reason::NewEvents {
                height,
                event_streams,
                ..
            } if expected_height.is_none_or(|h| h == height) => Some(event_streams),
            _ => None,
        })
    }

    /// Waits for a `NewBlock` notification and returns its hash. If `expected_height` is
    /// given, only a notification for that block height is accepted.
    fn wait_for_block(
        &mut self,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<CryptoHash>> {
        let expected_height = expected_height.into();
        self.wait_for(move |notification| match notification.reason {
            Reason::NewBlock { height, hash, .. }
                if expected_height.is_none_or(|h| h == height) =>
            {
                Some(hash)
            }
            _ => None,
        })
    }

    /// Waits for a `NewIncomingBundle` notification whose origin is `expected_origin`. If
    /// `expected_height` is given, the notification's height must also match it.
    fn wait_for_bundle(
        &mut self,
        expected_origin: ChainId,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<()>> {
        let expected_height = expected_height.into();
        self.wait_for(move |notification| match notification.reason {
            Reason::NewIncomingBundle { height, origin }
                if expected_height.is_none_or(|h| h == height) && origin == expected_origin =>
            {
                Some(())
            }
            _ => None,
        })
    }
}
2305
#[cfg(with_testing)]
impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
    /// Pulls notifications from the stream until `f` accepts one.
    ///
    /// A single timer of `notification_timeout()` covers the whole wait, not each
    /// notification. Fails on timeout, when the stream ends, or when the stream yields an
    /// error.
    async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
        let mut deadline = Box::pin(linera_base::time::timer::sleep(notification_timeout())).fuse();
        loop {
            let next = futures::select! {
                () = deadline => bail!("Timeout waiting for notification"),
                item = self.next().fuse() => item,
            };
            let notification = next.context("Stream closed")??;
            if let Some(found) = f(notification) {
                return Ok(found);
            }
        }
    }
}