linera_service/cli_wrappers/
wallet.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Cow,
6    collections::BTreeMap,
7    env,
8    marker::PhantomData,
9    mem,
10    path::{Path, PathBuf},
11    pin::Pin,
12    process::Stdio,
13    str::FromStr,
14    sync,
15    time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24    abi::ContractAbi,
25    command::{resolve_binary, CommandExt},
26    crypto::{CryptoHash, InMemorySigner},
27    data_types::{Amount, BlockHeight, Bytecode, Epoch},
28    identifiers::{
29        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30    },
31    vm::VmRuntime,
32};
33use linera_client::client_options::ResourceControlPolicyConfig;
34use linera_core::worker::Notification;
35use linera_execution::committee::Committee;
36use linera_faucet_client::Faucet;
37use serde::{de::DeserializeOwned, ser::Serialize};
38use serde_command_opts::to_args;
39use serde_json::{json, Value};
40use tempfile::TempDir;
41use tokio::{
42    io::{AsyncBufReadExt, BufReader},
43    process::{Child, Command},
44    sync::oneshot,
45    task::JoinHandle,
46};
47#[cfg(with_testing)]
48use {
49    futures::FutureExt as _,
50    linera_core::worker::Reason,
51    std::{collections::BTreeSet, future::Future},
52};
53
54use crate::{
55    cli::command::BenchmarkCommand,
56    cli_wrappers::{
57        local_net::{PathProvider, ProcessInbox},
58        Network,
59    },
60    util::{self, ChildExt},
61    wallet::Wallet,
62};
63
64/// The name of the environment variable that allows specifying additional arguments to be passed
65/// to the node-service command of the client.
66const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
67
68fn reqwest_client() -> reqwest::Client {
69    reqwest::ClientBuilder::new()
70        .timeout(Duration::from_secs(30))
71        .build()
72        .unwrap()
73}
74
75/// Wrapper to run a Linera client command.
76pub struct ClientWrapper {
77    binary_path: sync::Mutex<Option<PathBuf>>,
78    testing_prng_seed: Option<u64>,
79    storage: String,
80    wallet: String,
81    keystore: String,
82    max_pending_message_bundles: usize,
83    network: Network,
84    pub path_provider: PathProvider,
85    on_drop: OnClientDrop,
86    extra_args: Vec<String>,
87}
88
89/// Action to perform when the [`ClientWrapper`] is dropped.
90#[derive(Clone, Copy, Debug, Eq, PartialEq)]
91pub enum OnClientDrop {
92    /// Close all the chains on the wallet.
93    CloseChains,
94    /// Do not close any chains, leaving them active.
95    LeakChains,
96}
97
98impl ClientWrapper {
99    pub fn new(
100        path_provider: PathProvider,
101        network: Network,
102        testing_prng_seed: Option<u64>,
103        id: usize,
104        on_drop: OnClientDrop,
105    ) -> Self {
106        Self::new_with_extra_args(
107            path_provider,
108            network,
109            testing_prng_seed,
110            id,
111            on_drop,
112            vec!["--wait-for-outgoing-messages".to_string()],
113        )
114    }
115
116    pub fn new_with_extra_args(
117        path_provider: PathProvider,
118        network: Network,
119        testing_prng_seed: Option<u64>,
120        id: usize,
121        on_drop: OnClientDrop,
122        extra_args: Vec<String>,
123    ) -> Self {
124        let storage = format!(
125            "rocksdb:{}/client_{}.db",
126            path_provider.path().display(),
127            id
128        );
129        let wallet = format!("wallet_{}.json", id);
130        let keystore = format!("keystore_{}.json", id);
131        Self {
132            binary_path: sync::Mutex::new(None),
133            testing_prng_seed,
134            storage,
135            wallet,
136            keystore,
137            max_pending_message_bundles: 10_000,
138            network,
139            path_provider,
140            on_drop,
141            extra_args,
142        }
143    }
144
145    /// Runs `linera project new`.
146    pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
147        let tmp = TempDir::new()?;
148        let mut command = self.command().await?;
149        command
150            .current_dir(tmp.path())
151            .arg("project")
152            .arg("new")
153            .arg(project_name)
154            .arg("--linera-root")
155            .arg(linera_root)
156            .spawn_and_wait_for_stdout()
157            .await?;
158        Ok(tmp)
159    }
160
161    /// Runs `linera project publish`.
162    pub async fn project_publish<T: Serialize>(
163        &self,
164        path: PathBuf,
165        required_application_ids: Vec<String>,
166        publisher: impl Into<Option<ChainId>>,
167        argument: &T,
168    ) -> Result<String> {
169        let json_parameters = serde_json::to_string(&())?;
170        let json_argument = serde_json::to_string(argument)?;
171        let mut command = self.command().await?;
172        command
173            .arg("project")
174            .arg("publish-and-create")
175            .arg(path)
176            .args(publisher.into().iter().map(ChainId::to_string))
177            .args(["--json-parameters", &json_parameters])
178            .args(["--json-argument", &json_argument]);
179        if !required_application_ids.is_empty() {
180            command.arg("--required-application-ids");
181            command.args(required_application_ids);
182        }
183        let stdout = command.spawn_and_wait_for_stdout().await?;
184        Ok(stdout.trim().to_string())
185    }
186
187    /// Runs `linera project test`.
188    pub async fn project_test(&self, path: &Path) -> Result<()> {
189        self.command()
190            .await
191            .context("failed to create project test command")?
192            .current_dir(path)
193            .arg("project")
194            .arg("test")
195            .spawn_and_wait_for_stdout()
196            .await?;
197        Ok(())
198    }
199
200    async fn command_with_envs_and_arguments(
201        &self,
202        envs: &[(&str, &str)],
203        arguments: impl IntoIterator<Item = Cow<'_, str>>,
204    ) -> Result<Command> {
205        let mut command = self.command_binary().await?;
206        command.current_dir(self.path_provider.path());
207        for (key, value) in envs {
208            command.env(key, value);
209        }
210        for argument in arguments {
211            command.arg(&*argument);
212        }
213        Ok(command)
214    }
215
216    async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
217        self.command_with_envs_and_arguments(envs, self.command_arguments())
218            .await
219    }
220
221    async fn command_with_arguments(
222        &self,
223        arguments: impl IntoIterator<Item = Cow<'_, str>>,
224    ) -> Result<Command> {
225        self.command_with_envs_and_arguments(
226            &[(
227                "RUST_LOG",
228                &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
229            )],
230            arguments,
231        )
232        .await
233    }
234
235    async fn command(&self) -> Result<Command> {
236        self.command_with_envs(&[(
237            "RUST_LOG",
238            &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
239        )])
240        .await
241    }
242
243    fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
244        [
245            "--wallet".into(),
246            self.wallet.as_str().into(),
247            "--keystore".into(),
248            self.keystore.as_str().into(),
249            "--storage".into(),
250            self.storage.as_str().into(),
251            "--send-timeout-ms".into(),
252            "500000".into(),
253            "--recv-timeout-ms".into(),
254            "500000".into(),
255        ]
256        .into_iter()
257        .chain(self.extra_args.iter().map(|s| s.as_str().into()))
258    }
259
260    /// Returns an iterator over the arguments that should be added to all command invocations.
261    fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
262        self.required_command_arguments().chain([
263            "--max-pending-message-bundles".into(),
264            self.max_pending_message_bundles.to_string().into(),
265        ])
266    }
267
268    /// Returns the [`Command`] instance configured to run the appropriate binary.
269    ///
270    /// The path is resolved once and cached inside `self` for subsequent usages.
271    async fn command_binary(&self) -> Result<Command> {
272        match self.command_with_cached_binary_path() {
273            Some(command) => Ok(command),
274            None => {
275                let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
276                let command = Command::new(&resolved_path);
277
278                self.set_cached_binary_path(resolved_path);
279
280                Ok(command)
281            }
282        }
283    }
284
285    /// Returns a [`Command`] instance configured with the cached `binary_path`, if available.
286    fn command_with_cached_binary_path(&self) -> Option<Command> {
287        let binary_path = self.binary_path.lock().unwrap();
288
289        binary_path.as_ref().map(Command::new)
290    }
291
292    /// Sets the cached `binary_path` with the `new_binary_path`.
293    ///
294    /// # Panics
295    ///
296    /// If the cache is already set to a different value. In theory the two threads calling
297    /// `command_binary` can race and resolve the binary path twice, but they should always be the
298    /// same path.
299    fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
300        let mut binary_path = self.binary_path.lock().unwrap();
301
302        if binary_path.is_none() {
303            *binary_path = Some(new_binary_path);
304        } else {
305            assert_eq!(*binary_path, Some(new_binary_path));
306        }
307    }
308
309    /// Runs `linera create-genesis-config`.
310    pub async fn create_genesis_config(
311        &self,
312        num_other_initial_chains: u32,
313        initial_funding: Amount,
314        policy_config: ResourceControlPolicyConfig,
315        http_allow_list: Option<Vec<String>>,
316    ) -> Result<()> {
317        let mut command = self.command().await?;
318        command
319            .args([
320                "create-genesis-config",
321                &num_other_initial_chains.to_string(),
322            ])
323            .args(["--initial-funding", &initial_funding.to_string()])
324            .args(["--committee", "committee.json"])
325            .args(["--genesis", "genesis.json"])
326            .args([
327                "--policy-config",
328                &policy_config.to_string().to_kebab_case(),
329            ]);
330        if let Some(allow_list) = http_allow_list {
331            command
332                .arg("--http-request-allow-list")
333                .arg(allow_list.join(","));
334        }
335        if let Some(seed) = self.testing_prng_seed {
336            command.arg("--testing-prng-seed").arg(seed.to_string());
337        }
338        command.spawn_and_wait_for_stdout().await?;
339        Ok(())
340    }
341
342    /// Runs `linera wallet init`. The genesis config is read from `genesis.json`, or from the
343    /// faucet if provided.
344    pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
345        let mut command = self.command().await?;
346        command.args(["wallet", "init"]);
347        match faucet {
348            None => command.args(["--genesis", "genesis.json"]),
349            Some(faucet) => command.args(["--faucet", faucet.url()]),
350        };
351        if let Some(seed) = self.testing_prng_seed {
352            command.arg("--testing-prng-seed").arg(seed.to_string());
353        }
354        command.spawn_and_wait_for_stdout().await?;
355        Ok(())
356    }
357
358    /// Runs `linera wallet request-chain`.
359    pub async fn request_chain(
360        &self,
361        faucet: &Faucet,
362        set_default: bool,
363    ) -> Result<(ChainId, AccountOwner)> {
364        let mut command = self.command().await?;
365        command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
366        if set_default {
367            command.arg("--set-default");
368        }
369        let stdout = command.spawn_and_wait_for_stdout().await?;
370        let mut lines = stdout.split_whitespace();
371        let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
372        let owner = lines.next().context("missing chain owner")?.parse()?;
373        Ok((chain_id, owner))
374    }
375
376    /// Runs `linera wallet publish-and-create`.
377    #[expect(clippy::too_many_arguments)]
378    pub async fn publish_and_create<
379        A: ContractAbi,
380        Parameters: Serialize,
381        InstantiationArgument: Serialize,
382    >(
383        &self,
384        contract: PathBuf,
385        service: PathBuf,
386        vm_runtime: VmRuntime,
387        parameters: &Parameters,
388        argument: &InstantiationArgument,
389        required_application_ids: &[ApplicationId],
390        publisher: impl Into<Option<ChainId>>,
391    ) -> Result<ApplicationId<A>> {
392        let json_parameters = serde_json::to_string(parameters)?;
393        let json_argument = serde_json::to_string(argument)?;
394        let mut command = self.command().await?;
395        let vm_runtime = format!("{}", vm_runtime);
396        command
397            .arg("publish-and-create")
398            .args([contract, service])
399            .args(["--vm-runtime", &vm_runtime.to_lowercase()])
400            .args(publisher.into().iter().map(ChainId::to_string))
401            .args(["--json-parameters", &json_parameters])
402            .args(["--json-argument", &json_argument]);
403        if !required_application_ids.is_empty() {
404            command.arg("--required-application-ids");
405            command.args(
406                required_application_ids
407                    .iter()
408                    .map(ApplicationId::to_string),
409            );
410        }
411        let stdout = command.spawn_and_wait_for_stdout().await?;
412        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
413    }
414
415    /// Runs `linera publish-module`.
416    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
417        &self,
418        contract: PathBuf,
419        service: PathBuf,
420        vm_runtime: VmRuntime,
421        publisher: impl Into<Option<ChainId>>,
422    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
423        let stdout = self
424            .command()
425            .await?
426            .arg("publish-module")
427            .args([contract, service])
428            .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
429            .args(publisher.into().iter().map(ChainId::to_string))
430            .spawn_and_wait_for_stdout()
431            .await?;
432        let module_id: ModuleId = stdout.trim().parse()?;
433        Ok(module_id.with_abi())
434    }
435
436    /// Runs `linera create-application`.
437    pub async fn create_application<
438        Abi: ContractAbi,
439        Parameters: Serialize,
440        InstantiationArgument: Serialize,
441    >(
442        &self,
443        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
444        parameters: &Parameters,
445        argument: &InstantiationArgument,
446        required_application_ids: &[ApplicationId],
447        creator: impl Into<Option<ChainId>>,
448    ) -> Result<ApplicationId<Abi>> {
449        let json_parameters = serde_json::to_string(parameters)?;
450        let json_argument = serde_json::to_string(argument)?;
451        let mut command = self.command().await?;
452        command
453            .arg("create-application")
454            .arg(module_id.forget_abi().to_string())
455            .args(["--json-parameters", &json_parameters])
456            .args(["--json-argument", &json_argument])
457            .args(creator.into().iter().map(ChainId::to_string));
458        if !required_application_ids.is_empty() {
459            command.arg("--required-application-ids");
460            command.args(
461                required_application_ids
462                    .iter()
463                    .map(ApplicationId::to_string),
464            );
465        }
466        let stdout = command.spawn_and_wait_for_stdout().await?;
467        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
468    }
469
470    /// Runs `linera service`.
471    pub async fn run_node_service(
472        &self,
473        port: impl Into<Option<u16>>,
474        process_inbox: ProcessInbox,
475    ) -> Result<NodeService> {
476        self.run_node_service_with_options(port, process_inbox, &[], &[])
477            .await
478    }
479
480    /// Runs `linera service` with optional task processor configuration.
481    pub async fn run_node_service_with_options(
482        &self,
483        port: impl Into<Option<u16>>,
484        process_inbox: ProcessInbox,
485        operator_application_ids: &[ApplicationId],
486        operators: &[(String, PathBuf)],
487    ) -> Result<NodeService> {
488        let port = port.into().unwrap_or(8080);
489        let mut command = self.command().await?;
490        command.arg("service");
491        if let ProcessInbox::Skip = process_inbox {
492            command.arg("--listener-skip-process-inbox");
493        }
494        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
495            command.args(var.split_whitespace());
496        }
497        for app_id in operator_application_ids {
498            command.args(["--operator-application-ids", &app_id.to_string()]);
499        }
500        for (name, path) in operators {
501            command.args(["--operators", &format!("{}={}", name, path.display())]);
502        }
503        let child = command
504            .args(["--port".to_string(), port.to_string()])
505            .spawn_into()?;
506        let client = reqwest_client();
507        for i in 0..10 {
508            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
509            let request = client
510                .get(format!("http://localhost:{}/", port))
511                .send()
512                .await;
513            if request.is_ok() {
514                tracing::info!("Node service has started");
515                return Ok(NodeService::new(port, child));
516            } else {
517                tracing::warn!("Waiting for node service to start");
518            }
519        }
520        bail!("Failed to start node service");
521    }
522
523    /// Runs `linera validator query`
524    pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
525        let mut command = self.command().await?;
526        command.arg("validator").arg("query").arg(address);
527        let stdout = command.spawn_and_wait_for_stdout().await?;
528
529        // Parse the genesis config hash from the output.
530        // It's on a line like "Genesis config hash: <hash>"
531        let hash = stdout
532            .lines()
533            .find_map(|line| {
534                line.strip_prefix("Genesis config hash: ")
535                    .and_then(|hash_str| hash_str.trim().parse().ok())
536            })
537            .context("error while parsing the result of `linera validator query`")?;
538        Ok(hash)
539    }
540
541    /// Runs `linera validator list`.
542    pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
543        let mut command = self.command().await?;
544        command.arg("validator").arg("list");
545        if let Some(chain_id) = chain_id {
546            command.args(["--chain-id", &chain_id.to_string()]);
547        }
548        command.spawn_and_wait_for_stdout().await?;
549        Ok(())
550    }
551
552    /// Runs `linera sync-validator`.
553    pub async fn sync_validator(
554        &self,
555        chain_ids: impl IntoIterator<Item = &ChainId>,
556        validator_address: impl Into<String>,
557    ) -> Result<()> {
558        let mut command = self.command().await?;
559        command
560            .arg("validator")
561            .arg("sync")
562            .arg(validator_address.into());
563        let mut chain_ids = chain_ids.into_iter().peekable();
564        if chain_ids.peek().is_some() {
565            command
566                .arg("--chains")
567                .args(chain_ids.map(ChainId::to_string));
568        }
569        command.spawn_and_wait_for_stdout().await?;
570        Ok(())
571    }
572
573    /// Runs `linera faucet`.
574    pub async fn run_faucet(
575        &self,
576        port: impl Into<Option<u16>>,
577        chain_id: Option<ChainId>,
578        amount: Amount,
579    ) -> Result<FaucetService> {
580        let port = port.into().unwrap_or(8080);
581        let temp_dir = tempfile::tempdir()
582            .context("Failed to create temporary directory for faucet storage")?;
583        let storage_path = temp_dir.path().join("faucet_storage.sqlite");
584        let mut command = self.command().await?;
585        let command = command
586            .arg("faucet")
587            .args(["--port".to_string(), port.to_string()])
588            .args(["--amount".to_string(), amount.to_string()])
589            .args([
590                "--storage-path".to_string(),
591                storage_path.to_string_lossy().to_string(),
592            ]);
593        if let Some(chain_id) = chain_id {
594            command.arg(chain_id.to_string());
595        }
596        let child = command.spawn_into()?;
597        let client = reqwest_client();
598        for i in 0..10 {
599            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
600            let request = client
601                .get(format!("http://localhost:{}/", port))
602                .send()
603                .await;
604            if request.is_ok() {
605                tracing::info!("Faucet has started");
606                return Ok(FaucetService::new(port, child, temp_dir));
607            } else {
608                tracing::debug!("Waiting for faucet to start");
609            }
610        }
611        bail!("Failed to start faucet");
612    }
613
614    /// Runs `linera local-balance`.
615    pub async fn local_balance(&self, account: Account) -> Result<Amount> {
616        let stdout = self
617            .command()
618            .await?
619            .arg("local-balance")
620            .arg(account.to_string())
621            .spawn_and_wait_for_stdout()
622            .await?;
623        let amount = stdout
624            .trim()
625            .parse()
626            .context("error while parsing the result of `linera local-balance`")?;
627        Ok(amount)
628    }
629
630    /// Runs `linera query-balance`.
631    pub async fn query_balance(&self, account: Account) -> Result<Amount> {
632        let stdout = self
633            .command()
634            .await?
635            .arg("query-balance")
636            .arg(account.to_string())
637            .spawn_and_wait_for_stdout()
638            .await?;
639        let amount = stdout
640            .trim()
641            .parse()
642            .context("error while parsing the result of `linera query-balance`")?;
643        Ok(amount)
644    }
645
646    /// Runs `linera sync`.
647    pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
648        self.command()
649            .await?
650            .arg("sync")
651            .arg(chain_id.to_string())
652            .spawn_and_wait_for_stdout()
653            .await?;
654        Ok(())
655    }
656
657    /// Runs `linera process-inbox`.
658    pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
659        self.command()
660            .await?
661            .arg("process-inbox")
662            .arg(chain_id.to_string())
663            .spawn_and_wait_for_stdout()
664            .await?;
665        Ok(())
666    }
667
668    /// Runs `linera transfer`.
669    pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
670        self.command()
671            .await?
672            .arg("transfer")
673            .arg(amount.to_string())
674            .args(["--from", &from.to_string()])
675            .args(["--to", &to.to_string()])
676            .spawn_and_wait_for_stdout()
677            .await?;
678        Ok(())
679    }
680
681    /// Runs `linera transfer` with no logging.
682    pub async fn transfer_with_silent_logs(
683        &self,
684        amount: Amount,
685        from: ChainId,
686        to: ChainId,
687    ) -> Result<()> {
688        self.command()
689            .await?
690            .env("RUST_LOG", "off")
691            .arg("transfer")
692            .arg(amount.to_string())
693            .args(["--from", &from.to_string()])
694            .args(["--to", &to.to_string()])
695            .spawn_and_wait_for_stdout()
696            .await?;
697        Ok(())
698    }
699
700    /// Runs `linera transfer` with owner accounts.
701    pub async fn transfer_with_accounts(
702        &self,
703        amount: Amount,
704        from: Account,
705        to: Account,
706    ) -> Result<()> {
707        self.command()
708            .await?
709            .arg("transfer")
710            .arg(amount.to_string())
711            .args(["--from", &from.to_string()])
712            .args(["--to", &to.to_string()])
713            .spawn_and_wait_for_stdout()
714            .await?;
715        Ok(())
716    }
717
718    fn benchmark_command_internal(command: &mut Command, args: BenchmarkCommand) -> Result<()> {
719        let mut formatted_args = to_args(&args)?;
720        let subcommand = formatted_args.remove(0);
721        // The subcommand is followed by the flattened options, which are preceded by "options".
722        // So remove that as well.
723        formatted_args.remove(0);
724        let options = formatted_args
725            .chunks_exact(2)
726            .flat_map(|pair| {
727                let option = format!("--{}", pair[0]);
728                match pair[1].as_str() {
729                    "true" => vec![option],
730                    "false" => vec![],
731                    _ => vec![option, pair[1].clone()],
732                }
733            })
734            .collect::<Vec<_>>();
735        command
736            .args([
737                "--max-pending-message-bundles",
738                &args.transactions_per_block().to_string(),
739            ])
740            .arg("benchmark")
741            .arg(subcommand)
742            .args(options);
743        Ok(())
744    }
745
746    async fn benchmark_command_with_envs(
747        &self,
748        args: BenchmarkCommand,
749        envs: &[(&str, &str)],
750    ) -> Result<Command> {
751        let mut command = self
752            .command_with_envs_and_arguments(envs, self.required_command_arguments())
753            .await?;
754        Self::benchmark_command_internal(&mut command, args)?;
755        Ok(command)
756    }
757
758    async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
759        let mut command = self
760            .command_with_arguments(self.required_command_arguments())
761            .await?;
762        Self::benchmark_command_internal(&mut command, args)?;
763        Ok(command)
764    }
765
766    /// Runs `linera benchmark`.
767    pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
768        let mut command = self.benchmark_command(args).await?;
769        command.spawn_and_wait_for_stdout().await?;
770        Ok(())
771    }
772
773    /// Runs `linera benchmark`, but detached: don't wait for the command to finish, just spawn it
774    /// and return the child process, and the handles to the stdout and stderr.
775    pub async fn benchmark_detached(
776        &self,
777        args: BenchmarkCommand,
778        tx: oneshot::Sender<()>,
779    ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
780        let mut child = self
781            .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
782            .await?
783            .kill_on_drop(true)
784            .stdin(Stdio::piped())
785            .stdout(Stdio::piped())
786            .stderr(Stdio::piped())
787            .spawn()?;
788
789        let pid = child.id().expect("failed to get pid");
790        let stdout = child.stdout.take().expect("stdout not open");
791        let stdout_handle = tokio::spawn(async move {
792            let mut lines = BufReader::new(stdout).lines();
793            while let Ok(Some(line)) = lines.next_line().await {
794                println!("benchmark{{pid={pid}}} {line}");
795            }
796        });
797
798        let stderr = child.stderr.take().expect("stderr not open");
799        let stderr_handle = tokio::spawn(async move {
800            let mut lines = BufReader::new(stderr).lines();
801            let mut tx = Some(tx);
802            while let Ok(Some(line)) = lines.next_line().await {
803                if line.contains("Ready to start benchmark") {
804                    tx.take()
805                        .expect("Should only send signal once")
806                        .send(())
807                        .expect("failed to send ready signal to main thread");
808                } else {
809                    println!("benchmark{{pid={pid}}} {line}");
810                }
811            }
812        });
813        Ok((child, stdout_handle, stderr_handle))
814    }
815
816    async fn open_chain_internal(
817        &self,
818        from: ChainId,
819        owner: Option<AccountOwner>,
820        initial_balance: Amount,
821        super_owner: bool,
822    ) -> Result<(ChainId, AccountOwner)> {
823        let mut command = self.command().await?;
824        command
825            .arg("open-chain")
826            .args(["--from", &from.to_string()])
827            .args(["--initial-balance", &initial_balance.to_string()]);
828
829        if let Some(owner) = owner {
830            command.args(["--owner", &owner.to_string()]);
831        }
832
833        if super_owner {
834            command.arg("--super-owner");
835        }
836
837        let stdout = command.spawn_and_wait_for_stdout().await?;
838        let mut split = stdout.split('\n');
839        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
840        let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
841        if let Some(owner) = owner {
842            assert_eq!(owner, new_owner);
843        }
844        Ok((chain_id, new_owner))
845    }
846
847    /// Runs `linera open-chain --super-owner`.
848    pub async fn open_chain_super_owner(
849        &self,
850        from: ChainId,
851        owner: Option<AccountOwner>,
852        initial_balance: Amount,
853    ) -> Result<(ChainId, AccountOwner)> {
854        self.open_chain_internal(from, owner, initial_balance, true)
855            .await
856    }
857
858    /// Runs `linera open-chain`.
859    pub async fn open_chain(
860        &self,
861        from: ChainId,
862        owner: Option<AccountOwner>,
863        initial_balance: Amount,
864    ) -> Result<(ChainId, AccountOwner)> {
865        self.open_chain_internal(from, owner, initial_balance, false)
866            .await
867    }
868
869    /// Runs `linera open-chain` then `linera assign`.
870    pub async fn open_and_assign(
871        &self,
872        client: &ClientWrapper,
873        initial_balance: Amount,
874    ) -> Result<ChainId> {
875        let our_chain = self
876            .load_wallet()?
877            .default_chain()
878            .context("no default chain found")?;
879        let owner = client.keygen().await?;
880        let (new_chain, _) = self
881            .open_chain(our_chain, Some(owner), initial_balance)
882            .await?;
883        client.assign(owner, new_chain).await?;
884        Ok(new_chain)
885    }
886
887    pub async fn open_multi_owner_chain(
888        &self,
889        from: ChainId,
890        owners: Vec<AccountOwner>,
891        weights: Vec<u64>,
892        multi_leader_rounds: u32,
893        balance: Amount,
894        base_timeout_ms: u64,
895    ) -> Result<ChainId> {
896        let mut command = self.command().await?;
897        command
898            .arg("open-multi-owner-chain")
899            .args(["--from", &from.to_string()])
900            .arg("--owners")
901            .args(owners.iter().map(AccountOwner::to_string))
902            .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
903        if !weights.is_empty() {
904            command
905                .arg("--owner-weights")
906                .args(weights.iter().map(u64::to_string));
907        };
908        command
909            .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
910            .args(["--initial-balance", &balance.to_string()]);
911
912        let stdout = command.spawn_and_wait_for_stdout().await?;
913        let mut split = stdout.split('\n');
914        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
915
916        Ok(chain_id)
917    }
918
919    pub async fn change_ownership(
920        &self,
921        chain_id: ChainId,
922        super_owners: Vec<AccountOwner>,
923        owners: Vec<AccountOwner>,
924    ) -> Result<()> {
925        let mut command = self.command().await?;
926        command
927            .arg("change-ownership")
928            .args(["--chain-id", &chain_id.to_string()]);
929        if !super_owners.is_empty() {
930            command
931                .arg("--super-owners")
932                .args(super_owners.iter().map(AccountOwner::to_string));
933        }
934        if !owners.is_empty() {
935            command
936                .arg("--owners")
937                .args(owners.iter().map(AccountOwner::to_string));
938        }
939        command.spawn_and_wait_for_stdout().await?;
940        Ok(())
941    }
942
943    /// Runs `linera wallet follow-chain CHAIN_ID`.
944    pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
945        let mut command = self.command().await?;
946        command
947            .args(["wallet", "follow-chain"])
948            .arg(chain_id.to_string());
949        if sync {
950            command.arg("--sync");
951        }
952        command.spawn_and_wait_for_stdout().await?;
953        Ok(())
954    }
955
956    /// Runs `linera wallet forget-chain CHAIN_ID`.
957    pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
958        let mut command = self.command().await?;
959        command
960            .args(["wallet", "forget-chain"])
961            .arg(chain_id.to_string());
962        command.spawn_and_wait_for_stdout().await?;
963        Ok(())
964    }
965
966    /// Runs `linera wallet set-default CHAIN_ID`.
967    pub async fn set_default_chain(&self, chain_id: ChainId) -> Result<()> {
968        let mut command = self.command().await?;
969        command
970            .args(["wallet", "set-default"])
971            .arg(chain_id.to_string());
972        command.spawn_and_wait_for_stdout().await?;
973        Ok(())
974    }
975
976    pub async fn retry_pending_block(
977        &self,
978        chain_id: Option<ChainId>,
979    ) -> Result<Option<CryptoHash>> {
980        let mut command = self.command().await?;
981        command.arg("retry-pending-block");
982        if let Some(chain_id) = chain_id {
983            command.arg(chain_id.to_string());
984        }
985        let stdout = command.spawn_and_wait_for_stdout().await?;
986        let stdout = stdout.trim();
987        if stdout.is_empty() {
988            Ok(None)
989        } else {
990            Ok(Some(CryptoHash::from_str(stdout)?))
991        }
992    }
993
994    /// Runs `linera publish-data-blob`.
995    pub async fn publish_data_blob(
996        &self,
997        path: &Path,
998        chain_id: Option<ChainId>,
999    ) -> Result<CryptoHash> {
1000        let mut command = self.command().await?;
1001        command.arg("publish-data-blob").arg(path);
1002        if let Some(chain_id) = chain_id {
1003            command.arg(chain_id.to_string());
1004        }
1005        let stdout = command.spawn_and_wait_for_stdout().await?;
1006        let stdout = stdout.trim();
1007        Ok(CryptoHash::from_str(stdout)?)
1008    }
1009
1010    /// Runs `linera read-data-blob`.
1011    pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
1012        let mut command = self.command().await?;
1013        command.arg("read-data-blob").arg(hash.to_string());
1014        if let Some(chain_id) = chain_id {
1015            command.arg(chain_id.to_string());
1016        }
1017        command.spawn_and_wait_for_stdout().await?;
1018        Ok(())
1019    }
1020
1021    pub fn load_wallet(&self) -> Result<Wallet> {
1022        Ok(Wallet::read(&self.wallet_path())?)
1023    }
1024
1025    pub fn load_keystore(&self) -> Result<InMemorySigner> {
1026        util::read_json(self.keystore_path())
1027    }
1028
1029    pub fn wallet_path(&self) -> PathBuf {
1030        self.path_provider.path().join(&self.wallet)
1031    }
1032
1033    pub fn keystore_path(&self) -> PathBuf {
1034        self.path_provider.path().join(&self.keystore)
1035    }
1036
1037    pub fn storage_path(&self) -> &str {
1038        &self.storage
1039    }
1040
1041    pub fn get_owner(&self) -> Option<AccountOwner> {
1042        let wallet = self.load_wallet().ok()?;
1043        wallet
1044            .get(wallet.default_chain()?)
1045            .expect("default chain must be in wallet")
1046            .owner
1047    }
1048
1049    pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1050        self.load_wallet()
1051            .ok()
1052            .is_some_and(|wallet| wallet.get(chain).is_some())
1053    }
1054
1055    pub async fn set_validator(
1056        &self,
1057        validator_key: &(String, String),
1058        port: usize,
1059        votes: usize,
1060    ) -> Result<()> {
1061        let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1062        self.command()
1063            .await?
1064            .arg("validator")
1065            .arg("add")
1066            .args(["--public-key", &validator_key.0])
1067            .args(["--account-key", &validator_key.1])
1068            .args(["--address", &address])
1069            .args(["--votes", &votes.to_string()])
1070            .spawn_and_wait_for_stdout()
1071            .await?;
1072        Ok(())
1073    }
1074
1075    pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1076        self.command()
1077            .await?
1078            .arg("validator")
1079            .arg("remove")
1080            .args(["--public-key", validator_key])
1081            .spawn_and_wait_for_stdout()
1082            .await?;
1083        Ok(())
1084    }
1085
1086    pub async fn change_validators(
1087        &self,
1088        add_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1089        modify_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1090        remove_validators: &[String],
1091    ) -> Result<()> {
1092        use std::str::FromStr;
1093
1094        use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1095
1096        // Build a map that will be serialized to JSON
1097        // Use the exact types that deserialization expects
1098        let mut changes = std::collections::HashMap::new();
1099
1100        // Add/modify validators
1101        for (public_key_str, account_key_str, port, votes) in
1102            add_validators.iter().chain(modify_validators.iter())
1103        {
1104            let public_key = ValidatorPublicKey::from_str(public_key_str)
1105                .with_context(|| format!("Invalid validator public key: {}", public_key_str))?;
1106
1107            let account_key = AccountPublicKey::from_str(account_key_str)
1108                .with_context(|| format!("Invalid account public key: {}", account_key_str))?;
1109
1110            let address = format!("{}:127.0.0.1:{}", self.network.short(), port)
1111                .parse()
1112                .unwrap();
1113
1114            // Create ValidatorChange struct
1115            let change = crate::cli::validator::Change {
1116                account_key,
1117                address,
1118                votes: crate::cli::validator::Votes(
1119                    std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1120                ),
1121            };
1122
1123            changes.insert(public_key, Some(change));
1124        }
1125
1126        // Remove validators (set to None)
1127        for validator_key_str in remove_validators {
1128            let public_key = ValidatorPublicKey::from_str(validator_key_str)
1129                .with_context(|| format!("Invalid validator public key: {}", validator_key_str))?;
1130            changes.insert(public_key, None);
1131        }
1132
1133        // Create temporary file with JSON
1134        let temp_file = tempfile::NamedTempFile::new()
1135            .context("Failed to create temporary file for validator changes")?;
1136        serde_json::to_writer(&temp_file, &changes)
1137            .context("Failed to write validator changes to file")?;
1138        let temp_path = temp_file.path();
1139
1140        self.command()
1141            .await?
1142            .arg("validator")
1143            .arg("update")
1144            .arg(temp_path)
1145            .arg("--yes") // Skip confirmation prompt
1146            .spawn_and_wait_for_stdout()
1147            .await?;
1148
1149        Ok(())
1150    }
1151
1152    pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1153        self.command()
1154            .await?
1155            .arg("revoke-epochs")
1156            .arg(epoch.to_string())
1157            .spawn_and_wait_for_stdout()
1158            .await?;
1159        Ok(())
1160    }
1161
1162    /// Runs `linera keygen`.
1163    pub async fn keygen(&self) -> Result<AccountOwner> {
1164        let stdout = self
1165            .command()
1166            .await?
1167            .arg("keygen")
1168            .spawn_and_wait_for_stdout()
1169            .await?;
1170        AccountOwner::from_str(stdout.as_str().trim())
1171    }
1172
1173    /// Returns the default chain.
1174    pub fn default_chain(&self) -> Option<ChainId> {
1175        self.load_wallet().ok()?.default_chain()
1176    }
1177
1178    /// Runs `linera assign`.
1179    pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1180        let _stdout = self
1181            .command()
1182            .await?
1183            .arg("assign")
1184            .args(["--owner", &owner.to_string()])
1185            .args(["--chain-id", &chain_id.to_string()])
1186            .spawn_and_wait_for_stdout()
1187            .await?;
1188        Ok(())
1189    }
1190
1191    /// Runs `linera set-preferred-owner` for `chain_id`.
1192    pub async fn set_preferred_owner(
1193        &self,
1194        chain_id: ChainId,
1195        owner: Option<AccountOwner>,
1196    ) -> Result<()> {
1197        let mut owner_arg = vec!["--owner".to_string()];
1198        if let Some(owner) = owner {
1199            owner_arg.push(owner.to_string());
1200        };
1201        self.command()
1202            .await?
1203            .arg("set-preferred-owner")
1204            .args(["--chain-id", &chain_id.to_string()])
1205            .args(owner_arg)
1206            .spawn_and_wait_for_stdout()
1207            .await?;
1208        Ok(())
1209    }
1210
1211    pub async fn build_application(
1212        &self,
1213        path: &Path,
1214        name: &str,
1215        is_workspace: bool,
1216    ) -> Result<(PathBuf, PathBuf)> {
1217        Command::new("cargo")
1218            .current_dir(self.path_provider.path())
1219            .arg("build")
1220            .arg("--release")
1221            .args(["--target", "wasm32-unknown-unknown"])
1222            .arg("--manifest-path")
1223            .arg(path.join("Cargo.toml"))
1224            .spawn_and_wait_for_stdout()
1225            .await?;
1226
1227        let release_dir = match is_workspace {
1228            true => path.join("../target/wasm32-unknown-unknown/release"),
1229            false => path.join("target/wasm32-unknown-unknown/release"),
1230        };
1231
1232        let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1233        let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1234
1235        let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1236        let service_size = fs_err::tokio::metadata(&service).await?.len();
1237        tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1238
1239        Ok((contract, service))
1240    }
1241}
1242
1243impl Drop for ClientWrapper {
1244    fn drop(&mut self) {
1245        use std::process::Command as SyncCommand;
1246
1247        if self.on_drop != OnClientDrop::CloseChains {
1248            return;
1249        }
1250
1251        let Ok(binary_path) = self.binary_path.lock() else {
1252            tracing::error!(
1253                "Failed to close chains because a thread panicked with a lock to `binary_path`"
1254            );
1255            return;
1256        };
1257
1258        let Some(binary_path) = binary_path.as_ref() else {
1259            tracing::warn!(
1260                "Assuming no chains need to be closed, because the command binary was never \
1261                resolved and therefore presumably never called"
1262            );
1263            return;
1264        };
1265
1266        let working_directory = self.path_provider.path();
1267        let mut wallet_show_command = SyncCommand::new(binary_path);
1268
1269        for argument in self.command_arguments() {
1270            wallet_show_command.arg(&*argument);
1271        }
1272
1273        let Ok(wallet_show_output) = wallet_show_command
1274            .current_dir(working_directory)
1275            .args(["wallet", "show", "--short", "--owned"])
1276            .output()
1277        else {
1278            tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1279            return;
1280        };
1281
1282        if !wallet_show_output.status.success() {
1283            tracing::warn!("Failed to list chains in the wallet to close them");
1284            return;
1285        }
1286
1287        let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1288            tracing::warn!(
1289                "Failed to close chains because `linera wallet show --short` \
1290                returned a non-UTF-8 output"
1291            );
1292            return;
1293        };
1294
1295        let chain_ids = chain_list_string
1296            .split('\n')
1297            .map(|line| line.trim())
1298            .filter(|line| !line.is_empty());
1299
1300        for chain_id in chain_ids {
1301            let mut close_chain_command = SyncCommand::new(binary_path);
1302
1303            for argument in self.command_arguments() {
1304                close_chain_command.arg(&*argument);
1305            }
1306
1307            close_chain_command.current_dir(working_directory);
1308
1309            match close_chain_command.args(["close-chain", chain_id]).status() {
1310                Ok(status) if status.success() => (),
1311                Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1312                Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1313            }
1314        }
1315    }
1316}
1317
1318#[cfg(with_testing)]
1319impl ClientWrapper {
1320    pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1321        self.build_application(Self::example_path(name)?.as_path(), name, true)
1322            .await
1323    }
1324
1325    pub fn example_path(name: &str) -> Result<PathBuf> {
1326        Ok(env::current_dir()?.join("../examples/").join(name))
1327    }
1328}
1329
1330fn truncate_query_output(input: &str) -> String {
1331    let max_len = 1000;
1332    if input.len() < max_len {
1333        input.to_string()
1334    } else {
1335        format!("{} ...", input.get(..max_len).unwrap())
1336    }
1337}
1338
1339fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1340    let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1341    let max_len = 200;
1342    if query.len() < max_len {
1343        query
1344    } else {
1345        format!("{} ...", query.get(..max_len).unwrap())
1346    }
1347}
1348
1349/// A running node service.
1350pub struct NodeService {
1351    port: u16,
1352    child: Child,
1353}
1354
1355impl NodeService {
1356    fn new(port: u16, child: Child) -> Self {
1357        Self { port, child }
1358    }
1359
1360    pub async fn terminate(mut self) -> Result<()> {
1361        self.child.kill().await.context("terminating node service")
1362    }
1363
1364    pub fn port(&self) -> u16 {
1365        self.port
1366    }
1367
1368    pub fn ensure_is_running(&mut self) -> Result<()> {
1369        self.child.ensure_is_running()
1370    }
1371
1372    pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1373        let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1374        let mut data = self.query_node(query).await?;
1375        Ok(serde_json::from_value(data["processInbox"].take())?)
1376    }
1377
1378    pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1379        let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1380        let mut data = self.query_node(query).await?;
1381        Ok(serde_json::from_value(data["sync"].take())?)
1382    }
1383
1384    pub async fn transfer(
1385        &self,
1386        chain_id: ChainId,
1387        owner: AccountOwner,
1388        recipient: Account,
1389        amount: Amount,
1390    ) -> Result<CryptoHash> {
1391        let json_owner = owner.to_value();
1392        let json_recipient = recipient.to_value();
1393        let query = format!(
1394            "mutation {{ transfer(\
1395                 chainId: \"{chain_id}\", \
1396                 owner: {json_owner}, \
1397                 recipient: {json_recipient}, \
1398                 amount: \"{amount}\") \
1399             }}"
1400        );
1401        let data = self.query_node(query).await?;
1402        serde_json::from_value(data["transfer"].clone())
1403            .context("missing transfer field in response")
1404    }
1405
1406    pub async fn balance(&self, account: &Account) -> Result<Amount> {
1407        let chain = account.chain_id;
1408        let owner = account.owner;
1409        if matches!(owner, AccountOwner::CHAIN) {
1410            let query = format!(
1411                "query {{ chain(chainId:\"{chain}\") {{
1412                    executionState {{ system {{ balance }} }}
1413                }} }}"
1414            );
1415            let response = self.query_node(query).await?;
1416            let balance = &response["chain"]["executionState"]["system"]["balance"]
1417                .as_str()
1418                .unwrap();
1419            return Ok(Amount::from_str(balance)?);
1420        }
1421        let query = format!(
1422            "query {{ chain(chainId:\"{chain}\") {{
1423                executionState {{ system {{ balances {{
1424                    entry(key:\"{owner}\") {{ value }}
1425                }} }} }}
1426            }} }}"
1427        );
1428        let response = self.query_node(query).await?;
1429        let balances = &response["chain"]["executionState"]["system"]["balances"];
1430        let balance = balances["entry"]["value"].as_str();
1431        match balance {
1432            None => Ok(Amount::ZERO),
1433            Some(amount) => Ok(Amount::from_str(amount)?),
1434        }
1435    }
1436
1437    pub fn make_application<A: ContractAbi>(
1438        &self,
1439        chain_id: &ChainId,
1440        application_id: &ApplicationId<A>,
1441    ) -> Result<ApplicationWrapper<A>> {
1442        let application_id = application_id.forget_abi().to_string();
1443        let link = format!(
1444            "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1445            self.port
1446        );
1447        Ok(ApplicationWrapper::from(link))
1448    }
1449
1450    pub async fn publish_data_blob(
1451        &self,
1452        chain_id: &ChainId,
1453        bytes: Vec<u8>,
1454    ) -> Result<CryptoHash> {
1455        let query = format!(
1456            "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1457            chain_id.to_value(),
1458            bytes.to_value(),
1459        );
1460        let data = self.query_node(query).await?;
1461        serde_json::from_value(data["publishDataBlob"].clone())
1462            .context("missing publishDataBlob field in response")
1463    }
1464
1465    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1466        &self,
1467        chain_id: &ChainId,
1468        contract: PathBuf,
1469        service: PathBuf,
1470        vm_runtime: VmRuntime,
1471    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1472        let contract_code = Bytecode::load_from_file(&contract)?;
1473        let service_code = Bytecode::load_from_file(&service)?;
1474        let query = format!(
1475            "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1476            chain_id.to_value(),
1477            contract_code.to_value(),
1478            service_code.to_value(),
1479            vm_runtime.to_value(),
1480        );
1481        let data = self.query_node(query).await?;
1482        let module_str = data["publishModule"]
1483            .as_str()
1484            .context("module ID not found")?;
1485        let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1486        Ok(module_id.with_abi())
1487    }
1488
1489    pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1490        let query = format!(
1491            "query {{ chain(chainId:\"{chain_id}\") {{
1492                executionState {{ system {{ committees }} }}
1493            }} }}"
1494        );
1495        let mut response = self.query_node(query).await?;
1496        let committees = response["chain"]["executionState"]["system"]["committees"].take();
1497        Ok(serde_json::from_value(committees)?)
1498    }
1499
1500    pub async fn events_from_index(
1501        &self,
1502        chain_id: &ChainId,
1503        stream_id: &StreamId,
1504        start_index: u32,
1505    ) -> Result<Vec<IndexAndEvent>> {
1506        let query = format!(
1507            "query {{
1508               eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1509               {{ index event }}
1510             }}",
1511            stream_id.to_value()
1512        );
1513        let mut response = self.query_node(query).await?;
1514        let response = response["eventsFromIndex"].take();
1515        Ok(serde_json::from_value(response)?)
1516    }
1517
1518    pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1519        let n_try = 5;
1520        let query = query.as_ref();
1521        for i in 0..n_try {
1522            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1523            let url = format!("http://localhost:{}/", self.port);
1524            let client = reqwest_client();
1525            let result = client
1526                .post(url)
1527                .json(&json!({ "query": query }))
1528                .send()
1529                .await;
1530            if matches!(result, Err(ref error) if error.is_timeout()) {
1531                tracing::warn!(
1532                    "Timeout when sending query {} to the node service",
1533                    truncate_query_output(query)
1534                );
1535                continue;
1536            }
1537            let response = result.with_context(|| {
1538                format!(
1539                    "query_node: failed to post query={}",
1540                    truncate_query_output(query)
1541                )
1542            })?;
1543            ensure!(
1544                response.status().is_success(),
1545                "Query \"{}\" failed: {}",
1546                truncate_query_output(query),
1547                response
1548                    .text()
1549                    .await
1550                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1551            );
1552            let value: Value = response.json().await.context("invalid JSON")?;
1553            if let Some(errors) = value.get("errors") {
1554                tracing::warn!(
1555                    "Query \"{}\" failed: {}",
1556                    truncate_query_output(query),
1557                    errors
1558                );
1559            } else {
1560                return Ok(value["data"].clone());
1561            }
1562        }
1563        bail!(
1564            "Query \"{}\" failed after {} retries.",
1565            truncate_query_output(query),
1566            n_try
1567        );
1568    }
1569
1570    pub async fn create_application<
1571        Abi: ContractAbi,
1572        Parameters: Serialize,
1573        InstantiationArgument: Serialize,
1574    >(
1575        &self,
1576        chain_id: &ChainId,
1577        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1578        parameters: &Parameters,
1579        argument: &InstantiationArgument,
1580        required_application_ids: &[ApplicationId],
1581    ) -> Result<ApplicationId<Abi>> {
1582        let module_id = module_id.forget_abi();
1583        let json_required_applications_ids = required_application_ids
1584            .iter()
1585            .map(ApplicationId::to_string)
1586            .collect::<Vec<_>>()
1587            .to_value();
1588        // Convert to `serde_json::Value` then `async_graphql::Value` via the trait `InputType`.
1589        let new_parameters = serde_json::to_value(parameters)
1590            .context("could not create parameters JSON")?
1591            .to_value();
1592        let new_argument = serde_json::to_value(argument)
1593            .context("could not create argument JSON")?
1594            .to_value();
1595        let query = format!(
1596            "mutation {{ createApplication(\
1597                 chainId: \"{chain_id}\",
1598                 moduleId: \"{module_id}\", \
1599                 parameters: {new_parameters}, \
1600                 instantiationArgument: {new_argument}, \
1601                 requiredApplicationIds: {json_required_applications_ids}) \
1602             }}"
1603        );
1604        let data = self.query_node(query).await?;
1605        let app_id_str = data["createApplication"]
1606            .as_str()
1607            .context("missing createApplication string in response")?
1608            .trim();
1609        Ok(app_id_str
1610            .parse::<ApplicationId>()
1611            .context("invalid application ID")?
1612            .with_abi())
1613    }
1614
1615    /// Obtains the hash and height of the `chain`'s tip block, as known by this node service.
1616    pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1617        let query = format!(
1618            r#"query {{ block(chainId: "{chain}") {{
1619                hash
1620                block {{ header {{ height }} }}
1621            }} }}"#
1622        );
1623
1624        let mut response = self.query_node(&query).await?;
1625
1626        match (
1627            mem::take(&mut response["block"]["hash"]),
1628            mem::take(&mut response["block"]["block"]["header"]["height"]),
1629        ) {
1630            (Value::Null, Value::Null) => Ok(None),
1631            (Value::String(hash), Value::Number(height)) => Ok(Some((
1632                hash.parse()
1633                    .context("Received an invalid hash {hash:?} for chain tip")?,
1634                BlockHeight(height.as_u64().unwrap()),
1635            ))),
1636            invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1637        }
1638    }
1639
1640    /// Subscribes to the node service and returns a stream of notifications about a chain.
1641    pub async fn notifications(
1642        &self,
1643        chain_id: ChainId,
1644    ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
1645        let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1646        let url = format!("ws://localhost:{}/ws", self.port);
1647        let mut request = url.into_client_request()?;
1648        request.headers_mut().insert(
1649            "Sec-WebSocket-Protocol",
1650            HeaderValue::from_str("graphql-transport-ws")?,
1651        );
1652        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1653        let init_json = json!({
1654          "type": "connection_init",
1655          "payload": {}
1656        });
1657        websocket.send(init_json.to_string().into()).await?;
1658        let text = websocket
1659            .next()
1660            .await
1661            .context("Failed to establish connection")??
1662            .into_text()?;
1663        ensure!(
1664            text == "{\"type\":\"connection_ack\"}",
1665            "Unexpected response: {text}"
1666        );
1667        let query_json = json!({
1668          "id": "1",
1669          "type": "start",
1670          "payload": {
1671            "query": query,
1672            "variables": {},
1673            "operationName": null
1674          }
1675        });
1676        websocket.send(query_json.to_string().into()).await?;
1677        Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1678            |message| async {
1679                let text = message.into_text()?;
1680                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1681                if let Some(errors) = value["payload"].get("errors") {
1682                    bail!("Notification subscription failed: {errors:?}");
1683                }
1684                serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1685                    .context("Failed to deserialize notification")
1686            },
1687        )))
1688    }
1689}
1690
1691/// A running faucet service.
1692pub struct FaucetService {
1693    port: u16,
1694    child: Child,
1695    _temp_dir: tempfile::TempDir,
1696}
1697
1698impl FaucetService {
1699    fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
1700        Self {
1701            port,
1702            child,
1703            _temp_dir: temp_dir,
1704        }
1705    }
1706
1707    pub async fn terminate(mut self) -> Result<()> {
1708        self.child
1709            .kill()
1710            .await
1711            .context("terminating faucet service")
1712    }
1713
1714    pub fn ensure_is_running(&mut self) -> Result<()> {
1715        self.child.ensure_is_running()
1716    }
1717
1718    pub fn instance(&self) -> Faucet {
1719        Faucet::new(format!("http://localhost:{}/", self.port))
1720    }
1721}
1722
1723/// A running `Application` to be queried in GraphQL.
1724pub struct ApplicationWrapper<A> {
1725    uri: String,
1726    _phantom: PhantomData<A>,
1727}
1728
1729impl<A> ApplicationWrapper<A> {
1730    pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1731        let query = query.as_ref();
1732        let value = self.run_json_query(json!({ "query": query })).await?;
1733        Ok(value["data"].clone())
1734    }
1735
1736    pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1737        const MAX_RETRIES: usize = 5;
1738
1739        for i in 0.. {
1740            let client = reqwest_client();
1741            let result = client.post(&self.uri).json(&query).send().await;
1742            let response = match result {
1743                Ok(response) => response,
1744                Err(error) if i < MAX_RETRIES => {
1745                    tracing::warn!(
1746                        "Failed to post query \"{}\": {error}; retrying",
1747                        truncate_query_output_serialize(&query),
1748                    );
1749                    continue;
1750                }
1751                Err(error) => {
1752                    let query = truncate_query_output_serialize(&query);
1753                    return Err(error)
1754                        .with_context(|| format!("run_json_query: failed to post query={query}"));
1755                }
1756            };
1757            ensure!(
1758                response.status().is_success(),
1759                "Query \"{}\" failed: {}",
1760                truncate_query_output_serialize(&query),
1761                response
1762                    .text()
1763                    .await
1764                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1765            );
1766            let value: Value = response.json().await.context("invalid JSON")?;
1767            if let Some(errors) = value.get("errors") {
1768                bail!(
1769                    "Query \"{}\" failed: {}",
1770                    truncate_query_output_serialize(&query),
1771                    errors
1772                );
1773            }
1774            return Ok(value);
1775        }
1776        unreachable!()
1777    }
1778
1779    pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
1780        let query = query.as_ref();
1781        self.run_graphql_query(&format!("query {{ {query} }}"))
1782            .await
1783    }
1784
1785    pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
1786        let query = query.as_ref().trim();
1787        let name = query
1788            .split_once(|ch: char| !ch.is_alphanumeric())
1789            .map_or(query, |(name, _)| name);
1790        let data = self.query(query).await?;
1791        serde_json::from_value(data[name].clone())
1792            .with_context(|| format!("{name} field missing in response"))
1793    }
1794
1795    pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
1796        let mutation = mutation.as_ref();
1797        self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
1798            .await
1799    }
1800
1801    pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
1802        let mut out = String::from("mutation {\n");
1803        for (index, mutation) in mutations.iter().enumerate() {
1804            out = format!("{}  u{}: {}\n", out, index, mutation);
1805        }
1806        out.push_str("}\n");
1807        self.run_graphql_query(&out).await
1808    }
1809}
1810
1811impl<A> From<String> for ApplicationWrapper<A> {
1812    fn from(uri: String) -> ApplicationWrapper<A> {
1813        ApplicationWrapper {
1814            uri,
1815            _phantom: PhantomData,
1816        }
1817    }
1818}
1819
1820/// Returns the timeout for tests that wait for notifications, either read from the env
1821/// variable `LINERA_TEST_NOTIFICATION_TIMEOUT_MS`, or the default value of 10 seconds.
1822#[cfg(with_testing)]
1823fn notification_timeout() -> Duration {
1824    const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
1825    const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;
1826
1827    match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
1828        Ok(var) => Duration::from_millis(var.parse().unwrap_or_else(|error| {
1829            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
1830        })),
1831        Err(env::VarError::NotPresent) => Duration::from_millis(NOTIFICATION_TIMEOUT_MS_DEFAULT),
1832        Err(env::VarError::NotUnicode(_)) => {
1833            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
1834        }
1835    }
1836}
1837
1838#[cfg(with_testing)]
1839pub trait NotificationsExt {
1840    /// Waits for a notification for which `f` returns `Some(t)`, and returns `t`.
1841    fn wait_for<T>(
1842        &mut self,
1843        f: impl FnMut(Notification) -> Option<T>,
1844    ) -> impl Future<Output = Result<T>>;
1845
1846    /// Waits for a `NewEvents` notification for the given block height. If no height is specified,
1847    /// any height is accepted.
1848    fn wait_for_events(
1849        &mut self,
1850        expected_height: impl Into<Option<BlockHeight>>,
1851    ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
1852        let expected_height = expected_height.into();
1853        self.wait_for(move |notification| {
1854            if let Reason::NewEvents {
1855                height,
1856                event_streams,
1857                ..
1858            } = notification.reason
1859            {
1860                if expected_height.is_none_or(|h| h == height) {
1861                    return Some(event_streams);
1862                }
1863            }
1864            None
1865        })
1866    }
1867
1868    /// Waits for a `NewBlock` notification for the given block height. If no height is specified,
1869    /// any height is accepted.
1870    fn wait_for_block(
1871        &mut self,
1872        expected_height: impl Into<Option<BlockHeight>>,
1873    ) -> impl Future<Output = Result<CryptoHash>> {
1874        let expected_height = expected_height.into();
1875        self.wait_for(move |notification| {
1876            if let Reason::NewBlock { height, hash, .. } = notification.reason {
1877                if expected_height.is_none_or(|h| h == height) {
1878                    return Some(hash);
1879                }
1880            }
1881            None
1882        })
1883    }
1884
1885    /// Waits for a `NewIncomingBundle` notification for the given sender chain and sender block
1886    /// height. If no height is specified, any height is accepted.
1887    fn wait_for_bundle(
1888        &mut self,
1889        expected_origin: ChainId,
1890        expected_height: impl Into<Option<BlockHeight>>,
1891    ) -> impl Future<Output = Result<()>> {
1892        let expected_height = expected_height.into();
1893        self.wait_for(move |notification| {
1894            if let Reason::NewIncomingBundle { height, origin } = notification.reason {
1895                if expected_height.is_none_or(|h| h == height) && origin == expected_origin {
1896                    return Some(());
1897                }
1898            }
1899            None
1900        })
1901    }
1902}
1903
1904#[cfg(with_testing)]
1905impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
1906    async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
1907        let mut timeout = Box::pin(linera_base::time::timer::sleep(notification_timeout())).fuse();
1908        loop {
1909            let notification = futures::select! {
1910                () = timeout => bail!("Timeout waiting for notification"),
1911                notification = self.next().fuse() => notification.context("Stream closed")??,
1912            };
1913            if let Some(t) = f(notification) {
1914                return Ok(t);
1915            }
1916        }
1917    }
1918}