linera_service/cli_wrappers/
wallet.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Cow,
6    collections::BTreeMap,
7    env,
8    marker::PhantomData,
9    mem,
10    path::{Path, PathBuf},
11    pin::Pin,
12    process::Stdio,
13    str::FromStr,
14    sync,
15    time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24    abi::ContractAbi,
25    command::{resolve_binary, CommandExt},
26    crypto::{CryptoHash, InMemorySigner},
27    data_types::{Amount, ApplicationPermissions, BlockHeight, Bytecode, Epoch},
28    identifiers::{
29        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30    },
31    vm::VmRuntime,
32};
33use linera_client::client_options::ResourceControlPolicyConfig;
34use linera_core::worker::Notification;
35use linera_execution::committee::Committee;
36use linera_faucet_client::Faucet;
37use serde::{de::DeserializeOwned, ser::Serialize};
38use serde_command_opts::to_args;
39use serde_json::{json, Value};
40use tempfile::TempDir;
41use tokio::{
42    io::{AsyncBufReadExt, BufReader},
43    process::{Child, Command},
44    sync::oneshot,
45    task::JoinHandle,
46};
47#[cfg(with_testing)]
48use {
49    futures::FutureExt as _,
50    linera_core::worker::Reason,
51    std::{collections::BTreeSet, future::Future},
52};
53
54use crate::{
55    cli::command::BenchmarkCommand,
56    cli_wrappers::{
57        local_net::{PathProvider, ProcessInbox},
58        Network,
59    },
60    util::{self, ChildExt},
61    wallet::Wallet,
62};
63
64/// The name of the environment variable that allows specifying additional arguments to be passed
65/// to the node-service command of the client.
66const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
67
68fn reqwest_client() -> reqwest::Client {
69    reqwest::ClientBuilder::new()
70        .timeout(Duration::from_secs(30))
71        .build()
72        .unwrap()
73}
74
75/// Wrapper to run a Linera client command.
76pub struct ClientWrapper {
77    binary_path: sync::Mutex<Option<PathBuf>>,
78    testing_prng_seed: Option<u64>,
79    storage: String,
80    wallet: String,
81    keystore: String,
82    max_pending_message_bundles: usize,
83    network: Network,
84    pub path_provider: PathProvider,
85    on_drop: OnClientDrop,
86    extra_args: Vec<String>,
87}
88
89/// Action to perform when the [`ClientWrapper`] is dropped.
90#[derive(Clone, Copy, Debug, Eq, PartialEq)]
91pub enum OnClientDrop {
92    /// Close all the chains on the wallet.
93    CloseChains,
94    /// Do not close any chains, leaving them active.
95    LeakChains,
96}
97
98impl ClientWrapper {
99    pub fn new(
100        path_provider: PathProvider,
101        network: Network,
102        testing_prng_seed: Option<u64>,
103        id: usize,
104        on_drop: OnClientDrop,
105    ) -> Self {
106        Self::new_with_extra_args(
107            path_provider,
108            network,
109            testing_prng_seed,
110            id,
111            on_drop,
112            vec!["--wait-for-outgoing-messages".to_string()],
113            None,
114        )
115    }
116
117    pub fn new_with_extra_args(
118        path_provider: PathProvider,
119        network: Network,
120        testing_prng_seed: Option<u64>,
121        id: usize,
122        on_drop: OnClientDrop,
123        extra_args: Vec<String>,
124        binary_dir: Option<PathBuf>,
125    ) -> Self {
126        let storage = format!(
127            "rocksdb:{}/client_{}.db",
128            path_provider.path().display(),
129            id
130        );
131        let wallet = format!("wallet_{}.json", id);
132        let keystore = format!("keystore_{}.json", id);
133        let binary_path = binary_dir.map(|dir| dir.join("linera"));
134        Self {
135            binary_path: sync::Mutex::new(binary_path),
136            testing_prng_seed,
137            storage,
138            wallet,
139            keystore,
140            max_pending_message_bundles: 10_000,
141            network,
142            path_provider,
143            on_drop,
144            extra_args,
145        }
146    }
147
148    /// Runs `linera project new`.
149    pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
150        let tmp = TempDir::new()?;
151        let mut command = self.command().await?;
152        command
153            .current_dir(tmp.path())
154            .arg("project")
155            .arg("new")
156            .arg(project_name)
157            .arg("--linera-root")
158            .arg(linera_root)
159            .spawn_and_wait_for_stdout()
160            .await?;
161        Ok(tmp)
162    }
163
164    /// Runs `linera project publish`.
165    pub async fn project_publish<T: Serialize>(
166        &self,
167        path: PathBuf,
168        required_application_ids: Vec<String>,
169        publisher: impl Into<Option<ChainId>>,
170        argument: &T,
171    ) -> Result<String> {
172        let json_parameters = serde_json::to_string(&())?;
173        let json_argument = serde_json::to_string(argument)?;
174        let mut command = self.command().await?;
175        command
176            .arg("project")
177            .arg("publish-and-create")
178            .arg(path)
179            .args(publisher.into().iter().map(ChainId::to_string))
180            .args(["--json-parameters", &json_parameters])
181            .args(["--json-argument", &json_argument]);
182        if !required_application_ids.is_empty() {
183            command.arg("--required-application-ids");
184            command.args(required_application_ids);
185        }
186        let stdout = command.spawn_and_wait_for_stdout().await?;
187        Ok(stdout.trim().to_string())
188    }
189
190    /// Runs `linera project test`.
191    pub async fn project_test(&self, path: &Path) -> Result<()> {
192        self.command()
193            .await
194            .context("failed to create project test command")?
195            .current_dir(path)
196            .arg("project")
197            .arg("test")
198            .spawn_and_wait_for_stdout()
199            .await?;
200        Ok(())
201    }
202
203    async fn command_with_envs_and_arguments(
204        &self,
205        envs: &[(&str, &str)],
206        arguments: impl IntoIterator<Item = Cow<'_, str>>,
207    ) -> Result<Command> {
208        let mut command = self.command_binary().await?;
209        command.current_dir(self.path_provider.path());
210        for (key, value) in envs {
211            command.env(key, value);
212        }
213        for argument in arguments {
214            command.arg(&*argument);
215        }
216        Ok(command)
217    }
218
219    async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
220        self.command_with_envs_and_arguments(envs, self.command_arguments())
221            .await
222    }
223
224    async fn command_with_arguments(
225        &self,
226        arguments: impl IntoIterator<Item = Cow<'_, str>>,
227    ) -> Result<Command> {
228        self.command_with_envs_and_arguments(
229            &[(
230                "RUST_LOG",
231                &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
232            )],
233            arguments,
234        )
235        .await
236    }
237
238    async fn command(&self) -> Result<Command> {
239        self.command_with_envs(&[(
240            "RUST_LOG",
241            &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
242        )])
243        .await
244    }
245
246    fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
247        [
248            "--wallet".into(),
249            self.wallet.as_str().into(),
250            "--keystore".into(),
251            self.keystore.as_str().into(),
252            "--storage".into(),
253            self.storage.as_str().into(),
254            "--send-timeout-ms".into(),
255            "500000".into(),
256            "--recv-timeout-ms".into(),
257            "500000".into(),
258        ]
259        .into_iter()
260        .chain(self.extra_args.iter().map(|s| s.as_str().into()))
261    }
262
263    /// Returns an iterator over the arguments that should be added to all command invocations.
264    fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
265        self.required_command_arguments().chain([
266            "--max-pending-message-bundles".into(),
267            self.max_pending_message_bundles.to_string().into(),
268        ])
269    }
270
271    /// Returns the [`Command`] instance configured to run the appropriate binary.
272    ///
273    /// The path is resolved once and cached inside `self` for subsequent usages.
274    async fn command_binary(&self) -> Result<Command> {
275        match self.command_with_cached_binary_path() {
276            Some(command) => Ok(command),
277            None => {
278                let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
279                let command = Command::new(&resolved_path);
280
281                self.set_cached_binary_path(resolved_path);
282
283                Ok(command)
284            }
285        }
286    }
287
288    /// Returns a [`Command`] instance configured with the cached `binary_path`, if available.
289    fn command_with_cached_binary_path(&self) -> Option<Command> {
290        let binary_path = self.binary_path.lock().unwrap();
291
292        binary_path.as_ref().map(Command::new)
293    }
294
295    /// Sets the cached `binary_path` with the `new_binary_path`.
296    ///
297    /// # Panics
298    ///
299    /// If the cache is already set to a different value. In theory the two threads calling
300    /// `command_binary` can race and resolve the binary path twice, but they should always be the
301    /// same path.
302    fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
303        let mut binary_path = self.binary_path.lock().unwrap();
304
305        if binary_path.is_none() {
306            *binary_path = Some(new_binary_path);
307        } else {
308            assert_eq!(*binary_path, Some(new_binary_path));
309        }
310    }
311
312    /// Runs `linera create-genesis-config`.
313    pub async fn create_genesis_config(
314        &self,
315        num_other_initial_chains: u32,
316        initial_funding: Amount,
317        policy_config: ResourceControlPolicyConfig,
318        http_allow_list: Option<Vec<String>>,
319    ) -> Result<()> {
320        let mut command = self.command().await?;
321        command
322            .args([
323                "create-genesis-config",
324                &num_other_initial_chains.to_string(),
325            ])
326            .args(["--initial-funding", &initial_funding.to_string()])
327            .args(["--committee", "committee.json"])
328            .args(["--genesis", "genesis.json"])
329            .args([
330                "--policy-config",
331                &policy_config.to_string().to_kebab_case(),
332            ]);
333        if let Some(allow_list) = http_allow_list {
334            command
335                .arg("--http-request-allow-list")
336                .arg(allow_list.join(","));
337        }
338        if let Some(seed) = self.testing_prng_seed {
339            command.arg("--testing-prng-seed").arg(seed.to_string());
340        }
341        command.spawn_and_wait_for_stdout().await?;
342        Ok(())
343    }
344
345    /// Runs `linera wallet init`. The genesis config is read from `genesis.json`, or from the
346    /// faucet if provided.
347    pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
348        let mut command = self.command().await?;
349        command.args(["wallet", "init"]);
350        match faucet {
351            None => command.args(["--genesis", "genesis.json"]),
352            Some(faucet) => command.args(["--faucet", faucet.url()]),
353        };
354        if let Some(seed) = self.testing_prng_seed {
355            command.arg("--testing-prng-seed").arg(seed.to_string());
356        }
357        command.spawn_and_wait_for_stdout().await?;
358        Ok(())
359    }
360
361    /// Runs `linera wallet request-chain`.
362    pub async fn request_chain(
363        &self,
364        faucet: &Faucet,
365        set_default: bool,
366    ) -> Result<(ChainId, AccountOwner)> {
367        let mut command = self.command().await?;
368        command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
369        if set_default {
370            command.arg("--set-default");
371        }
372        let stdout = command.spawn_and_wait_for_stdout().await?;
373        let mut lines = stdout.split_whitespace();
374        let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
375        let owner = lines.next().context("missing chain owner")?.parse()?;
376        Ok((chain_id, owner))
377    }
378
379    /// Runs `linera wallet publish-and-create`.
380    #[expect(clippy::too_many_arguments)]
381    pub async fn publish_and_create<
382        A: ContractAbi,
383        Parameters: Serialize,
384        InstantiationArgument: Serialize,
385    >(
386        &self,
387        contract: PathBuf,
388        service: PathBuf,
389        vm_runtime: VmRuntime,
390        parameters: &Parameters,
391        argument: &InstantiationArgument,
392        required_application_ids: &[ApplicationId],
393        publisher: impl Into<Option<ChainId>>,
394    ) -> Result<ApplicationId<A>> {
395        let json_parameters = serde_json::to_string(parameters)?;
396        let json_argument = serde_json::to_string(argument)?;
397        let mut command = self.command().await?;
398        let vm_runtime = format!("{}", vm_runtime);
399        command
400            .arg("publish-and-create")
401            .args([contract, service])
402            .args(["--vm-runtime", &vm_runtime.to_lowercase()])
403            .args(publisher.into().iter().map(ChainId::to_string))
404            .args(["--json-parameters", &json_parameters])
405            .args(["--json-argument", &json_argument]);
406        if !required_application_ids.is_empty() {
407            command.arg("--required-application-ids");
408            command.args(
409                required_application_ids
410                    .iter()
411                    .map(ApplicationId::to_string),
412            );
413        }
414        let stdout = command.spawn_and_wait_for_stdout().await?;
415        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
416    }
417
418    /// Runs `linera publish-module`.
419    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
420        &self,
421        contract: PathBuf,
422        service: PathBuf,
423        vm_runtime: VmRuntime,
424        publisher: impl Into<Option<ChainId>>,
425    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
426        let stdout = self
427            .command()
428            .await?
429            .arg("publish-module")
430            .args([contract, service])
431            .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
432            .args(publisher.into().iter().map(ChainId::to_string))
433            .spawn_and_wait_for_stdout()
434            .await?;
435        let module_id: ModuleId = stdout.trim().parse()?;
436        Ok(module_id.with_abi())
437    }
438
439    /// Runs `linera create-application`.
440    pub async fn create_application<
441        Abi: ContractAbi,
442        Parameters: Serialize,
443        InstantiationArgument: Serialize,
444    >(
445        &self,
446        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
447        parameters: &Parameters,
448        argument: &InstantiationArgument,
449        required_application_ids: &[ApplicationId],
450        creator: impl Into<Option<ChainId>>,
451    ) -> Result<ApplicationId<Abi>> {
452        let json_parameters = serde_json::to_string(parameters)?;
453        let json_argument = serde_json::to_string(argument)?;
454        let mut command = self.command().await?;
455        command
456            .arg("create-application")
457            .arg(module_id.forget_abi().to_string())
458            .args(["--json-parameters", &json_parameters])
459            .args(["--json-argument", &json_argument])
460            .args(creator.into().iter().map(ChainId::to_string));
461        if !required_application_ids.is_empty() {
462            command.arg("--required-application-ids");
463            command.args(
464                required_application_ids
465                    .iter()
466                    .map(ApplicationId::to_string),
467            );
468        }
469        let stdout = command.spawn_and_wait_for_stdout().await?;
470        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
471    }
472
473    /// Runs `linera service`.
474    pub async fn run_node_service(
475        &self,
476        port: impl Into<Option<u16>>,
477        process_inbox: ProcessInbox,
478    ) -> Result<NodeService> {
479        self.run_node_service_with_options(port, process_inbox, &[], &[], false)
480            .await
481    }
482
483    /// Runs `linera service` with optional task processor configuration.
484    pub async fn run_node_service_with_options(
485        &self,
486        port: impl Into<Option<u16>>,
487        process_inbox: ProcessInbox,
488        operator_application_ids: &[ApplicationId],
489        operators: &[(String, PathBuf)],
490        read_only: bool,
491    ) -> Result<NodeService> {
492        let port = port.into().unwrap_or(8080);
493        let mut command = self.command().await?;
494        command.arg("service");
495        if let ProcessInbox::Skip = process_inbox {
496            command.arg("--listener-skip-process-inbox");
497        }
498        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
499            command.args(var.split_whitespace());
500        }
501        for app_id in operator_application_ids {
502            command.args(["--operator-application-ids", &app_id.to_string()]);
503        }
504        for (name, path) in operators {
505            command.args(["--operators", &format!("{}={}", name, path.display())]);
506        }
507        if read_only {
508            command.arg("--read-only");
509        }
510        let child = command
511            .args(["--port".to_string(), port.to_string()])
512            .spawn_into()?;
513        let client = reqwest_client();
514        for i in 0..10 {
515            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
516            let request = client
517                .get(format!("http://localhost:{}/", port))
518                .send()
519                .await;
520            if request.is_ok() {
521                tracing::info!("Node service has started");
522                return Ok(NodeService::new(port, child));
523            } else {
524                tracing::warn!("Waiting for node service to start");
525            }
526        }
527        bail!("Failed to start node service");
528    }
529
530    /// Runs `linera service` with a controller application.
531    pub async fn run_node_service_with_controller(
532        &self,
533        port: impl Into<Option<u16>>,
534        process_inbox: ProcessInbox,
535        controller_id: &ApplicationId,
536        operators: &[(String, PathBuf)],
537    ) -> Result<NodeService> {
538        let port = port.into().unwrap_or(8080);
539        let mut command = self.command().await?;
540        command.arg("service");
541        if let ProcessInbox::Skip = process_inbox {
542            command.arg("--listener-skip-process-inbox");
543        }
544        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
545            command.args(var.split_whitespace());
546        }
547        command.args(["--controller-id", &controller_id.to_string()]);
548        for (name, path) in operators {
549            command.args(["--operators", &format!("{}={}", name, path.display())]);
550        }
551        let child = command
552            .args(["--port".to_string(), port.to_string()])
553            .spawn_into()?;
554        let client = reqwest_client();
555        for i in 0..10 {
556            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
557            let request = client
558                .get(format!("http://localhost:{}/", port))
559                .send()
560                .await;
561            if request.is_ok() {
562                tracing::info!("Node service has started");
563                return Ok(NodeService::new(port, child));
564            } else {
565                tracing::warn!("Waiting for node service to start");
566            }
567        }
568        bail!("Failed to start node service");
569    }
570
571    /// Runs `linera validator query`
572    pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
573        let mut command = self.command().await?;
574        command.arg("validator").arg("query").arg(address);
575        let stdout = command.spawn_and_wait_for_stdout().await?;
576
577        // Parse the genesis config hash from the output.
578        // It's on a line like "Genesis config hash: <hash>"
579        let hash = stdout
580            .lines()
581            .find_map(|line| {
582                line.strip_prefix("Genesis config hash: ")
583                    .and_then(|hash_str| hash_str.trim().parse().ok())
584            })
585            .context("error while parsing the result of `linera validator query`")?;
586        Ok(hash)
587    }
588
589    /// Runs `linera validator list`.
590    pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
591        let mut command = self.command().await?;
592        command.arg("validator").arg("list");
593        if let Some(chain_id) = chain_id {
594            command.args(["--chain-id", &chain_id.to_string()]);
595        }
596        command.spawn_and_wait_for_stdout().await?;
597        Ok(())
598    }
599
600    /// Runs `linera sync-validator`.
601    pub async fn sync_validator(
602        &self,
603        chain_ids: impl IntoIterator<Item = &ChainId>,
604        validator_address: impl Into<String>,
605    ) -> Result<()> {
606        let mut command = self.command().await?;
607        command
608            .arg("validator")
609            .arg("sync")
610            .arg(validator_address.into());
611        let mut chain_ids = chain_ids.into_iter().peekable();
612        if chain_ids.peek().is_some() {
613            command
614                .arg("--chains")
615                .args(chain_ids.map(ChainId::to_string));
616        }
617        command.spawn_and_wait_for_stdout().await?;
618        Ok(())
619    }
620
621    /// Runs `linera faucet`.
622    pub async fn run_faucet(
623        &self,
624        port: impl Into<Option<u16>>,
625        chain_id: Option<ChainId>,
626        amount: Amount,
627    ) -> Result<FaucetService> {
628        let port = port.into().unwrap_or(8080);
629        let temp_dir = tempfile::tempdir()
630            .context("Failed to create temporary directory for faucet storage")?;
631        let storage_path = temp_dir.path().join("faucet_storage.sqlite");
632        let mut command = self.command().await?;
633        let command = command
634            .arg("faucet")
635            .args(["--port".to_string(), port.to_string()])
636            .args(["--amount".to_string(), amount.to_string()])
637            .args([
638                "--storage-path".to_string(),
639                storage_path.to_string_lossy().to_string(),
640            ]);
641        if let Some(chain_id) = chain_id {
642            command.arg(chain_id.to_string());
643        }
644        let child = command.spawn_into()?;
645        let client = reqwest_client();
646        for i in 0..10 {
647            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
648            let request = client
649                .get(format!("http://localhost:{}/", port))
650                .send()
651                .await;
652            if request.is_ok() {
653                tracing::info!("Faucet has started");
654                return Ok(FaucetService::new(port, child, temp_dir));
655            } else {
656                tracing::debug!("Waiting for faucet to start");
657            }
658        }
659        bail!("Failed to start faucet");
660    }
661
662    /// Runs `linera local-balance`.
663    pub async fn local_balance(&self, account: Account) -> Result<Amount> {
664        let stdout = self
665            .command()
666            .await?
667            .arg("local-balance")
668            .arg(account.to_string())
669            .spawn_and_wait_for_stdout()
670            .await?;
671        let amount = stdout
672            .trim()
673            .parse()
674            .context("error while parsing the result of `linera local-balance`")?;
675        Ok(amount)
676    }
677
678    /// Runs `linera query-balance`.
679    pub async fn query_balance(&self, account: Account) -> Result<Amount> {
680        let stdout = self
681            .command()
682            .await?
683            .arg("query-balance")
684            .arg(account.to_string())
685            .spawn_and_wait_for_stdout()
686            .await?;
687        let amount = stdout
688            .trim()
689            .parse()
690            .context("error while parsing the result of `linera query-balance`")?;
691        Ok(amount)
692    }
693
694    /// Runs `linera sync`.
695    pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
696        self.command()
697            .await?
698            .arg("sync")
699            .arg(chain_id.to_string())
700            .spawn_and_wait_for_stdout()
701            .await?;
702        Ok(())
703    }
704
705    /// Runs `linera process-inbox`.
706    pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
707        self.command()
708            .await?
709            .arg("process-inbox")
710            .arg(chain_id.to_string())
711            .spawn_and_wait_for_stdout()
712            .await?;
713        Ok(())
714    }
715
716    /// Runs `linera transfer`.
717    pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
718        self.command()
719            .await?
720            .arg("transfer")
721            .arg(amount.to_string())
722            .args(["--from", &from.to_string()])
723            .args(["--to", &to.to_string()])
724            .spawn_and_wait_for_stdout()
725            .await?;
726        Ok(())
727    }
728
729    /// Runs `linera transfer` with no logging.
730    pub async fn transfer_with_silent_logs(
731        &self,
732        amount: Amount,
733        from: ChainId,
734        to: ChainId,
735    ) -> Result<()> {
736        self.command()
737            .await?
738            .env("RUST_LOG", "off")
739            .arg("transfer")
740            .arg(amount.to_string())
741            .args(["--from", &from.to_string()])
742            .args(["--to", &to.to_string()])
743            .spawn_and_wait_for_stdout()
744            .await?;
745        Ok(())
746    }
747
748    /// Runs `linera transfer` with owner accounts.
749    pub async fn transfer_with_accounts(
750        &self,
751        amount: Amount,
752        from: Account,
753        to: Account,
754    ) -> Result<()> {
755        self.command()
756            .await?
757            .arg("transfer")
758            .arg(amount.to_string())
759            .args(["--from", &from.to_string()])
760            .args(["--to", &to.to_string()])
761            .spawn_and_wait_for_stdout()
762            .await?;
763        Ok(())
764    }
765
766    fn benchmark_command_internal(command: &mut Command, args: BenchmarkCommand) -> Result<()> {
767        let mut formatted_args = to_args(&args)?;
768        let subcommand = formatted_args.remove(0);
769        // The subcommand is followed by the flattened options, which are preceded by "options".
770        // So remove that as well.
771        formatted_args.remove(0);
772        let options = formatted_args
773            .chunks_exact(2)
774            .flat_map(|pair| {
775                let option = format!("--{}", pair[0]);
776                match pair[1].as_str() {
777                    "true" => vec![option],
778                    "false" => vec![],
779                    _ => vec![option, pair[1].clone()],
780                }
781            })
782            .collect::<Vec<_>>();
783        command
784            .args([
785                "--max-pending-message-bundles",
786                &args.transactions_per_block().to_string(),
787            ])
788            .arg("benchmark")
789            .arg(subcommand)
790            .args(options);
791        Ok(())
792    }
793
794    async fn benchmark_command_with_envs(
795        &self,
796        args: BenchmarkCommand,
797        envs: &[(&str, &str)],
798    ) -> Result<Command> {
799        let mut command = self
800            .command_with_envs_and_arguments(envs, self.required_command_arguments())
801            .await?;
802        Self::benchmark_command_internal(&mut command, args)?;
803        Ok(command)
804    }
805
806    async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
807        let mut command = self
808            .command_with_arguments(self.required_command_arguments())
809            .await?;
810        Self::benchmark_command_internal(&mut command, args)?;
811        Ok(command)
812    }
813
814    /// Runs `linera benchmark`.
815    pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
816        let mut command = self.benchmark_command(args).await?;
817        command.spawn_and_wait_for_stdout().await?;
818        Ok(())
819    }
820
821    /// Runs `linera benchmark`, but detached: don't wait for the command to finish, just spawn it
822    /// and return the child process, and the handles to the stdout and stderr.
823    pub async fn benchmark_detached(
824        &self,
825        args: BenchmarkCommand,
826        tx: oneshot::Sender<()>,
827    ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
828        let mut child = self
829            .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
830            .await?
831            .kill_on_drop(true)
832            .stdin(Stdio::piped())
833            .stdout(Stdio::piped())
834            .stderr(Stdio::piped())
835            .spawn()?;
836
837        let pid = child.id().expect("failed to get pid");
838        let stdout = child.stdout.take().expect("stdout not open");
839        let stdout_handle = tokio::spawn(async move {
840            let mut lines = BufReader::new(stdout).lines();
841            while let Ok(Some(line)) = lines.next_line().await {
842                println!("benchmark{{pid={pid}}} {line}");
843            }
844        });
845
846        let stderr = child.stderr.take().expect("stderr not open");
847        let stderr_handle = tokio::spawn(async move {
848            let mut lines = BufReader::new(stderr).lines();
849            let mut tx = Some(tx);
850            while let Ok(Some(line)) = lines.next_line().await {
851                if line.contains("Ready to start benchmark") {
852                    tx.take()
853                        .expect("Should only send signal once")
854                        .send(())
855                        .expect("failed to send ready signal to main thread");
856                } else {
857                    println!("benchmark{{pid={pid}}} {line}");
858                }
859            }
860        });
861        Ok((child, stdout_handle, stderr_handle))
862    }
863
864    async fn open_chain_internal(
865        &self,
866        from: ChainId,
867        owner: Option<AccountOwner>,
868        initial_balance: Amount,
869        super_owner: bool,
870    ) -> Result<(ChainId, AccountOwner)> {
871        let mut command = self.command().await?;
872        command
873            .arg("open-chain")
874            .args(["--from", &from.to_string()])
875            .args(["--initial-balance", &initial_balance.to_string()]);
876
877        if let Some(owner) = owner {
878            command.args(["--owner", &owner.to_string()]);
879        }
880
881        if super_owner {
882            command.arg("--super-owner");
883        }
884
885        let stdout = command.spawn_and_wait_for_stdout().await?;
886        let mut split = stdout.split('\n');
887        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
888        let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
889        if let Some(owner) = owner {
890            assert_eq!(owner, new_owner);
891        }
892        Ok((chain_id, new_owner))
893    }
894
895    /// Runs `linera open-chain --super-owner`.
896    pub async fn open_chain_super_owner(
897        &self,
898        from: ChainId,
899        owner: Option<AccountOwner>,
900        initial_balance: Amount,
901    ) -> Result<(ChainId, AccountOwner)> {
902        self.open_chain_internal(from, owner, initial_balance, true)
903            .await
904    }
905
906    /// Runs `linera open-chain`.
907    pub async fn open_chain(
908        &self,
909        from: ChainId,
910        owner: Option<AccountOwner>,
911        initial_balance: Amount,
912    ) -> Result<(ChainId, AccountOwner)> {
913        self.open_chain_internal(from, owner, initial_balance, false)
914            .await
915    }
916
917    /// Runs `linera open-chain` then `linera assign`.
918    pub async fn open_and_assign(
919        &self,
920        client: &ClientWrapper,
921        initial_balance: Amount,
922    ) -> Result<ChainId> {
923        let our_chain = self
924            .load_wallet()?
925            .default_chain()
926            .context("no default chain found")?;
927        let owner = client.keygen().await?;
928        let (new_chain, _) = self
929            .open_chain(our_chain, Some(owner), initial_balance)
930            .await?;
931        client.assign(owner, new_chain).await?;
932        Ok(new_chain)
933    }
934
935    pub async fn open_multi_owner_chain(
936        &self,
937        from: ChainId,
938        owners: BTreeMap<AccountOwner, u64>,
939        multi_leader_rounds: u32,
940        balance: Amount,
941        base_timeout_ms: u64,
942    ) -> Result<ChainId> {
943        let mut command = self.command().await?;
944        command
945            .arg("open-multi-owner-chain")
946            .args(["--from", &from.to_string()])
947            .arg("--owners")
948            .arg(serde_json::to_string(&owners)?)
949            .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
950        command
951            .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
952            .args(["--initial-balance", &balance.to_string()]);
953
954        let stdout = command.spawn_and_wait_for_stdout().await?;
955        let mut split = stdout.split('\n');
956        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
957
958        Ok(chain_id)
959    }
960
961    pub async fn change_ownership(
962        &self,
963        chain_id: ChainId,
964        super_owners: Vec<AccountOwner>,
965        owners: Vec<AccountOwner>,
966    ) -> Result<()> {
967        let mut command = self.command().await?;
968        command
969            .arg("change-ownership")
970            .args(["--chain-id", &chain_id.to_string()]);
971        command
972            .arg("--super-owners")
973            .arg(serde_json::to_string(&super_owners)?);
974        command.arg("--owners").arg(serde_json::to_string(
975            &owners
976                .into_iter()
977                .zip(std::iter::repeat(100u64))
978                .collect::<BTreeMap<_, _>>(),
979        )?);
980        command.spawn_and_wait_for_stdout().await?;
981        Ok(())
982    }
983
984    pub async fn change_application_permissions(
985        &self,
986        chain_id: ChainId,
987        application_permissions: ApplicationPermissions,
988    ) -> Result<()> {
989        let mut command = self.command().await?;
990        command
991            .arg("change-application-permissions")
992            .args(["--chain-id", &chain_id.to_string()]);
993        command.arg("--manage-chain").arg(serde_json::to_string(
994            &application_permissions.manage_chain,
995        )?);
996        // TODO: add other fields
997        command.spawn_and_wait_for_stdout().await?;
998        Ok(())
999    }
1000
1001    /// Runs `linera wallet follow-chain CHAIN_ID`.
1002    pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
1003        let mut command = self.command().await?;
1004        command
1005            .args(["wallet", "follow-chain"])
1006            .arg(chain_id.to_string());
1007        if sync {
1008            command.arg("--sync");
1009        }
1010        command.spawn_and_wait_for_stdout().await?;
1011        Ok(())
1012    }
1013
1014    /// Runs `linera wallet forget-chain CHAIN_ID`.
1015    pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
1016        let mut command = self.command().await?;
1017        command
1018            .args(["wallet", "forget-chain"])
1019            .arg(chain_id.to_string());
1020        command.spawn_and_wait_for_stdout().await?;
1021        Ok(())
1022    }
1023
1024    /// Runs `linera wallet set-default CHAIN_ID`.
1025    pub async fn set_default_chain(&self, chain_id: ChainId) -> Result<()> {
1026        let mut command = self.command().await?;
1027        command
1028            .args(["wallet", "set-default"])
1029            .arg(chain_id.to_string());
1030        command.spawn_and_wait_for_stdout().await?;
1031        Ok(())
1032    }
1033
1034    pub async fn retry_pending_block(
1035        &self,
1036        chain_id: Option<ChainId>,
1037    ) -> Result<Option<CryptoHash>> {
1038        let mut command = self.command().await?;
1039        command.arg("retry-pending-block");
1040        if let Some(chain_id) = chain_id {
1041            command.arg(chain_id.to_string());
1042        }
1043        let stdout = command.spawn_and_wait_for_stdout().await?;
1044        let stdout = stdout.trim();
1045        if stdout.is_empty() {
1046            Ok(None)
1047        } else {
1048            Ok(Some(CryptoHash::from_str(stdout)?))
1049        }
1050    }
1051
1052    /// Runs `linera publish-data-blob`.
1053    pub async fn publish_data_blob(
1054        &self,
1055        path: &Path,
1056        chain_id: Option<ChainId>,
1057    ) -> Result<CryptoHash> {
1058        let mut command = self.command().await?;
1059        command.arg("publish-data-blob").arg(path);
1060        if let Some(chain_id) = chain_id {
1061            command.arg(chain_id.to_string());
1062        }
1063        let stdout = command.spawn_and_wait_for_stdout().await?;
1064        let stdout = stdout.trim();
1065        Ok(CryptoHash::from_str(stdout)?)
1066    }
1067
1068    /// Runs `linera read-data-blob`.
1069    pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
1070        let mut command = self.command().await?;
1071        command.arg("read-data-blob").arg(hash.to_string());
1072        if let Some(chain_id) = chain_id {
1073            command.arg(chain_id.to_string());
1074        }
1075        command.spawn_and_wait_for_stdout().await?;
1076        Ok(())
1077    }
1078
1079    pub fn load_wallet(&self) -> Result<Wallet> {
1080        Ok(Wallet::read(&self.wallet_path())?)
1081    }
1082
1083    pub fn load_keystore(&self) -> Result<InMemorySigner> {
1084        util::read_json(self.keystore_path())
1085    }
1086
1087    pub fn wallet_path(&self) -> PathBuf {
1088        self.path_provider.path().join(&self.wallet)
1089    }
1090
1091    pub fn keystore_path(&self) -> PathBuf {
1092        self.path_provider.path().join(&self.keystore)
1093    }
1094
1095    pub fn storage_path(&self) -> &str {
1096        &self.storage
1097    }
1098
1099    pub fn get_owner(&self) -> Option<AccountOwner> {
1100        let wallet = self.load_wallet().ok()?;
1101        wallet
1102            .get(wallet.default_chain()?)
1103            .expect("default chain must be in wallet")
1104            .owner
1105    }
1106
1107    pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1108        self.load_wallet()
1109            .ok()
1110            .is_some_and(|wallet| wallet.get(chain).is_some())
1111    }
1112
1113    pub async fn set_validator(
1114        &self,
1115        validator_key: &(String, String),
1116        port: usize,
1117        votes: usize,
1118    ) -> Result<()> {
1119        let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1120        self.command()
1121            .await?
1122            .arg("validator")
1123            .arg("add")
1124            .args(["--public-key", &validator_key.0])
1125            .args(["--account-key", &validator_key.1])
1126            .args(["--address", &address])
1127            .args(["--votes", &votes.to_string()])
1128            .spawn_and_wait_for_stdout()
1129            .await?;
1130        Ok(())
1131    }
1132
1133    pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1134        self.command()
1135            .await?
1136            .arg("validator")
1137            .arg("remove")
1138            .args(["--public-key", validator_key])
1139            .spawn_and_wait_for_stdout()
1140            .await?;
1141        Ok(())
1142    }
1143
1144    pub async fn change_validators(
1145        &self,
1146        add_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1147        modify_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1148        remove_validators: &[String],
1149    ) -> Result<()> {
1150        use std::str::FromStr;
1151
1152        use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1153
1154        // Build a map that will be serialized to JSON
1155        // Use the exact types that deserialization expects
1156        let mut changes = std::collections::HashMap::new();
1157
1158        // Add/modify validators
1159        for (public_key_str, account_key_str, port, votes) in
1160            add_validators.iter().chain(modify_validators.iter())
1161        {
1162            let public_key = ValidatorPublicKey::from_str(public_key_str)
1163                .with_context(|| format!("Invalid validator public key: {}", public_key_str))?;
1164
1165            let account_key = AccountPublicKey::from_str(account_key_str)
1166                .with_context(|| format!("Invalid account public key: {}", account_key_str))?;
1167
1168            let address = format!("{}:127.0.0.1:{}", self.network.short(), port)
1169                .parse()
1170                .unwrap();
1171
1172            // Create ValidatorChange struct
1173            let change = crate::cli::validator::Change {
1174                account_key,
1175                address,
1176                votes: crate::cli::validator::Votes(
1177                    std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1178                ),
1179            };
1180
1181            changes.insert(public_key, Some(change));
1182        }
1183
1184        // Remove validators (set to None)
1185        for validator_key_str in remove_validators {
1186            let public_key = ValidatorPublicKey::from_str(validator_key_str)
1187                .with_context(|| format!("Invalid validator public key: {}", validator_key_str))?;
1188            changes.insert(public_key, None);
1189        }
1190
1191        // Create temporary file with JSON
1192        let temp_file = tempfile::NamedTempFile::new()
1193            .context("Failed to create temporary file for validator changes")?;
1194        serde_json::to_writer(&temp_file, &changes)
1195            .context("Failed to write validator changes to file")?;
1196        let temp_path = temp_file.path();
1197
1198        self.command()
1199            .await?
1200            .arg("validator")
1201            .arg("update")
1202            .arg(temp_path)
1203            .arg("--yes") // Skip confirmation prompt
1204            .spawn_and_wait_for_stdout()
1205            .await?;
1206
1207        Ok(())
1208    }
1209
1210    pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1211        self.command()
1212            .await?
1213            .arg("revoke-epochs")
1214            .arg(epoch.to_string())
1215            .spawn_and_wait_for_stdout()
1216            .await?;
1217        Ok(())
1218    }
1219
1220    /// Runs `linera keygen`.
1221    pub async fn keygen(&self) -> Result<AccountOwner> {
1222        let stdout = self
1223            .command()
1224            .await?
1225            .arg("keygen")
1226            .spawn_and_wait_for_stdout()
1227            .await?;
1228        AccountOwner::from_str(stdout.as_str().trim())
1229    }
1230
1231    /// Returns the default chain.
1232    pub fn default_chain(&self) -> Option<ChainId> {
1233        self.load_wallet().ok()?.default_chain()
1234    }
1235
1236    /// Runs `linera assign`.
1237    pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1238        let _stdout = self
1239            .command()
1240            .await?
1241            .arg("assign")
1242            .args(["--owner", &owner.to_string()])
1243            .args(["--chain-id", &chain_id.to_string()])
1244            .spawn_and_wait_for_stdout()
1245            .await?;
1246        Ok(())
1247    }
1248
1249    /// Runs `linera set-preferred-owner` for `chain_id`.
1250    pub async fn set_preferred_owner(
1251        &self,
1252        chain_id: ChainId,
1253        owner: Option<AccountOwner>,
1254    ) -> Result<()> {
1255        let mut owner_arg = vec!["--owner".to_string()];
1256        if let Some(owner) = owner {
1257            owner_arg.push(owner.to_string());
1258        };
1259        self.command()
1260            .await?
1261            .arg("set-preferred-owner")
1262            .args(["--chain-id", &chain_id.to_string()])
1263            .args(owner_arg)
1264            .spawn_and_wait_for_stdout()
1265            .await?;
1266        Ok(())
1267    }
1268
1269    pub async fn build_application(
1270        &self,
1271        path: &Path,
1272        name: &str,
1273        is_workspace: bool,
1274    ) -> Result<(PathBuf, PathBuf)> {
1275        Command::new("cargo")
1276            .current_dir(self.path_provider.path())
1277            .arg("build")
1278            .arg("--release")
1279            .args(["--target", "wasm32-unknown-unknown"])
1280            .arg("--manifest-path")
1281            .arg(path.join("Cargo.toml"))
1282            .spawn_and_wait_for_stdout()
1283            .await?;
1284
1285        let release_dir = match is_workspace {
1286            true => path.join("../target/wasm32-unknown-unknown/release"),
1287            false => path.join("target/wasm32-unknown-unknown/release"),
1288        };
1289
1290        let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1291        let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1292
1293        let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1294        let service_size = fs_err::tokio::metadata(&service).await?.len();
1295        tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1296
1297        Ok((contract, service))
1298    }
1299}
1300
1301impl Drop for ClientWrapper {
1302    fn drop(&mut self) {
1303        use std::process::Command as SyncCommand;
1304
1305        if self.on_drop != OnClientDrop::CloseChains {
1306            return;
1307        }
1308
1309        let Ok(binary_path) = self.binary_path.lock() else {
1310            tracing::error!(
1311                "Failed to close chains because a thread panicked with a lock to `binary_path`"
1312            );
1313            return;
1314        };
1315
1316        let Some(binary_path) = binary_path.as_ref() else {
1317            tracing::warn!(
1318                "Assuming no chains need to be closed, because the command binary was never \
1319                resolved and therefore presumably never called"
1320            );
1321            return;
1322        };
1323
1324        let working_directory = self.path_provider.path();
1325        let mut wallet_show_command = SyncCommand::new(binary_path);
1326
1327        for argument in self.command_arguments() {
1328            wallet_show_command.arg(&*argument);
1329        }
1330
1331        let Ok(wallet_show_output) = wallet_show_command
1332            .current_dir(working_directory)
1333            .args(["wallet", "show", "--short", "--owned"])
1334            .output()
1335        else {
1336            tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1337            return;
1338        };
1339
1340        if !wallet_show_output.status.success() {
1341            tracing::warn!("Failed to list chains in the wallet to close them");
1342            return;
1343        }
1344
1345        let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1346            tracing::warn!(
1347                "Failed to close chains because `linera wallet show --short` \
1348                returned a non-UTF-8 output"
1349            );
1350            return;
1351        };
1352
1353        let chain_ids = chain_list_string
1354            .split('\n')
1355            .map(|line| line.trim())
1356            .filter(|line| !line.is_empty());
1357
1358        for chain_id in chain_ids {
1359            let mut close_chain_command = SyncCommand::new(binary_path);
1360
1361            for argument in self.command_arguments() {
1362                close_chain_command.arg(&*argument);
1363            }
1364
1365            close_chain_command.current_dir(working_directory);
1366
1367            match close_chain_command.args(["close-chain", chain_id]).status() {
1368                Ok(status) if status.success() => (),
1369                Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1370                Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1371            }
1372        }
1373    }
1374}
1375
1376#[cfg(with_testing)]
1377impl ClientWrapper {
1378    pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1379        self.build_application(Self::example_path(name)?.as_path(), name, true)
1380            .await
1381    }
1382
1383    pub fn example_path(name: &str) -> Result<PathBuf> {
1384        Ok(env::current_dir()?.join("../examples/").join(name))
1385    }
1386}
1387
1388fn truncate_query_output(input: &str) -> String {
1389    let max_len = 1000;
1390    if input.len() < max_len {
1391        input.to_string()
1392    } else {
1393        format!("{} ...", input.get(..max_len).unwrap())
1394    }
1395}
1396
1397fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1398    let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1399    let max_len = 200;
1400    if query.len() < max_len {
1401        query
1402    } else {
1403        format!("{} ...", query.get(..max_len).unwrap())
1404    }
1405}
1406
1407/// A running node service.
1408pub struct NodeService {
1409    port: u16,
1410    child: Child,
1411}
1412
1413impl NodeService {
1414    fn new(port: u16, child: Child) -> Self {
1415        Self { port, child }
1416    }
1417
1418    pub async fn terminate(mut self) -> Result<()> {
1419        self.child.kill().await.context("terminating node service")
1420    }
1421
1422    pub fn port(&self) -> u16 {
1423        self.port
1424    }
1425
1426    pub fn ensure_is_running(&mut self) -> Result<()> {
1427        self.child.ensure_is_running()
1428    }
1429
1430    pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1431        let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1432        let mut data = self.query_node(query).await?;
1433        Ok(serde_json::from_value(data["processInbox"].take())?)
1434    }
1435
1436    pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1437        let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1438        let mut data = self.query_node(query).await?;
1439        Ok(serde_json::from_value(data["sync"].take())?)
1440    }
1441
1442    pub async fn transfer(
1443        &self,
1444        chain_id: ChainId,
1445        owner: AccountOwner,
1446        recipient: Account,
1447        amount: Amount,
1448    ) -> Result<CryptoHash> {
1449        let json_owner = owner.to_value();
1450        let json_recipient = recipient.to_value();
1451        let query = format!(
1452            "mutation {{ transfer(\
1453                 chainId: \"{chain_id}\", \
1454                 owner: {json_owner}, \
1455                 recipient: {json_recipient}, \
1456                 amount: \"{amount}\") \
1457             }}"
1458        );
1459        let data = self.query_node(query).await?;
1460        serde_json::from_value(data["transfer"].clone())
1461            .context("missing transfer field in response")
1462    }
1463
1464    pub async fn balance(&self, account: &Account) -> Result<Amount> {
1465        let chain = account.chain_id;
1466        let owner = account.owner;
1467        if matches!(owner, AccountOwner::CHAIN) {
1468            let query = format!(
1469                "query {{ chain(chainId:\"{chain}\") {{
1470                    executionState {{ system {{ balance }} }}
1471                }} }}"
1472            );
1473            let response = self.query_node(query).await?;
1474            let balance = &response["chain"]["executionState"]["system"]["balance"]
1475                .as_str()
1476                .unwrap();
1477            return Ok(Amount::from_str(balance)?);
1478        }
1479        let query = format!(
1480            "query {{ chain(chainId:\"{chain}\") {{
1481                executionState {{ system {{ balances {{
1482                    entry(key:\"{owner}\") {{ value }}
1483                }} }} }}
1484            }} }}"
1485        );
1486        let response = self.query_node(query).await?;
1487        let balances = &response["chain"]["executionState"]["system"]["balances"];
1488        let balance = balances["entry"]["value"].as_str();
1489        match balance {
1490            None => Ok(Amount::ZERO),
1491            Some(amount) => Ok(Amount::from_str(amount)?),
1492        }
1493    }
1494
1495    pub fn make_application<A: ContractAbi>(
1496        &self,
1497        chain_id: &ChainId,
1498        application_id: &ApplicationId<A>,
1499    ) -> Result<ApplicationWrapper<A>> {
1500        let application_id = application_id.forget_abi().to_string();
1501        let link = format!(
1502            "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1503            self.port
1504        );
1505        Ok(ApplicationWrapper::from(link))
1506    }
1507
1508    pub async fn publish_data_blob(
1509        &self,
1510        chain_id: &ChainId,
1511        bytes: Vec<u8>,
1512    ) -> Result<CryptoHash> {
1513        let query = format!(
1514            "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1515            chain_id.to_value(),
1516            bytes.to_value(),
1517        );
1518        let data = self.query_node(query).await?;
1519        serde_json::from_value(data["publishDataBlob"].clone())
1520            .context("missing publishDataBlob field in response")
1521    }
1522
1523    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1524        &self,
1525        chain_id: &ChainId,
1526        contract: PathBuf,
1527        service: PathBuf,
1528        vm_runtime: VmRuntime,
1529    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1530        let contract_code = Bytecode::load_from_file(&contract).await?;
1531        let service_code = Bytecode::load_from_file(&service).await?;
1532        let query = format!(
1533            "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1534            chain_id.to_value(),
1535            contract_code.to_value(),
1536            service_code.to_value(),
1537            vm_runtime.to_value(),
1538        );
1539        let data = self.query_node(query).await?;
1540        let module_str = data["publishModule"]
1541            .as_str()
1542            .context("module ID not found")?;
1543        let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1544        Ok(module_id.with_abi())
1545    }
1546
1547    pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1548        let query = format!(
1549            "query {{ chain(chainId:\"{chain_id}\") {{
1550                executionState {{ system {{ committees }} }}
1551            }} }}"
1552        );
1553        let mut response = self.query_node(query).await?;
1554        let committees = response["chain"]["executionState"]["system"]["committees"].take();
1555        Ok(serde_json::from_value(committees)?)
1556    }
1557
1558    pub async fn events_from_index(
1559        &self,
1560        chain_id: &ChainId,
1561        stream_id: &StreamId,
1562        start_index: u32,
1563    ) -> Result<Vec<IndexAndEvent>> {
1564        let query = format!(
1565            "query {{
1566               eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1567               {{ index event }}
1568             }}",
1569            stream_id.to_value()
1570        );
1571        let mut response = self.query_node(query).await?;
1572        let response = response["eventsFromIndex"].take();
1573        Ok(serde_json::from_value(response)?)
1574    }
1575
1576    pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1577        let n_try = 5;
1578        let query = query.as_ref();
1579        for i in 0..n_try {
1580            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1581            let url = format!("http://localhost:{}/", self.port);
1582            let client = reqwest_client();
1583            let result = client
1584                .post(url)
1585                .json(&json!({ "query": query }))
1586                .send()
1587                .await;
1588            if matches!(result, Err(ref error) if error.is_timeout()) {
1589                tracing::warn!(
1590                    "Timeout when sending query {} to the node service",
1591                    truncate_query_output(query)
1592                );
1593                continue;
1594            }
1595            let response = result.with_context(|| {
1596                format!(
1597                    "query_node: failed to post query={}",
1598                    truncate_query_output(query)
1599                )
1600            })?;
1601            ensure!(
1602                response.status().is_success(),
1603                "Query \"{}\" failed: {}",
1604                truncate_query_output(query),
1605                response
1606                    .text()
1607                    .await
1608                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1609            );
1610            let value: Value = response.json().await.context("invalid JSON")?;
1611            if let Some(errors) = value.get("errors") {
1612                tracing::warn!(
1613                    "Query \"{}\" failed: {}",
1614                    truncate_query_output(query),
1615                    errors
1616                );
1617            } else {
1618                return Ok(value["data"].clone());
1619            }
1620        }
1621        bail!(
1622            "Query \"{}\" failed after {} retries.",
1623            truncate_query_output(query),
1624            n_try
1625        );
1626    }
1627
1628    pub async fn create_application<
1629        Abi: ContractAbi,
1630        Parameters: Serialize,
1631        InstantiationArgument: Serialize,
1632    >(
1633        &self,
1634        chain_id: &ChainId,
1635        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1636        parameters: &Parameters,
1637        argument: &InstantiationArgument,
1638        required_application_ids: &[ApplicationId],
1639    ) -> Result<ApplicationId<Abi>> {
1640        let module_id = module_id.forget_abi();
1641        let json_required_applications_ids = required_application_ids
1642            .iter()
1643            .map(ApplicationId::to_string)
1644            .collect::<Vec<_>>()
1645            .to_value();
1646        // Convert to `serde_json::Value` then `async_graphql::Value` via the trait `InputType`.
1647        let new_parameters = serde_json::to_value(parameters)
1648            .context("could not create parameters JSON")?
1649            .to_value();
1650        let new_argument = serde_json::to_value(argument)
1651            .context("could not create argument JSON")?
1652            .to_value();
1653        let query = format!(
1654            "mutation {{ createApplication(\
1655                 chainId: \"{chain_id}\",
1656                 moduleId: \"{module_id}\", \
1657                 parameters: {new_parameters}, \
1658                 instantiationArgument: {new_argument}, \
1659                 requiredApplicationIds: {json_required_applications_ids}) \
1660             }}"
1661        );
1662        let data = self.query_node(query).await?;
1663        let app_id_str = data["createApplication"]
1664            .as_str()
1665            .context("missing createApplication string in response")?
1666            .trim();
1667        Ok(app_id_str
1668            .parse::<ApplicationId>()
1669            .context("invalid application ID")?
1670            .with_abi())
1671    }
1672
1673    /// Obtains the hash and height of the `chain`'s tip block, as known by this node service.
1674    pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1675        let query = format!(
1676            r#"query {{ block(chainId: "{chain}") {{
1677                hash
1678                block {{ header {{ height }} }}
1679            }} }}"#
1680        );
1681
1682        let mut response = self.query_node(&query).await?;
1683
1684        match (
1685            mem::take(&mut response["block"]["hash"]),
1686            mem::take(&mut response["block"]["block"]["header"]["height"]),
1687        ) {
1688            (Value::Null, Value::Null) => Ok(None),
1689            (Value::String(hash), Value::Number(height)) => Ok(Some((
1690                hash.parse()
1691                    .context("Received an invalid hash {hash:?} for chain tip")?,
1692                BlockHeight(height.as_u64().unwrap()),
1693            ))),
1694            invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1695        }
1696    }
1697
1698    /// Subscribes to the node service and returns a stream of notifications about a chain.
1699    pub async fn notifications(
1700        &self,
1701        chain_id: ChainId,
1702    ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
1703        let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1704        let url = format!("ws://localhost:{}/ws", self.port);
1705        let mut request = url.into_client_request()?;
1706        request.headers_mut().insert(
1707            "Sec-WebSocket-Protocol",
1708            HeaderValue::from_str("graphql-transport-ws")?,
1709        );
1710        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1711        let init_json = json!({
1712          "type": "connection_init",
1713          "payload": {}
1714        });
1715        websocket.send(init_json.to_string().into()).await?;
1716        let text = websocket
1717            .next()
1718            .await
1719            .context("Failed to establish connection")??
1720            .into_text()?;
1721        ensure!(
1722            text == "{\"type\":\"connection_ack\"}",
1723            "Unexpected response: {text}"
1724        );
1725        let query_json = json!({
1726          "id": "1",
1727          "type": "start",
1728          "payload": {
1729            "query": query,
1730            "variables": {},
1731            "operationName": null
1732          }
1733        });
1734        websocket.send(query_json.to_string().into()).await?;
1735        Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1736            |message| async {
1737                let text = message.into_text()?;
1738                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1739                if let Some(errors) = value["payload"].get("errors") {
1740                    bail!("Notification subscription failed: {errors:?}");
1741                }
1742                serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1743                    .context("Failed to deserialize notification")
1744            },
1745        )))
1746    }
1747}
1748
1749/// A running faucet service.
1750pub struct FaucetService {
1751    port: u16,
1752    child: Child,
1753    _temp_dir: tempfile::TempDir,
1754}
1755
1756impl FaucetService {
1757    fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
1758        Self {
1759            port,
1760            child,
1761            _temp_dir: temp_dir,
1762        }
1763    }
1764
1765    pub async fn terminate(mut self) -> Result<()> {
1766        self.child
1767            .kill()
1768            .await
1769            .context("terminating faucet service")
1770    }
1771
1772    pub fn ensure_is_running(&mut self) -> Result<()> {
1773        self.child.ensure_is_running()
1774    }
1775
1776    pub fn instance(&self) -> Faucet {
1777        Faucet::new(format!("http://localhost:{}/", self.port))
1778    }
1779}
1780
1781/// A running `Application` to be queried in GraphQL.
1782pub struct ApplicationWrapper<A> {
1783    uri: String,
1784    _phantom: PhantomData<A>,
1785}
1786
1787impl<A> ApplicationWrapper<A> {
1788    pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1789        let query = query.as_ref();
1790        let value = self.run_json_query(json!({ "query": query })).await?;
1791        Ok(value["data"].clone())
1792    }
1793
1794    pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1795        const MAX_RETRIES: usize = 5;
1796
1797        for i in 0.. {
1798            let client = reqwest_client();
1799            let result = client.post(&self.uri).json(&query).send().await;
1800            let response = match result {
1801                Ok(response) => response,
1802                Err(error) if i < MAX_RETRIES => {
1803                    tracing::warn!(
1804                        "Failed to post query \"{}\": {error}; retrying",
1805                        truncate_query_output_serialize(&query),
1806                    );
1807                    continue;
1808                }
1809                Err(error) => {
1810                    let query = truncate_query_output_serialize(&query);
1811                    return Err(error)
1812                        .with_context(|| format!("run_json_query: failed to post query={query}"));
1813                }
1814            };
1815            ensure!(
1816                response.status().is_success(),
1817                "Query \"{}\" failed: {}",
1818                truncate_query_output_serialize(&query),
1819                response
1820                    .text()
1821                    .await
1822                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1823            );
1824            let value: Value = response.json().await.context("invalid JSON")?;
1825            if let Some(errors) = value.get("errors") {
1826                bail!(
1827                    "Query \"{}\" failed: {}",
1828                    truncate_query_output_serialize(&query),
1829                    errors
1830                );
1831            }
1832            return Ok(value);
1833        }
1834        unreachable!()
1835    }
1836
1837    pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
1838        let query = query.as_ref();
1839        self.run_graphql_query(&format!("query {{ {query} }}"))
1840            .await
1841    }
1842
1843    pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
1844        let query = query.as_ref().trim();
1845        let name = query
1846            .split_once(|ch: char| !ch.is_alphanumeric())
1847            .map_or(query, |(name, _)| name);
1848        let data = self.query(query).await?;
1849        serde_json::from_value(data[name].clone())
1850            .with_context(|| format!("{name} field missing in response"))
1851    }
1852
1853    pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
1854        let mutation = mutation.as_ref();
1855        self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
1856            .await
1857    }
1858
1859    pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
1860        let mut out = String::from("mutation {\n");
1861        for (index, mutation) in mutations.iter().enumerate() {
1862            out = format!("{}  u{}: {}\n", out, index, mutation);
1863        }
1864        out.push_str("}\n");
1865        self.run_graphql_query(&out).await
1866    }
1867}
1868
1869impl<A> From<String> for ApplicationWrapper<A> {
1870    fn from(uri: String) -> ApplicationWrapper<A> {
1871        ApplicationWrapper {
1872            uri,
1873            _phantom: PhantomData,
1874        }
1875    }
1876}
1877
1878/// Returns the timeout for tests that wait for notifications, either read from the env
1879/// variable `LINERA_TEST_NOTIFICATION_TIMEOUT_MS`, or the default value of 10 seconds.
1880#[cfg(with_testing)]
1881fn notification_timeout() -> Duration {
1882    const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
1883    const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;
1884
1885    match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
1886        Ok(var) => Duration::from_millis(var.parse().unwrap_or_else(|error| {
1887            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
1888        })),
1889        Err(env::VarError::NotPresent) => Duration::from_millis(NOTIFICATION_TIMEOUT_MS_DEFAULT),
1890        Err(env::VarError::NotUnicode(_)) => {
1891            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
1892        }
1893    }
1894}
1895
1896#[cfg(with_testing)]
1897pub trait NotificationsExt {
1898    /// Waits for a notification for which `f` returns `Some(t)`, and returns `t`.
1899    fn wait_for<T>(
1900        &mut self,
1901        f: impl FnMut(Notification) -> Option<T>,
1902    ) -> impl Future<Output = Result<T>>;
1903
1904    /// Waits for a `NewEvents` notification for the given block height. If no height is specified,
1905    /// any height is accepted.
1906    fn wait_for_events(
1907        &mut self,
1908        expected_height: impl Into<Option<BlockHeight>>,
1909    ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
1910        let expected_height = expected_height.into();
1911        self.wait_for(move |notification| {
1912            if let Reason::NewEvents {
1913                height,
1914                event_streams,
1915                ..
1916            } = notification.reason
1917            {
1918                if expected_height.is_none_or(|h| h == height) {
1919                    return Some(event_streams);
1920                }
1921            }
1922            None
1923        })
1924    }
1925
1926    /// Waits for a `NewBlock` notification for the given block height. If no height is specified,
1927    /// any height is accepted.
1928    fn wait_for_block(
1929        &mut self,
1930        expected_height: impl Into<Option<BlockHeight>>,
1931    ) -> impl Future<Output = Result<CryptoHash>> {
1932        let expected_height = expected_height.into();
1933        self.wait_for(move |notification| {
1934            if let Reason::NewBlock { height, hash, .. } = notification.reason {
1935                if expected_height.is_none_or(|h| h == height) {
1936                    return Some(hash);
1937                }
1938            }
1939            None
1940        })
1941    }
1942
1943    /// Waits for a `NewIncomingBundle` notification for the given sender chain and sender block
1944    /// height. If no height is specified, any height is accepted.
1945    fn wait_for_bundle(
1946        &mut self,
1947        expected_origin: ChainId,
1948        expected_height: impl Into<Option<BlockHeight>>,
1949    ) -> impl Future<Output = Result<()>> {
1950        let expected_height = expected_height.into();
1951        self.wait_for(move |notification| {
1952            if let Reason::NewIncomingBundle { height, origin } = notification.reason {
1953                if expected_height.is_none_or(|h| h == height) && origin == expected_origin {
1954                    return Some(());
1955                }
1956            }
1957            None
1958        })
1959    }
1960}
1961
1962#[cfg(with_testing)]
1963impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
1964    async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
1965        let mut timeout = Box::pin(linera_base::time::timer::sleep(notification_timeout())).fuse();
1966        loop {
1967            let notification = futures::select! {
1968                () = timeout => bail!("Timeout waiting for notification"),
1969                notification = self.next().fuse() => notification.context("Stream closed")??,
1970            };
1971            if let Some(t) = f(notification) {
1972                return Ok(t);
1973            }
1974        }
1975    }
1976}