linera_service/cli_wrappers/
wallet.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Cow,
6    collections::BTreeMap,
7    env,
8    marker::PhantomData,
9    mem,
10    path::{Path, PathBuf},
11    pin::Pin,
12    process::Stdio,
13    str::FromStr,
14    sync,
15    time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24    abi::ContractAbi,
25    command::{resolve_binary, CommandExt},
26    crypto::{CryptoHash, InMemorySigner},
27    data_types::{Amount, BlockHeight, Bytecode, Epoch},
28    identifiers::{
29        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30    },
31    vm::VmRuntime,
32};
33use linera_client::client_options::ResourceControlPolicyConfig;
34use linera_core::worker::Notification;
35use linera_execution::committee::Committee;
36use linera_faucet_client::Faucet;
37use serde::{de::DeserializeOwned, ser::Serialize};
38use serde_command_opts::to_args;
39use serde_json::{json, Value};
40use tempfile::TempDir;
41use tokio::{
42    io::{AsyncBufReadExt, BufReader},
43    process::{Child, Command},
44    sync::oneshot,
45    task::JoinHandle,
46};
47#[cfg(with_testing)]
48use {
49    futures::FutureExt as _,
50    linera_core::worker::Reason,
51    std::{collections::BTreeSet, future::Future},
52};
53
54use crate::{
55    cli::command::BenchmarkCommand,
56    cli_wrappers::{
57        local_net::{PathProvider, ProcessInbox},
58        Network,
59    },
60    util::{self, ChildExt},
61    wallet::Wallet,
62};
63
/// The name of the environment variable that allows specifying additional arguments to be passed
/// to the node-service command of the client.
///
/// When the variable is set, its value is split on whitespace and every piece is appended as a
/// separate argument to `linera service`.
const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
67
68fn reqwest_client() -> reqwest::Client {
69    reqwest::ClientBuilder::new()
70        .timeout(Duration::from_secs(30))
71        .build()
72        .unwrap()
73}
74
/// Wrapper to run a Linera client command.
///
/// Each instance carries the wallet, keystore and storage locations that are passed on the
/// command line of the `linera` binary.
pub struct ClientWrapper {
    /// Cached location of the `linera` binary; `None` until it has been resolved once.
    binary_path: sync::Mutex<Option<PathBuf>>,
    /// When set, forwarded as `--testing-prng-seed` (e.g. by `wallet init`).
    testing_prng_seed: Option<u64>,
    /// Value passed as `--storage`.
    storage: String,
    /// Value passed as `--wallet`.
    wallet: String,
    /// Value passed as `--keystore`.
    keystore: String,
    /// Value passed as `--max-pending-message-bundles`.
    max_pending_message_bundles: usize,
    /// Network configuration supplied at construction.
    /// NOTE(review): not read in the visible part of this file; confirm its use sites.
    network: Network,
    /// Provides the directory in which the commands are run.
    pub path_provider: PathProvider,
    /// Action selected at construction for when this wrapper is dropped.
    on_drop: OnClientDrop,
    /// Additional arguments appended after the required command arguments.
    extra_args: Vec<String>,
}
88
/// Action to perform when the [`ClientWrapper`] is dropped.
///
/// The value is supplied to the wrapper's constructors and stored in the wrapper.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OnClientDrop {
    /// Close all the chains on the wallet.
    CloseChains,
    /// Do not close any chains, leaving them active.
    LeakChains,
}
97
98impl ClientWrapper {
99    pub fn new(
100        path_provider: PathProvider,
101        network: Network,
102        testing_prng_seed: Option<u64>,
103        id: usize,
104        on_drop: OnClientDrop,
105    ) -> Self {
106        Self::new_with_extra_args(
107            path_provider,
108            network,
109            testing_prng_seed,
110            id,
111            on_drop,
112            vec!["--wait-for-outgoing-messages".to_string()],
113        )
114    }
115
116    pub fn new_with_extra_args(
117        path_provider: PathProvider,
118        network: Network,
119        testing_prng_seed: Option<u64>,
120        id: usize,
121        on_drop: OnClientDrop,
122        extra_args: Vec<String>,
123    ) -> Self {
124        let storage = format!(
125            "rocksdb:{}/client_{}.db",
126            path_provider.path().display(),
127            id
128        );
129        let wallet = format!("wallet_{}.json", id);
130        let keystore = format!("keystore_{}.json", id);
131        Self {
132            binary_path: sync::Mutex::new(None),
133            testing_prng_seed,
134            storage,
135            wallet,
136            keystore,
137            max_pending_message_bundles: 10_000,
138            network,
139            path_provider,
140            on_drop,
141            extra_args,
142        }
143    }
144
145    /// Runs `linera project new`.
146    pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
147        let tmp = TempDir::new()?;
148        let mut command = self.command().await?;
149        command
150            .current_dir(tmp.path())
151            .arg("project")
152            .arg("new")
153            .arg(project_name)
154            .arg("--linera-root")
155            .arg(linera_root)
156            .spawn_and_wait_for_stdout()
157            .await?;
158        Ok(tmp)
159    }
160
161    /// Runs `linera project publish`.
162    pub async fn project_publish<T: Serialize>(
163        &self,
164        path: PathBuf,
165        required_application_ids: Vec<String>,
166        publisher: impl Into<Option<ChainId>>,
167        argument: &T,
168    ) -> Result<String> {
169        let json_parameters = serde_json::to_string(&())?;
170        let json_argument = serde_json::to_string(argument)?;
171        let mut command = self.command().await?;
172        command
173            .arg("project")
174            .arg("publish-and-create")
175            .arg(path)
176            .args(publisher.into().iter().map(ChainId::to_string))
177            .args(["--json-parameters", &json_parameters])
178            .args(["--json-argument", &json_argument]);
179        if !required_application_ids.is_empty() {
180            command.arg("--required-application-ids");
181            command.args(required_application_ids);
182        }
183        let stdout = command.spawn_and_wait_for_stdout().await?;
184        Ok(stdout.trim().to_string())
185    }
186
187    /// Runs `linera project test`.
188    pub async fn project_test(&self, path: &Path) -> Result<()> {
189        self.command()
190            .await
191            .context("failed to create project test command")?
192            .current_dir(path)
193            .arg("project")
194            .arg("test")
195            .spawn_and_wait_for_stdout()
196            .await?;
197        Ok(())
198    }
199
200    async fn command_with_envs_and_arguments(
201        &self,
202        envs: &[(&str, &str)],
203        arguments: impl IntoIterator<Item = Cow<'_, str>>,
204    ) -> Result<Command> {
205        let mut command = self.command_binary().await?;
206        command.current_dir(self.path_provider.path());
207        for (key, value) in envs {
208            command.env(key, value);
209        }
210        for argument in arguments {
211            command.arg(&*argument);
212        }
213        Ok(command)
214    }
215
216    async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
217        self.command_with_envs_and_arguments(envs, self.command_arguments())
218            .await
219    }
220
221    async fn command_with_arguments(
222        &self,
223        arguments: impl IntoIterator<Item = Cow<'_, str>>,
224    ) -> Result<Command> {
225        self.command_with_envs_and_arguments(
226            &[(
227                "RUST_LOG",
228                &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
229            )],
230            arguments,
231        )
232        .await
233    }
234
235    async fn command(&self) -> Result<Command> {
236        self.command_with_envs(&[(
237            "RUST_LOG",
238            &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
239        )])
240        .await
241    }
242
243    fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
244        [
245            "--wallet".into(),
246            self.wallet.as_str().into(),
247            "--keystore".into(),
248            self.keystore.as_str().into(),
249            "--storage".into(),
250            self.storage.as_str().into(),
251            "--send-timeout-ms".into(),
252            "500000".into(),
253            "--recv-timeout-ms".into(),
254            "500000".into(),
255        ]
256        .into_iter()
257        .chain(self.extra_args.iter().map(|s| s.as_str().into()))
258    }
259
260    /// Returns an iterator over the arguments that should be added to all command invocations.
261    fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
262        self.required_command_arguments().chain([
263            "--max-pending-message-bundles".into(),
264            self.max_pending_message_bundles.to_string().into(),
265        ])
266    }
267
268    /// Returns the [`Command`] instance configured to run the appropriate binary.
269    ///
270    /// The path is resolved once and cached inside `self` for subsequent usages.
271    async fn command_binary(&self) -> Result<Command> {
272        match self.command_with_cached_binary_path() {
273            Some(command) => Ok(command),
274            None => {
275                let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
276                let command = Command::new(&resolved_path);
277
278                self.set_cached_binary_path(resolved_path);
279
280                Ok(command)
281            }
282        }
283    }
284
285    /// Returns a [`Command`] instance configured with the cached `binary_path`, if available.
286    fn command_with_cached_binary_path(&self) -> Option<Command> {
287        let binary_path = self.binary_path.lock().unwrap();
288
289        binary_path.as_ref().map(Command::new)
290    }
291
292    /// Sets the cached `binary_path` with the `new_binary_path`.
293    ///
294    /// # Panics
295    ///
296    /// If the cache is already set to a different value. In theory the two threads calling
297    /// `command_binary` can race and resolve the binary path twice, but they should always be the
298    /// same path.
299    fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
300        let mut binary_path = self.binary_path.lock().unwrap();
301
302        if binary_path.is_none() {
303            *binary_path = Some(new_binary_path);
304        } else {
305            assert_eq!(*binary_path, Some(new_binary_path));
306        }
307    }
308
309    /// Runs `linera create-genesis-config`.
310    pub async fn create_genesis_config(
311        &self,
312        num_other_initial_chains: u32,
313        initial_funding: Amount,
314        policy_config: ResourceControlPolicyConfig,
315        http_allow_list: Option<Vec<String>>,
316    ) -> Result<()> {
317        let mut command = self.command().await?;
318        command
319            .args([
320                "create-genesis-config",
321                &num_other_initial_chains.to_string(),
322            ])
323            .args(["--initial-funding", &initial_funding.to_string()])
324            .args(["--committee", "committee.json"])
325            .args(["--genesis", "genesis.json"])
326            .args([
327                "--policy-config",
328                &policy_config.to_string().to_kebab_case(),
329            ]);
330        if let Some(allow_list) = http_allow_list {
331            command
332                .arg("--http-request-allow-list")
333                .arg(allow_list.join(","));
334        }
335        if let Some(seed) = self.testing_prng_seed {
336            command.arg("--testing-prng-seed").arg(seed.to_string());
337        }
338        command.spawn_and_wait_for_stdout().await?;
339        Ok(())
340    }
341
342    /// Runs `linera wallet init`. The genesis config is read from `genesis.json`, or from the
343    /// faucet if provided.
344    pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
345        let mut command = self.command().await?;
346        command.args(["wallet", "init"]);
347        match faucet {
348            None => command.args(["--genesis", "genesis.json"]),
349            Some(faucet) => command.args(["--faucet", faucet.url()]),
350        };
351        if let Some(seed) = self.testing_prng_seed {
352            command.arg("--testing-prng-seed").arg(seed.to_string());
353        }
354        command.spawn_and_wait_for_stdout().await?;
355        Ok(())
356    }
357
358    /// Runs `linera wallet request-chain`.
359    pub async fn request_chain(
360        &self,
361        faucet: &Faucet,
362        set_default: bool,
363    ) -> Result<(ChainId, AccountOwner)> {
364        let mut command = self.command().await?;
365        command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
366        if set_default {
367            command.arg("--set-default");
368        }
369        let stdout = command.spawn_and_wait_for_stdout().await?;
370        let mut lines = stdout.split_whitespace();
371        let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
372        let owner = lines.next().context("missing chain owner")?.parse()?;
373        Ok((chain_id, owner))
374    }
375
376    /// Runs `linera wallet publish-and-create`.
377    #[expect(clippy::too_many_arguments)]
378    pub async fn publish_and_create<
379        A: ContractAbi,
380        Parameters: Serialize,
381        InstantiationArgument: Serialize,
382    >(
383        &self,
384        contract: PathBuf,
385        service: PathBuf,
386        vm_runtime: VmRuntime,
387        parameters: &Parameters,
388        argument: &InstantiationArgument,
389        required_application_ids: &[ApplicationId],
390        publisher: impl Into<Option<ChainId>>,
391    ) -> Result<ApplicationId<A>> {
392        let json_parameters = serde_json::to_string(parameters)?;
393        let json_argument = serde_json::to_string(argument)?;
394        let mut command = self.command().await?;
395        let vm_runtime = format!("{}", vm_runtime);
396        command
397            .arg("publish-and-create")
398            .args([contract, service])
399            .args(["--vm-runtime", &vm_runtime.to_lowercase()])
400            .args(publisher.into().iter().map(ChainId::to_string))
401            .args(["--json-parameters", &json_parameters])
402            .args(["--json-argument", &json_argument]);
403        if !required_application_ids.is_empty() {
404            command.arg("--required-application-ids");
405            command.args(
406                required_application_ids
407                    .iter()
408                    .map(ApplicationId::to_string),
409            );
410        }
411        let stdout = command.spawn_and_wait_for_stdout().await?;
412        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
413    }
414
415    /// Runs `linera publish-module`.
416    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
417        &self,
418        contract: PathBuf,
419        service: PathBuf,
420        vm_runtime: VmRuntime,
421        publisher: impl Into<Option<ChainId>>,
422    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
423        let stdout = self
424            .command()
425            .await?
426            .arg("publish-module")
427            .args([contract, service])
428            .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
429            .args(publisher.into().iter().map(ChainId::to_string))
430            .spawn_and_wait_for_stdout()
431            .await?;
432        let module_id: ModuleId = stdout.trim().parse()?;
433        Ok(module_id.with_abi())
434    }
435
436    /// Runs `linera create-application`.
437    pub async fn create_application<
438        Abi: ContractAbi,
439        Parameters: Serialize,
440        InstantiationArgument: Serialize,
441    >(
442        &self,
443        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
444        parameters: &Parameters,
445        argument: &InstantiationArgument,
446        required_application_ids: &[ApplicationId],
447        creator: impl Into<Option<ChainId>>,
448    ) -> Result<ApplicationId<Abi>> {
449        let json_parameters = serde_json::to_string(parameters)?;
450        let json_argument = serde_json::to_string(argument)?;
451        let mut command = self.command().await?;
452        command
453            .arg("create-application")
454            .arg(module_id.forget_abi().to_string())
455            .args(["--json-parameters", &json_parameters])
456            .args(["--json-argument", &json_argument])
457            .args(creator.into().iter().map(ChainId::to_string));
458        if !required_application_ids.is_empty() {
459            command.arg("--required-application-ids");
460            command.args(
461                required_application_ids
462                    .iter()
463                    .map(ApplicationId::to_string),
464            );
465        }
466        let stdout = command.spawn_and_wait_for_stdout().await?;
467        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
468    }
469
470    /// Runs `linera service`.
471    pub async fn run_node_service(
472        &self,
473        port: impl Into<Option<u16>>,
474        process_inbox: ProcessInbox,
475    ) -> Result<NodeService> {
476        self.run_node_service_with_options(port, process_inbox, &[], &[], false)
477            .await
478    }
479
480    /// Runs `linera service` with optional task processor configuration.
481    pub async fn run_node_service_with_options(
482        &self,
483        port: impl Into<Option<u16>>,
484        process_inbox: ProcessInbox,
485        operator_application_ids: &[ApplicationId],
486        operators: &[(String, PathBuf)],
487        read_only: bool,
488    ) -> Result<NodeService> {
489        let port = port.into().unwrap_or(8080);
490        let mut command = self.command().await?;
491        command.arg("service");
492        if let ProcessInbox::Skip = process_inbox {
493            command.arg("--listener-skip-process-inbox");
494        }
495        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
496            command.args(var.split_whitespace());
497        }
498        for app_id in operator_application_ids {
499            command.args(["--operator-application-ids", &app_id.to_string()]);
500        }
501        for (name, path) in operators {
502            command.args(["--operators", &format!("{}={}", name, path.display())]);
503        }
504        if read_only {
505            command.arg("--read-only");
506        }
507        let child = command
508            .args(["--port".to_string(), port.to_string()])
509            .spawn_into()?;
510        let client = reqwest_client();
511        for i in 0..10 {
512            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
513            let request = client
514                .get(format!("http://localhost:{}/", port))
515                .send()
516                .await;
517            if request.is_ok() {
518                tracing::info!("Node service has started");
519                return Ok(NodeService::new(port, child));
520            } else {
521                tracing::warn!("Waiting for node service to start");
522            }
523        }
524        bail!("Failed to start node service");
525    }
526
527    /// Runs `linera service` with a controller application.
528    pub async fn run_node_service_with_controller(
529        &self,
530        port: impl Into<Option<u16>>,
531        process_inbox: ProcessInbox,
532        controller_id: &ApplicationId,
533        operators: &[(String, PathBuf)],
534    ) -> Result<NodeService> {
535        let port = port.into().unwrap_or(8080);
536        let mut command = self.command().await?;
537        command.arg("service");
538        if let ProcessInbox::Skip = process_inbox {
539            command.arg("--listener-skip-process-inbox");
540        }
541        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
542            command.args(var.split_whitespace());
543        }
544        command.args(["--controller-id", &controller_id.to_string()]);
545        for (name, path) in operators {
546            command.args(["--operators", &format!("{}={}", name, path.display())]);
547        }
548        let child = command
549            .args(["--port".to_string(), port.to_string()])
550            .spawn_into()?;
551        let client = reqwest_client();
552        for i in 0..10 {
553            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
554            let request = client
555                .get(format!("http://localhost:{}/", port))
556                .send()
557                .await;
558            if request.is_ok() {
559                tracing::info!("Node service has started");
560                return Ok(NodeService::new(port, child));
561            } else {
562                tracing::warn!("Waiting for node service to start");
563            }
564        }
565        bail!("Failed to start node service");
566    }
567
568    /// Runs `linera validator query`
569    pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
570        let mut command = self.command().await?;
571        command.arg("validator").arg("query").arg(address);
572        let stdout = command.spawn_and_wait_for_stdout().await?;
573
574        // Parse the genesis config hash from the output.
575        // It's on a line like "Genesis config hash: <hash>"
576        let hash = stdout
577            .lines()
578            .find_map(|line| {
579                line.strip_prefix("Genesis config hash: ")
580                    .and_then(|hash_str| hash_str.trim().parse().ok())
581            })
582            .context("error while parsing the result of `linera validator query`")?;
583        Ok(hash)
584    }
585
586    /// Runs `linera validator list`.
587    pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
588        let mut command = self.command().await?;
589        command.arg("validator").arg("list");
590        if let Some(chain_id) = chain_id {
591            command.args(["--chain-id", &chain_id.to_string()]);
592        }
593        command.spawn_and_wait_for_stdout().await?;
594        Ok(())
595    }
596
597    /// Runs `linera sync-validator`.
598    pub async fn sync_validator(
599        &self,
600        chain_ids: impl IntoIterator<Item = &ChainId>,
601        validator_address: impl Into<String>,
602    ) -> Result<()> {
603        let mut command = self.command().await?;
604        command
605            .arg("validator")
606            .arg("sync")
607            .arg(validator_address.into());
608        let mut chain_ids = chain_ids.into_iter().peekable();
609        if chain_ids.peek().is_some() {
610            command
611                .arg("--chains")
612                .args(chain_ids.map(ChainId::to_string));
613        }
614        command.spawn_and_wait_for_stdout().await?;
615        Ok(())
616    }
617
618    /// Runs `linera faucet`.
619    pub async fn run_faucet(
620        &self,
621        port: impl Into<Option<u16>>,
622        chain_id: Option<ChainId>,
623        amount: Amount,
624    ) -> Result<FaucetService> {
625        let port = port.into().unwrap_or(8080);
626        let temp_dir = tempfile::tempdir()
627            .context("Failed to create temporary directory for faucet storage")?;
628        let storage_path = temp_dir.path().join("faucet_storage.sqlite");
629        let mut command = self.command().await?;
630        let command = command
631            .arg("faucet")
632            .args(["--port".to_string(), port.to_string()])
633            .args(["--amount".to_string(), amount.to_string()])
634            .args([
635                "--storage-path".to_string(),
636                storage_path.to_string_lossy().to_string(),
637            ]);
638        if let Some(chain_id) = chain_id {
639            command.arg(chain_id.to_string());
640        }
641        let child = command.spawn_into()?;
642        let client = reqwest_client();
643        for i in 0..10 {
644            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
645            let request = client
646                .get(format!("http://localhost:{}/", port))
647                .send()
648                .await;
649            if request.is_ok() {
650                tracing::info!("Faucet has started");
651                return Ok(FaucetService::new(port, child, temp_dir));
652            } else {
653                tracing::debug!("Waiting for faucet to start");
654            }
655        }
656        bail!("Failed to start faucet");
657    }
658
659    /// Runs `linera local-balance`.
660    pub async fn local_balance(&self, account: Account) -> Result<Amount> {
661        let stdout = self
662            .command()
663            .await?
664            .arg("local-balance")
665            .arg(account.to_string())
666            .spawn_and_wait_for_stdout()
667            .await?;
668        let amount = stdout
669            .trim()
670            .parse()
671            .context("error while parsing the result of `linera local-balance`")?;
672        Ok(amount)
673    }
674
675    /// Runs `linera query-balance`.
676    pub async fn query_balance(&self, account: Account) -> Result<Amount> {
677        let stdout = self
678            .command()
679            .await?
680            .arg("query-balance")
681            .arg(account.to_string())
682            .spawn_and_wait_for_stdout()
683            .await?;
684        let amount = stdout
685            .trim()
686            .parse()
687            .context("error while parsing the result of `linera query-balance`")?;
688        Ok(amount)
689    }
690
691    /// Runs `linera sync`.
692    pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
693        self.command()
694            .await?
695            .arg("sync")
696            .arg(chain_id.to_string())
697            .spawn_and_wait_for_stdout()
698            .await?;
699        Ok(())
700    }
701
702    /// Runs `linera process-inbox`.
703    pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
704        self.command()
705            .await?
706            .arg("process-inbox")
707            .arg(chain_id.to_string())
708            .spawn_and_wait_for_stdout()
709            .await?;
710        Ok(())
711    }
712
713    /// Runs `linera transfer`.
714    pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
715        self.command()
716            .await?
717            .arg("transfer")
718            .arg(amount.to_string())
719            .args(["--from", &from.to_string()])
720            .args(["--to", &to.to_string()])
721            .spawn_and_wait_for_stdout()
722            .await?;
723        Ok(())
724    }
725
726    /// Runs `linera transfer` with no logging.
727    pub async fn transfer_with_silent_logs(
728        &self,
729        amount: Amount,
730        from: ChainId,
731        to: ChainId,
732    ) -> Result<()> {
733        self.command()
734            .await?
735            .env("RUST_LOG", "off")
736            .arg("transfer")
737            .arg(amount.to_string())
738            .args(["--from", &from.to_string()])
739            .args(["--to", &to.to_string()])
740            .spawn_and_wait_for_stdout()
741            .await?;
742        Ok(())
743    }
744
745    /// Runs `linera transfer` with owner accounts.
746    pub async fn transfer_with_accounts(
747        &self,
748        amount: Amount,
749        from: Account,
750        to: Account,
751    ) -> Result<()> {
752        self.command()
753            .await?
754            .arg("transfer")
755            .arg(amount.to_string())
756            .args(["--from", &from.to_string()])
757            .args(["--to", &to.to_string()])
758            .spawn_and_wait_for_stdout()
759            .await?;
760        Ok(())
761    }
762
763    fn benchmark_command_internal(command: &mut Command, args: BenchmarkCommand) -> Result<()> {
764        let mut formatted_args = to_args(&args)?;
765        let subcommand = formatted_args.remove(0);
766        // The subcommand is followed by the flattened options, which are preceded by "options".
767        // So remove that as well.
768        formatted_args.remove(0);
769        let options = formatted_args
770            .chunks_exact(2)
771            .flat_map(|pair| {
772                let option = format!("--{}", pair[0]);
773                match pair[1].as_str() {
774                    "true" => vec![option],
775                    "false" => vec![],
776                    _ => vec![option, pair[1].clone()],
777                }
778            })
779            .collect::<Vec<_>>();
780        command
781            .args([
782                "--max-pending-message-bundles",
783                &args.transactions_per_block().to_string(),
784            ])
785            .arg("benchmark")
786            .arg(subcommand)
787            .args(options);
788        Ok(())
789    }
790
791    async fn benchmark_command_with_envs(
792        &self,
793        args: BenchmarkCommand,
794        envs: &[(&str, &str)],
795    ) -> Result<Command> {
796        let mut command = self
797            .command_with_envs_and_arguments(envs, self.required_command_arguments())
798            .await?;
799        Self::benchmark_command_internal(&mut command, args)?;
800        Ok(command)
801    }
802
803    async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
804        let mut command = self
805            .command_with_arguments(self.required_command_arguments())
806            .await?;
807        Self::benchmark_command_internal(&mut command, args)?;
808        Ok(command)
809    }
810
811    /// Runs `linera benchmark`.
812    pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
813        let mut command = self.benchmark_command(args).await?;
814        command.spawn_and_wait_for_stdout().await?;
815        Ok(())
816    }
817
818    /// Runs `linera benchmark`, but detached: don't wait for the command to finish, just spawn it
819    /// and return the child process, and the handles to the stdout and stderr.
820    pub async fn benchmark_detached(
821        &self,
822        args: BenchmarkCommand,
823        tx: oneshot::Sender<()>,
824    ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
825        let mut child = self
826            .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
827            .await?
828            .kill_on_drop(true)
829            .stdin(Stdio::piped())
830            .stdout(Stdio::piped())
831            .stderr(Stdio::piped())
832            .spawn()?;
833
834        let pid = child.id().expect("failed to get pid");
835        let stdout = child.stdout.take().expect("stdout not open");
836        let stdout_handle = tokio::spawn(async move {
837            let mut lines = BufReader::new(stdout).lines();
838            while let Ok(Some(line)) = lines.next_line().await {
839                println!("benchmark{{pid={pid}}} {line}");
840            }
841        });
842
843        let stderr = child.stderr.take().expect("stderr not open");
844        let stderr_handle = tokio::spawn(async move {
845            let mut lines = BufReader::new(stderr).lines();
846            let mut tx = Some(tx);
847            while let Ok(Some(line)) = lines.next_line().await {
848                if line.contains("Ready to start benchmark") {
849                    tx.take()
850                        .expect("Should only send signal once")
851                        .send(())
852                        .expect("failed to send ready signal to main thread");
853                } else {
854                    println!("benchmark{{pid={pid}}} {line}");
855                }
856            }
857        });
858        Ok((child, stdout_handle, stderr_handle))
859    }
860
861    async fn open_chain_internal(
862        &self,
863        from: ChainId,
864        owner: Option<AccountOwner>,
865        initial_balance: Amount,
866        super_owner: bool,
867    ) -> Result<(ChainId, AccountOwner)> {
868        let mut command = self.command().await?;
869        command
870            .arg("open-chain")
871            .args(["--from", &from.to_string()])
872            .args(["--initial-balance", &initial_balance.to_string()]);
873
874        if let Some(owner) = owner {
875            command.args(["--owner", &owner.to_string()]);
876        }
877
878        if super_owner {
879            command.arg("--super-owner");
880        }
881
882        let stdout = command.spawn_and_wait_for_stdout().await?;
883        let mut split = stdout.split('\n');
884        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
885        let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
886        if let Some(owner) = owner {
887            assert_eq!(owner, new_owner);
888        }
889        Ok((chain_id, new_owner))
890    }
891
892    /// Runs `linera open-chain --super-owner`.
893    pub async fn open_chain_super_owner(
894        &self,
895        from: ChainId,
896        owner: Option<AccountOwner>,
897        initial_balance: Amount,
898    ) -> Result<(ChainId, AccountOwner)> {
899        self.open_chain_internal(from, owner, initial_balance, true)
900            .await
901    }
902
903    /// Runs `linera open-chain`.
904    pub async fn open_chain(
905        &self,
906        from: ChainId,
907        owner: Option<AccountOwner>,
908        initial_balance: Amount,
909    ) -> Result<(ChainId, AccountOwner)> {
910        self.open_chain_internal(from, owner, initial_balance, false)
911            .await
912    }
913
914    /// Runs `linera open-chain` then `linera assign`.
915    pub async fn open_and_assign(
916        &self,
917        client: &ClientWrapper,
918        initial_balance: Amount,
919    ) -> Result<ChainId> {
920        let our_chain = self
921            .load_wallet()?
922            .default_chain()
923            .context("no default chain found")?;
924        let owner = client.keygen().await?;
925        let (new_chain, _) = self
926            .open_chain(our_chain, Some(owner), initial_balance)
927            .await?;
928        client.assign(owner, new_chain).await?;
929        Ok(new_chain)
930    }
931
932    pub async fn open_multi_owner_chain(
933        &self,
934        from: ChainId,
935        owners: BTreeMap<AccountOwner, u64>,
936        multi_leader_rounds: u32,
937        balance: Amount,
938        base_timeout_ms: u64,
939    ) -> Result<ChainId> {
940        let mut command = self.command().await?;
941        command
942            .arg("open-multi-owner-chain")
943            .args(["--from", &from.to_string()])
944            .arg("--owners")
945            .arg(serde_json::to_string(&owners)?)
946            .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
947        command
948            .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
949            .args(["--initial-balance", &balance.to_string()]);
950
951        let stdout = command.spawn_and_wait_for_stdout().await?;
952        let mut split = stdout.split('\n');
953        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
954
955        Ok(chain_id)
956    }
957
958    pub async fn change_ownership(
959        &self,
960        chain_id: ChainId,
961        super_owners: Vec<AccountOwner>,
962        owners: Vec<AccountOwner>,
963    ) -> Result<()> {
964        let mut command = self.command().await?;
965        command
966            .arg("change-ownership")
967            .args(["--chain-id", &chain_id.to_string()]);
968        command
969            .arg("--super-owners")
970            .arg(serde_json::to_string(&super_owners)?);
971        command.arg("--owners").arg(serde_json::to_string(
972            &owners
973                .into_iter()
974                .zip(std::iter::repeat(100u64))
975                .collect::<BTreeMap<_, _>>(),
976        )?);
977        command.spawn_and_wait_for_stdout().await?;
978        Ok(())
979    }
980
981    /// Runs `linera wallet follow-chain CHAIN_ID`.
982    pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
983        let mut command = self.command().await?;
984        command
985            .args(["wallet", "follow-chain"])
986            .arg(chain_id.to_string());
987        if sync {
988            command.arg("--sync");
989        }
990        command.spawn_and_wait_for_stdout().await?;
991        Ok(())
992    }
993
994    /// Runs `linera wallet forget-chain CHAIN_ID`.
995    pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
996        let mut command = self.command().await?;
997        command
998            .args(["wallet", "forget-chain"])
999            .arg(chain_id.to_string());
1000        command.spawn_and_wait_for_stdout().await?;
1001        Ok(())
1002    }
1003
1004    /// Runs `linera wallet set-default CHAIN_ID`.
1005    pub async fn set_default_chain(&self, chain_id: ChainId) -> Result<()> {
1006        let mut command = self.command().await?;
1007        command
1008            .args(["wallet", "set-default"])
1009            .arg(chain_id.to_string());
1010        command.spawn_and_wait_for_stdout().await?;
1011        Ok(())
1012    }
1013
1014    pub async fn retry_pending_block(
1015        &self,
1016        chain_id: Option<ChainId>,
1017    ) -> Result<Option<CryptoHash>> {
1018        let mut command = self.command().await?;
1019        command.arg("retry-pending-block");
1020        if let Some(chain_id) = chain_id {
1021            command.arg(chain_id.to_string());
1022        }
1023        let stdout = command.spawn_and_wait_for_stdout().await?;
1024        let stdout = stdout.trim();
1025        if stdout.is_empty() {
1026            Ok(None)
1027        } else {
1028            Ok(Some(CryptoHash::from_str(stdout)?))
1029        }
1030    }
1031
1032    /// Runs `linera publish-data-blob`.
1033    pub async fn publish_data_blob(
1034        &self,
1035        path: &Path,
1036        chain_id: Option<ChainId>,
1037    ) -> Result<CryptoHash> {
1038        let mut command = self.command().await?;
1039        command.arg("publish-data-blob").arg(path);
1040        if let Some(chain_id) = chain_id {
1041            command.arg(chain_id.to_string());
1042        }
1043        let stdout = command.spawn_and_wait_for_stdout().await?;
1044        let stdout = stdout.trim();
1045        Ok(CryptoHash::from_str(stdout)?)
1046    }
1047
1048    /// Runs `linera read-data-blob`.
1049    pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
1050        let mut command = self.command().await?;
1051        command.arg("read-data-blob").arg(hash.to_string());
1052        if let Some(chain_id) = chain_id {
1053            command.arg(chain_id.to_string());
1054        }
1055        command.spawn_and_wait_for_stdout().await?;
1056        Ok(())
1057    }
1058
1059    pub fn load_wallet(&self) -> Result<Wallet> {
1060        Ok(Wallet::read(&self.wallet_path())?)
1061    }
1062
1063    pub fn load_keystore(&self) -> Result<InMemorySigner> {
1064        util::read_json(self.keystore_path())
1065    }
1066
1067    pub fn wallet_path(&self) -> PathBuf {
1068        self.path_provider.path().join(&self.wallet)
1069    }
1070
1071    pub fn keystore_path(&self) -> PathBuf {
1072        self.path_provider.path().join(&self.keystore)
1073    }
1074
1075    pub fn storage_path(&self) -> &str {
1076        &self.storage
1077    }
1078
1079    pub fn get_owner(&self) -> Option<AccountOwner> {
1080        let wallet = self.load_wallet().ok()?;
1081        wallet
1082            .get(wallet.default_chain()?)
1083            .expect("default chain must be in wallet")
1084            .owner
1085    }
1086
1087    pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1088        self.load_wallet()
1089            .ok()
1090            .is_some_and(|wallet| wallet.get(chain).is_some())
1091    }
1092
1093    pub async fn set_validator(
1094        &self,
1095        validator_key: &(String, String),
1096        port: usize,
1097        votes: usize,
1098    ) -> Result<()> {
1099        let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1100        self.command()
1101            .await?
1102            .arg("validator")
1103            .arg("add")
1104            .args(["--public-key", &validator_key.0])
1105            .args(["--account-key", &validator_key.1])
1106            .args(["--address", &address])
1107            .args(["--votes", &votes.to_string()])
1108            .spawn_and_wait_for_stdout()
1109            .await?;
1110        Ok(())
1111    }
1112
1113    pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1114        self.command()
1115            .await?
1116            .arg("validator")
1117            .arg("remove")
1118            .args(["--public-key", validator_key])
1119            .spawn_and_wait_for_stdout()
1120            .await?;
1121        Ok(())
1122    }
1123
1124    pub async fn change_validators(
1125        &self,
1126        add_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1127        modify_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1128        remove_validators: &[String],
1129    ) -> Result<()> {
1130        use std::str::FromStr;
1131
1132        use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1133
1134        // Build a map that will be serialized to JSON
1135        // Use the exact types that deserialization expects
1136        let mut changes = std::collections::HashMap::new();
1137
1138        // Add/modify validators
1139        for (public_key_str, account_key_str, port, votes) in
1140            add_validators.iter().chain(modify_validators.iter())
1141        {
1142            let public_key = ValidatorPublicKey::from_str(public_key_str)
1143                .with_context(|| format!("Invalid validator public key: {}", public_key_str))?;
1144
1145            let account_key = AccountPublicKey::from_str(account_key_str)
1146                .with_context(|| format!("Invalid account public key: {}", account_key_str))?;
1147
1148            let address = format!("{}:127.0.0.1:{}", self.network.short(), port)
1149                .parse()
1150                .unwrap();
1151
1152            // Create ValidatorChange struct
1153            let change = crate::cli::validator::Change {
1154                account_key,
1155                address,
1156                votes: crate::cli::validator::Votes(
1157                    std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1158                ),
1159            };
1160
1161            changes.insert(public_key, Some(change));
1162        }
1163
1164        // Remove validators (set to None)
1165        for validator_key_str in remove_validators {
1166            let public_key = ValidatorPublicKey::from_str(validator_key_str)
1167                .with_context(|| format!("Invalid validator public key: {}", validator_key_str))?;
1168            changes.insert(public_key, None);
1169        }
1170
1171        // Create temporary file with JSON
1172        let temp_file = tempfile::NamedTempFile::new()
1173            .context("Failed to create temporary file for validator changes")?;
1174        serde_json::to_writer(&temp_file, &changes)
1175            .context("Failed to write validator changes to file")?;
1176        let temp_path = temp_file.path();
1177
1178        self.command()
1179            .await?
1180            .arg("validator")
1181            .arg("update")
1182            .arg(temp_path)
1183            .arg("--yes") // Skip confirmation prompt
1184            .spawn_and_wait_for_stdout()
1185            .await?;
1186
1187        Ok(())
1188    }
1189
1190    pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1191        self.command()
1192            .await?
1193            .arg("revoke-epochs")
1194            .arg(epoch.to_string())
1195            .spawn_and_wait_for_stdout()
1196            .await?;
1197        Ok(())
1198    }
1199
1200    /// Runs `linera keygen`.
1201    pub async fn keygen(&self) -> Result<AccountOwner> {
1202        let stdout = self
1203            .command()
1204            .await?
1205            .arg("keygen")
1206            .spawn_and_wait_for_stdout()
1207            .await?;
1208        AccountOwner::from_str(stdout.as_str().trim())
1209    }
1210
1211    /// Returns the default chain.
1212    pub fn default_chain(&self) -> Option<ChainId> {
1213        self.load_wallet().ok()?.default_chain()
1214    }
1215
1216    /// Runs `linera assign`.
1217    pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1218        let _stdout = self
1219            .command()
1220            .await?
1221            .arg("assign")
1222            .args(["--owner", &owner.to_string()])
1223            .args(["--chain-id", &chain_id.to_string()])
1224            .spawn_and_wait_for_stdout()
1225            .await?;
1226        Ok(())
1227    }
1228
1229    /// Runs `linera set-preferred-owner` for `chain_id`.
1230    pub async fn set_preferred_owner(
1231        &self,
1232        chain_id: ChainId,
1233        owner: Option<AccountOwner>,
1234    ) -> Result<()> {
1235        let mut owner_arg = vec!["--owner".to_string()];
1236        if let Some(owner) = owner {
1237            owner_arg.push(owner.to_string());
1238        };
1239        self.command()
1240            .await?
1241            .arg("set-preferred-owner")
1242            .args(["--chain-id", &chain_id.to_string()])
1243            .args(owner_arg)
1244            .spawn_and_wait_for_stdout()
1245            .await?;
1246        Ok(())
1247    }
1248
1249    pub async fn build_application(
1250        &self,
1251        path: &Path,
1252        name: &str,
1253        is_workspace: bool,
1254    ) -> Result<(PathBuf, PathBuf)> {
1255        Command::new("cargo")
1256            .current_dir(self.path_provider.path())
1257            .arg("build")
1258            .arg("--release")
1259            .args(["--target", "wasm32-unknown-unknown"])
1260            .arg("--manifest-path")
1261            .arg(path.join("Cargo.toml"))
1262            .spawn_and_wait_for_stdout()
1263            .await?;
1264
1265        let release_dir = match is_workspace {
1266            true => path.join("../target/wasm32-unknown-unknown/release"),
1267            false => path.join("target/wasm32-unknown-unknown/release"),
1268        };
1269
1270        let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1271        let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1272
1273        let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1274        let service_size = fs_err::tokio::metadata(&service).await?.len();
1275        tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1276
1277        Ok((contract, service))
1278    }
1279}
1280
1281impl Drop for ClientWrapper {
1282    fn drop(&mut self) {
1283        use std::process::Command as SyncCommand;
1284
1285        if self.on_drop != OnClientDrop::CloseChains {
1286            return;
1287        }
1288
1289        let Ok(binary_path) = self.binary_path.lock() else {
1290            tracing::error!(
1291                "Failed to close chains because a thread panicked with a lock to `binary_path`"
1292            );
1293            return;
1294        };
1295
1296        let Some(binary_path) = binary_path.as_ref() else {
1297            tracing::warn!(
1298                "Assuming no chains need to be closed, because the command binary was never \
1299                resolved and therefore presumably never called"
1300            );
1301            return;
1302        };
1303
1304        let working_directory = self.path_provider.path();
1305        let mut wallet_show_command = SyncCommand::new(binary_path);
1306
1307        for argument in self.command_arguments() {
1308            wallet_show_command.arg(&*argument);
1309        }
1310
1311        let Ok(wallet_show_output) = wallet_show_command
1312            .current_dir(working_directory)
1313            .args(["wallet", "show", "--short", "--owned"])
1314            .output()
1315        else {
1316            tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1317            return;
1318        };
1319
1320        if !wallet_show_output.status.success() {
1321            tracing::warn!("Failed to list chains in the wallet to close them");
1322            return;
1323        }
1324
1325        let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1326            tracing::warn!(
1327                "Failed to close chains because `linera wallet show --short` \
1328                returned a non-UTF-8 output"
1329            );
1330            return;
1331        };
1332
1333        let chain_ids = chain_list_string
1334            .split('\n')
1335            .map(|line| line.trim())
1336            .filter(|line| !line.is_empty());
1337
1338        for chain_id in chain_ids {
1339            let mut close_chain_command = SyncCommand::new(binary_path);
1340
1341            for argument in self.command_arguments() {
1342                close_chain_command.arg(&*argument);
1343            }
1344
1345            close_chain_command.current_dir(working_directory);
1346
1347            match close_chain_command.args(["close-chain", chain_id]).status() {
1348                Ok(status) if status.success() => (),
1349                Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1350                Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1351            }
1352        }
1353    }
1354}
1355
#[cfg(with_testing)]
impl ClientWrapper {
    /// Builds the example application `name` from [`Self::example_path`] as a workspace
    /// member and returns the paths of its contract and service Wasm files.
    pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
        let example_dir = Self::example_path(name)?;
        self.build_application(&example_dir, name, true).await
    }

    /// Returns `../examples/<name>` relative to the current working directory.
    pub fn example_path(name: &str) -> Result<PathBuf> {
        let current_dir = env::current_dir()?;
        Ok(current_dir.join("../examples/").join(name))
    }
}
1367
/// Shortens `input` for log and error messages.
///
/// Inputs shorter than 1000 bytes are returned unchanged. Longer inputs are cut to a prefix
/// of at most 1000 bytes, followed by `" ..."`. The cut is moved back to the closest UTF-8
/// character boundary, so inputs with multi-byte characters never cause a panic.
fn truncate_query_output(input: &str) -> String {
    let max_len = 1000;
    if input.len() < max_len {
        return input.to_string();
    }
    // Slicing inside a multi-byte character is invalid, so back up to a character boundary.
    // Index 0 is always a boundary, which guarantees that the loop terminates.
    let mut end = max_len;
    while !input.is_char_boundary(end) {
        end -= 1;
    }
    format!("{} ...", &input[..end])
}
1376
1377fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1378    let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1379    let max_len = 200;
1380    if query.len() < max_len {
1381        query
1382    } else {
1383        format!("{} ...", query.get(..max_len).unwrap())
1384    }
1385}
1386
1387/// A running node service.
1388pub struct NodeService {
1389    port: u16,
1390    child: Child,
1391}
1392
1393impl NodeService {
1394    fn new(port: u16, child: Child) -> Self {
1395        Self { port, child }
1396    }
1397
1398    pub async fn terminate(mut self) -> Result<()> {
1399        self.child.kill().await.context("terminating node service")
1400    }
1401
1402    pub fn port(&self) -> u16 {
1403        self.port
1404    }
1405
1406    pub fn ensure_is_running(&mut self) -> Result<()> {
1407        self.child.ensure_is_running()
1408    }
1409
1410    pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1411        let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1412        let mut data = self.query_node(query).await?;
1413        Ok(serde_json::from_value(data["processInbox"].take())?)
1414    }
1415
1416    pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1417        let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1418        let mut data = self.query_node(query).await?;
1419        Ok(serde_json::from_value(data["sync"].take())?)
1420    }
1421
1422    pub async fn transfer(
1423        &self,
1424        chain_id: ChainId,
1425        owner: AccountOwner,
1426        recipient: Account,
1427        amount: Amount,
1428    ) -> Result<CryptoHash> {
1429        let json_owner = owner.to_value();
1430        let json_recipient = recipient.to_value();
1431        let query = format!(
1432            "mutation {{ transfer(\
1433                 chainId: \"{chain_id}\", \
1434                 owner: {json_owner}, \
1435                 recipient: {json_recipient}, \
1436                 amount: \"{amount}\") \
1437             }}"
1438        );
1439        let data = self.query_node(query).await?;
1440        serde_json::from_value(data["transfer"].clone())
1441            .context("missing transfer field in response")
1442    }
1443
1444    pub async fn balance(&self, account: &Account) -> Result<Amount> {
1445        let chain = account.chain_id;
1446        let owner = account.owner;
1447        if matches!(owner, AccountOwner::CHAIN) {
1448            let query = format!(
1449                "query {{ chain(chainId:\"{chain}\") {{
1450                    executionState {{ system {{ balance }} }}
1451                }} }}"
1452            );
1453            let response = self.query_node(query).await?;
1454            let balance = &response["chain"]["executionState"]["system"]["balance"]
1455                .as_str()
1456                .unwrap();
1457            return Ok(Amount::from_str(balance)?);
1458        }
1459        let query = format!(
1460            "query {{ chain(chainId:\"{chain}\") {{
1461                executionState {{ system {{ balances {{
1462                    entry(key:\"{owner}\") {{ value }}
1463                }} }} }}
1464            }} }}"
1465        );
1466        let response = self.query_node(query).await?;
1467        let balances = &response["chain"]["executionState"]["system"]["balances"];
1468        let balance = balances["entry"]["value"].as_str();
1469        match balance {
1470            None => Ok(Amount::ZERO),
1471            Some(amount) => Ok(Amount::from_str(amount)?),
1472        }
1473    }
1474
1475    pub fn make_application<A: ContractAbi>(
1476        &self,
1477        chain_id: &ChainId,
1478        application_id: &ApplicationId<A>,
1479    ) -> Result<ApplicationWrapper<A>> {
1480        let application_id = application_id.forget_abi().to_string();
1481        let link = format!(
1482            "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1483            self.port
1484        );
1485        Ok(ApplicationWrapper::from(link))
1486    }
1487
1488    pub async fn publish_data_blob(
1489        &self,
1490        chain_id: &ChainId,
1491        bytes: Vec<u8>,
1492    ) -> Result<CryptoHash> {
1493        let query = format!(
1494            "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1495            chain_id.to_value(),
1496            bytes.to_value(),
1497        );
1498        let data = self.query_node(query).await?;
1499        serde_json::from_value(data["publishDataBlob"].clone())
1500            .context("missing publishDataBlob field in response")
1501    }
1502
1503    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1504        &self,
1505        chain_id: &ChainId,
1506        contract: PathBuf,
1507        service: PathBuf,
1508        vm_runtime: VmRuntime,
1509    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1510        let contract_code = Bytecode::load_from_file(&contract).await?;
1511        let service_code = Bytecode::load_from_file(&service).await?;
1512        let query = format!(
1513            "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1514            chain_id.to_value(),
1515            contract_code.to_value(),
1516            service_code.to_value(),
1517            vm_runtime.to_value(),
1518        );
1519        let data = self.query_node(query).await?;
1520        let module_str = data["publishModule"]
1521            .as_str()
1522            .context("module ID not found")?;
1523        let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1524        Ok(module_id.with_abi())
1525    }
1526
1527    pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1528        let query = format!(
1529            "query {{ chain(chainId:\"{chain_id}\") {{
1530                executionState {{ system {{ committees }} }}
1531            }} }}"
1532        );
1533        let mut response = self.query_node(query).await?;
1534        let committees = response["chain"]["executionState"]["system"]["committees"].take();
1535        Ok(serde_json::from_value(committees)?)
1536    }
1537
1538    pub async fn events_from_index(
1539        &self,
1540        chain_id: &ChainId,
1541        stream_id: &StreamId,
1542        start_index: u32,
1543    ) -> Result<Vec<IndexAndEvent>> {
1544        let query = format!(
1545            "query {{
1546               eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1547               {{ index event }}
1548             }}",
1549            stream_id.to_value()
1550        );
1551        let mut response = self.query_node(query).await?;
1552        let response = response["eventsFromIndex"].take();
1553        Ok(serde_json::from_value(response)?)
1554    }
1555
1556    pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1557        let n_try = 5;
1558        let query = query.as_ref();
1559        for i in 0..n_try {
1560            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1561            let url = format!("http://localhost:{}/", self.port);
1562            let client = reqwest_client();
1563            let result = client
1564                .post(url)
1565                .json(&json!({ "query": query }))
1566                .send()
1567                .await;
1568            if matches!(result, Err(ref error) if error.is_timeout()) {
1569                tracing::warn!(
1570                    "Timeout when sending query {} to the node service",
1571                    truncate_query_output(query)
1572                );
1573                continue;
1574            }
1575            let response = result.with_context(|| {
1576                format!(
1577                    "query_node: failed to post query={}",
1578                    truncate_query_output(query)
1579                )
1580            })?;
1581            ensure!(
1582                response.status().is_success(),
1583                "Query \"{}\" failed: {}",
1584                truncate_query_output(query),
1585                response
1586                    .text()
1587                    .await
1588                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1589            );
1590            let value: Value = response.json().await.context("invalid JSON")?;
1591            if let Some(errors) = value.get("errors") {
1592                tracing::warn!(
1593                    "Query \"{}\" failed: {}",
1594                    truncate_query_output(query),
1595                    errors
1596                );
1597            } else {
1598                return Ok(value["data"].clone());
1599            }
1600        }
1601        bail!(
1602            "Query \"{}\" failed after {} retries.",
1603            truncate_query_output(query),
1604            n_try
1605        );
1606    }
1607
1608    pub async fn create_application<
1609        Abi: ContractAbi,
1610        Parameters: Serialize,
1611        InstantiationArgument: Serialize,
1612    >(
1613        &self,
1614        chain_id: &ChainId,
1615        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1616        parameters: &Parameters,
1617        argument: &InstantiationArgument,
1618        required_application_ids: &[ApplicationId],
1619    ) -> Result<ApplicationId<Abi>> {
1620        let module_id = module_id.forget_abi();
1621        let json_required_applications_ids = required_application_ids
1622            .iter()
1623            .map(ApplicationId::to_string)
1624            .collect::<Vec<_>>()
1625            .to_value();
1626        // Convert to `serde_json::Value` then `async_graphql::Value` via the trait `InputType`.
1627        let new_parameters = serde_json::to_value(parameters)
1628            .context("could not create parameters JSON")?
1629            .to_value();
1630        let new_argument = serde_json::to_value(argument)
1631            .context("could not create argument JSON")?
1632            .to_value();
1633        let query = format!(
1634            "mutation {{ createApplication(\
1635                 chainId: \"{chain_id}\",
1636                 moduleId: \"{module_id}\", \
1637                 parameters: {new_parameters}, \
1638                 instantiationArgument: {new_argument}, \
1639                 requiredApplicationIds: {json_required_applications_ids}) \
1640             }}"
1641        );
1642        let data = self.query_node(query).await?;
1643        let app_id_str = data["createApplication"]
1644            .as_str()
1645            .context("missing createApplication string in response")?
1646            .trim();
1647        Ok(app_id_str
1648            .parse::<ApplicationId>()
1649            .context("invalid application ID")?
1650            .with_abi())
1651    }
1652
1653    /// Obtains the hash and height of the `chain`'s tip block, as known by this node service.
1654    pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1655        let query = format!(
1656            r#"query {{ block(chainId: "{chain}") {{
1657                hash
1658                block {{ header {{ height }} }}
1659            }} }}"#
1660        );
1661
1662        let mut response = self.query_node(&query).await?;
1663
1664        match (
1665            mem::take(&mut response["block"]["hash"]),
1666            mem::take(&mut response["block"]["block"]["header"]["height"]),
1667        ) {
1668            (Value::Null, Value::Null) => Ok(None),
1669            (Value::String(hash), Value::Number(height)) => Ok(Some((
1670                hash.parse()
1671                    .context("Received an invalid hash {hash:?} for chain tip")?,
1672                BlockHeight(height.as_u64().unwrap()),
1673            ))),
1674            invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1675        }
1676    }
1677
1678    /// Subscribes to the node service and returns a stream of notifications about a chain.
1679    pub async fn notifications(
1680        &self,
1681        chain_id: ChainId,
1682    ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
1683        let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1684        let url = format!("ws://localhost:{}/ws", self.port);
1685        let mut request = url.into_client_request()?;
1686        request.headers_mut().insert(
1687            "Sec-WebSocket-Protocol",
1688            HeaderValue::from_str("graphql-transport-ws")?,
1689        );
1690        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1691        let init_json = json!({
1692          "type": "connection_init",
1693          "payload": {}
1694        });
1695        websocket.send(init_json.to_string().into()).await?;
1696        let text = websocket
1697            .next()
1698            .await
1699            .context("Failed to establish connection")??
1700            .into_text()?;
1701        ensure!(
1702            text == "{\"type\":\"connection_ack\"}",
1703            "Unexpected response: {text}"
1704        );
1705        let query_json = json!({
1706          "id": "1",
1707          "type": "start",
1708          "payload": {
1709            "query": query,
1710            "variables": {},
1711            "operationName": null
1712          }
1713        });
1714        websocket.send(query_json.to_string().into()).await?;
1715        Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1716            |message| async {
1717                let text = message.into_text()?;
1718                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1719                if let Some(errors) = value["payload"].get("errors") {
1720                    bail!("Notification subscription failed: {errors:?}");
1721                }
1722                serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1723                    .context("Failed to deserialize notification")
1724            },
1725        )))
1726    }
1727}
1728
/// A running faucet service.
pub struct FaucetService {
    /// Local port of the faucet; used to build the `http://localhost:{port}/` URL handed to
    /// the faucet client.
    port: u16,
    /// Handle to the faucet's child process; killed by `terminate`.
    child: Child,
    /// Temporary directory handed over at construction. It is never read; it is stored only so
    /// that it is owned by, and dropped together with, this service.
    _temp_dir: tempfile::TempDir,
}
1735
1736impl FaucetService {
1737    fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
1738        Self {
1739            port,
1740            child,
1741            _temp_dir: temp_dir,
1742        }
1743    }
1744
1745    pub async fn terminate(mut self) -> Result<()> {
1746        self.child
1747            .kill()
1748            .await
1749            .context("terminating faucet service")
1750    }
1751
1752    pub fn ensure_is_running(&mut self) -> Result<()> {
1753        self.child.ensure_is_running()
1754    }
1755
1756    pub fn instance(&self) -> Faucet {
1757        Faucet::new(format!("http://localhost:{}/", self.port))
1758    }
1759}
1760
/// A running `Application` to be queried in GraphQL.
pub struct ApplicationWrapper<A> {
    /// URI that every query is sent to with an HTTP `POST`.
    uri: String,
    /// Marker tying the wrapper to the type parameter `A` without storing a value of it.
    _phantom: PhantomData<A>,
}
1766
1767impl<A> ApplicationWrapper<A> {
1768    pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1769        let query = query.as_ref();
1770        let value = self.run_json_query(json!({ "query": query })).await?;
1771        Ok(value["data"].clone())
1772    }
1773
1774    pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1775        const MAX_RETRIES: usize = 5;
1776
1777        for i in 0.. {
1778            let client = reqwest_client();
1779            let result = client.post(&self.uri).json(&query).send().await;
1780            let response = match result {
1781                Ok(response) => response,
1782                Err(error) if i < MAX_RETRIES => {
1783                    tracing::warn!(
1784                        "Failed to post query \"{}\": {error}; retrying",
1785                        truncate_query_output_serialize(&query),
1786                    );
1787                    continue;
1788                }
1789                Err(error) => {
1790                    let query = truncate_query_output_serialize(&query);
1791                    return Err(error)
1792                        .with_context(|| format!("run_json_query: failed to post query={query}"));
1793                }
1794            };
1795            ensure!(
1796                response.status().is_success(),
1797                "Query \"{}\" failed: {}",
1798                truncate_query_output_serialize(&query),
1799                response
1800                    .text()
1801                    .await
1802                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1803            );
1804            let value: Value = response.json().await.context("invalid JSON")?;
1805            if let Some(errors) = value.get("errors") {
1806                bail!(
1807                    "Query \"{}\" failed: {}",
1808                    truncate_query_output_serialize(&query),
1809                    errors
1810                );
1811            }
1812            return Ok(value);
1813        }
1814        unreachable!()
1815    }
1816
1817    pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
1818        let query = query.as_ref();
1819        self.run_graphql_query(&format!("query {{ {query} }}"))
1820            .await
1821    }
1822
1823    pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
1824        let query = query.as_ref().trim();
1825        let name = query
1826            .split_once(|ch: char| !ch.is_alphanumeric())
1827            .map_or(query, |(name, _)| name);
1828        let data = self.query(query).await?;
1829        serde_json::from_value(data[name].clone())
1830            .with_context(|| format!("{name} field missing in response"))
1831    }
1832
1833    pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
1834        let mutation = mutation.as_ref();
1835        self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
1836            .await
1837    }
1838
1839    pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
1840        let mut out = String::from("mutation {\n");
1841        for (index, mutation) in mutations.iter().enumerate() {
1842            out = format!("{}  u{}: {}\n", out, index, mutation);
1843        }
1844        out.push_str("}\n");
1845        self.run_graphql_query(&out).await
1846    }
1847}
1848
1849impl<A> From<String> for ApplicationWrapper<A> {
1850    fn from(uri: String) -> ApplicationWrapper<A> {
1851        ApplicationWrapper {
1852            uri,
1853            _phantom: PhantomData,
1854        }
1855    }
1856}
1857
/// Returns how long tests wait for a notification.
///
/// The value is taken, in milliseconds, from the `LINERA_TEST_NOTIFICATION_TIMEOUT_MS`
/// environment variable; when the variable is not set, the default of 10 seconds is used.
///
/// # Panics
///
/// Panics if the variable is set but is not valid Unicode or does not parse as a number.
#[cfg(with_testing)]
fn notification_timeout() -> Duration {
    const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
    const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;

    let millis = match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
        Err(env::VarError::NotPresent) => NOTIFICATION_TIMEOUT_MS_DEFAULT,
        Err(env::VarError::NotUnicode(_)) => {
            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
        }
        Ok(raw) => match raw.parse() {
            Ok(parsed) => parsed,
            Err(error) => {
                panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
            }
        },
    };
    Duration::from_millis(millis)
}
1875
/// Helpers for waiting until a particular kind of notification arrives.
#[cfg(with_testing)]
pub trait NotificationsExt {
    /// Waits for a notification for which `f` returns `Some(t)`, and returns `t`.
    fn wait_for<T>(
        &mut self,
        f: impl FnMut(Notification) -> Option<T>,
    ) -> impl Future<Output = Result<T>>;

    /// Waits for a `NewEvents` notification for the given block height and returns its event
    /// streams. If no height is specified, any height is accepted.
    fn wait_for_events(
        &mut self,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
        let expected_height = expected_height.into();
        self.wait_for(move |notification| match notification.reason {
            Reason::NewEvents {
                height,
                event_streams,
                ..
            } if expected_height.is_none_or(|h| h == height) => Some(event_streams),
            _ => None,
        })
    }

    /// Waits for a `NewBlock` notification for the given block height and returns the block's
    /// hash. If no height is specified, any height is accepted.
    fn wait_for_block(
        &mut self,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<CryptoHash>> {
        let expected_height = expected_height.into();
        self.wait_for(move |notification| match notification.reason {
            Reason::NewBlock { height, hash, .. }
                if expected_height.is_none_or(|h| h == height) =>
            {
                Some(hash)
            }
            _ => None,
        })
    }

    /// Waits for a `NewIncomingBundle` notification for the given sender chain and sender block
    /// height. If no height is specified, any height is accepted.
    fn wait_for_bundle(
        &mut self,
        expected_origin: ChainId,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<()>> {
        let expected_height = expected_height.into();
        self.wait_for(move |notification| match notification.reason {
            Reason::NewIncomingBundle { height, origin }
                if expected_height.is_none_or(|h| h == height) && origin == expected_origin =>
            {
                Some(())
            }
            _ => None,
        })
    }
}
1941
#[cfg(with_testing)]
impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
    /// Pulls notifications from the stream until `f` accepts one.
    ///
    /// A single timer of `notification_timeout()` is started up front, so the time limit
    /// applies to the whole wait rather than to each individual notification. Fails on
    /// timeout, when the stream ends, or when the stream yields an error.
    async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
        let deadline = linera_base::time::timer::sleep(notification_timeout());
        let mut deadline = Box::pin(deadline).fuse();
        loop {
            let next_item = futures::select! {
                () = deadline => bail!("Timeout waiting for notification"),
                item = self.next().fuse() => item,
            };
            // `None` means the stream ended; `Some(Err(_))` is a failed notification.
            let notification = next_item.context("Stream closed")??;
            if let Some(accepted) = f(notification) {
                return Ok(accepted);
            }
        }
    }
}