linera_service/cli_wrappers/
wallet.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Cow,
6    collections::BTreeMap,
7    env,
8    marker::PhantomData,
9    mem,
10    path::{Path, PathBuf},
11    pin::Pin,
12    process::Stdio,
13    str::FromStr,
14    sync,
15    time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24    abi::ContractAbi,
25    command::{resolve_binary, CommandExt},
26    crypto::{CryptoHash, InMemorySigner},
27    data_types::{Amount, ApplicationPermissions, BlockHeight, Bytecode, Epoch},
28    identifiers::{
29        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30    },
31    vm::VmRuntime,
32};
33use linera_client::client_options::ResourceControlPolicyConfig;
34use linera_core::worker::Notification;
35use linera_execution::committee::Committee;
36use linera_faucet_client::Faucet;
37use serde::{de::DeserializeOwned, ser::Serialize};
38use serde_command_opts::to_args;
39use serde_json::{json, Value};
40use tempfile::TempDir;
41use tokio::{
42    io::{AsyncBufReadExt, BufReader},
43    process::{Child, Command},
44    sync::oneshot,
45    task::JoinHandle,
46};
47#[cfg(with_testing)]
48use {
49    futures::FutureExt as _,
50    linera_core::worker::Reason,
51    std::{collections::BTreeSet, future::Future},
52};
53
54use crate::{
55    cli::command::BenchmarkCommand,
56    cli_wrappers::{
57        local_net::{PathProvider, ProcessInbox},
58        Network,
59    },
60    util::{self, ChildExt},
61    wallet::Wallet,
62};
63
/// The name of the environment variable that allows specifying additional arguments to be passed
/// to the node-service command of the client.
///
/// The value is split on whitespace and every piece is forwarded as a separate argument.
const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
67
68fn reqwest_client() -> reqwest::Client {
69    reqwest::ClientBuilder::new()
70        .timeout(Duration::from_secs(30))
71        .build()
72        .unwrap()
73}
74
/// Wrapper to run a Linera client command.
pub struct ClientWrapper {
    /// Path of the `linera` binary; filled in on first use when it is not supplied up front.
    binary_path: sync::Mutex<Option<PathBuf>>,
    /// Seed forwarded as `--testing-prng-seed` by the commands that accept it.
    testing_prng_seed: Option<u64>,
    /// Value passed as `--storage`.
    storage: String,
    /// Value passed as `--wallet`.
    wallet: String,
    /// Value passed as `--keystore`.
    keystore: String,
    /// Value passed as `--max-pending-message-bundles`.
    max_pending_message_bundles: usize,
    // NOTE(review): not read by the methods in this part of the file; presumably describes how
    // to reach the validators — confirm against the rest of the file.
    network: Network,
    /// Provides the directory used as the working directory of the spawned commands.
    pub path_provider: PathProvider,
    /// Action to perform when this wrapper is dropped.
    on_drop: OnClientDrop,
    /// Additional arguments appended after the required command-line arguments.
    extra_args: Vec<String>,
}
88
/// Action to perform when the [`ClientWrapper`] is dropped.
///
/// The action is chosen when the wrapper is constructed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OnClientDrop {
    /// Close all the chains on the wallet.
    CloseChains,
    /// Do not close any chains, leaving them active.
    LeakChains,
}
97
98impl ClientWrapper {
99    pub fn new(
100        path_provider: PathProvider,
101        network: Network,
102        testing_prng_seed: Option<u64>,
103        id: usize,
104        on_drop: OnClientDrop,
105    ) -> Self {
106        Self::new_with_extra_args(
107            path_provider,
108            network,
109            testing_prng_seed,
110            id,
111            on_drop,
112            vec!["--wait-for-outgoing-messages".to_string()],
113            None,
114        )
115    }
116
117    pub fn new_with_extra_args(
118        path_provider: PathProvider,
119        network: Network,
120        testing_prng_seed: Option<u64>,
121        id: usize,
122        on_drop: OnClientDrop,
123        extra_args: Vec<String>,
124        binary_dir: Option<PathBuf>,
125    ) -> Self {
126        let storage = format!(
127            "rocksdb:{}/client_{}.db",
128            path_provider.path().display(),
129            id
130        );
131        let wallet = format!("wallet_{}.json", id);
132        let keystore = format!("keystore_{}.json", id);
133        let binary_path = binary_dir.map(|dir| dir.join("linera"));
134        Self {
135            binary_path: sync::Mutex::new(binary_path),
136            testing_prng_seed,
137            storage,
138            wallet,
139            keystore,
140            max_pending_message_bundles: 10_000,
141            network,
142            path_provider,
143            on_drop,
144            extra_args,
145        }
146    }
147
148    /// Runs `linera project new`.
149    pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
150        let tmp = TempDir::new()?;
151        let mut command = self.command().await?;
152        command
153            .current_dir(tmp.path())
154            .arg("project")
155            .arg("new")
156            .arg(project_name)
157            .arg("--linera-root")
158            .arg(linera_root)
159            .spawn_and_wait_for_stdout()
160            .await?;
161        Ok(tmp)
162    }
163
164    /// Runs `linera project publish`.
165    pub async fn project_publish<T: Serialize>(
166        &self,
167        path: PathBuf,
168        required_application_ids: Vec<String>,
169        publisher: impl Into<Option<ChainId>>,
170        argument: &T,
171    ) -> Result<String> {
172        let json_parameters = serde_json::to_string(&())?;
173        let json_argument = serde_json::to_string(argument)?;
174        let mut command = self.command().await?;
175        command
176            .arg("project")
177            .arg("publish-and-create")
178            .arg(path)
179            .args(publisher.into().iter().map(ChainId::to_string))
180            .args(["--json-parameters", &json_parameters])
181            .args(["--json-argument", &json_argument]);
182        if !required_application_ids.is_empty() {
183            command.arg("--required-application-ids");
184            command.args(required_application_ids);
185        }
186        let stdout = command.spawn_and_wait_for_stdout().await?;
187        Ok(stdout.trim().to_string())
188    }
189
190    /// Runs `linera project test`.
191    pub async fn project_test(&self, path: &Path) -> Result<()> {
192        self.command()
193            .await
194            .context("failed to create project test command")?
195            .current_dir(path)
196            .arg("project")
197            .arg("test")
198            .spawn_and_wait_for_stdout()
199            .await?;
200        Ok(())
201    }
202
203    async fn command_with_envs_and_arguments(
204        &self,
205        envs: &[(&str, &str)],
206        arguments: impl IntoIterator<Item = Cow<'_, str>>,
207    ) -> Result<Command> {
208        let mut command = self.command_binary().await?;
209        command.current_dir(self.path_provider.path());
210        for (key, value) in envs {
211            command.env(key, value);
212        }
213        for argument in arguments {
214            command.arg(&*argument);
215        }
216        Ok(command)
217    }
218
219    async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
220        self.command_with_envs_and_arguments(envs, self.command_arguments())
221            .await
222    }
223
224    async fn command_with_arguments(
225        &self,
226        arguments: impl IntoIterator<Item = Cow<'_, str>>,
227    ) -> Result<Command> {
228        self.command_with_envs_and_arguments(
229            &[(
230                "RUST_LOG",
231                &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
232            )],
233            arguments,
234        )
235        .await
236    }
237
238    async fn command(&self) -> Result<Command> {
239        self.command_with_envs(&[(
240            "RUST_LOG",
241            &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
242        )])
243        .await
244    }
245
246    fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
247        [
248            "--wallet".into(),
249            self.wallet.as_str().into(),
250            "--keystore".into(),
251            self.keystore.as_str().into(),
252            "--storage".into(),
253            self.storage.as_str().into(),
254            "--send-timeout-ms".into(),
255            "500000".into(),
256            "--recv-timeout-ms".into(),
257            "500000".into(),
258        ]
259        .into_iter()
260        .chain(self.extra_args.iter().map(|s| s.as_str().into()))
261    }
262
263    /// Returns an iterator over the arguments that should be added to all command invocations.
264    fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
265        self.required_command_arguments().chain([
266            "--max-pending-message-bundles".into(),
267            self.max_pending_message_bundles.to_string().into(),
268        ])
269    }
270
271    /// Returns the [`Command`] instance configured to run the appropriate binary.
272    ///
273    /// The path is resolved once and cached inside `self` for subsequent usages.
274    async fn command_binary(&self) -> Result<Command> {
275        match self.command_with_cached_binary_path() {
276            Some(command) => Ok(command),
277            None => {
278                let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
279                let command = Command::new(&resolved_path);
280
281                self.set_cached_binary_path(resolved_path);
282
283                Ok(command)
284            }
285        }
286    }
287
288    /// Returns a [`Command`] instance configured with the cached `binary_path`, if available.
289    fn command_with_cached_binary_path(&self) -> Option<Command> {
290        let binary_path = self.binary_path.lock().unwrap();
291
292        binary_path.as_ref().map(Command::new)
293    }
294
295    /// Sets the cached `binary_path` with the `new_binary_path`.
296    ///
297    /// # Panics
298    ///
299    /// If the cache is already set to a different value. In theory the two threads calling
300    /// `command_binary` can race and resolve the binary path twice, but they should always be the
301    /// same path.
302    fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
303        let mut binary_path = self.binary_path.lock().unwrap();
304
305        if binary_path.is_none() {
306            *binary_path = Some(new_binary_path);
307        } else {
308            assert_eq!(*binary_path, Some(new_binary_path));
309        }
310    }
311
312    /// Runs `linera create-genesis-config`.
313    pub async fn create_genesis_config(
314        &self,
315        num_other_initial_chains: u32,
316        initial_funding: Amount,
317        policy_config: ResourceControlPolicyConfig,
318        http_allow_list: Option<Vec<String>>,
319    ) -> Result<()> {
320        let mut command = self.command().await?;
321        command
322            .args([
323                "create-genesis-config",
324                &num_other_initial_chains.to_string(),
325            ])
326            .args(["--initial-funding", &initial_funding.to_string()])
327            .args(["--committee", "committee.json"])
328            .args(["--genesis", "genesis.json"])
329            .args([
330                "--policy-config",
331                &policy_config.to_string().to_kebab_case(),
332            ]);
333        if let Some(allow_list) = http_allow_list {
334            command
335                .arg("--http-request-allow-list")
336                .arg(allow_list.join(","));
337        }
338        if let Some(seed) = self.testing_prng_seed {
339            command.arg("--testing-prng-seed").arg(seed.to_string());
340        }
341        command.spawn_and_wait_for_stdout().await?;
342        Ok(())
343    }
344
345    /// Runs `linera wallet init`. The genesis config is read from `genesis.json`, or from the
346    /// faucet if provided.
347    pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
348        let mut command = self.command().await?;
349        command.args(["wallet", "init"]);
350        match faucet {
351            None => command.args(["--genesis", "genesis.json"]),
352            Some(faucet) => command.args(["--faucet", faucet.url()]),
353        };
354        if let Some(seed) = self.testing_prng_seed {
355            command.arg("--testing-prng-seed").arg(seed.to_string());
356        }
357        command.spawn_and_wait_for_stdout().await?;
358        Ok(())
359    }
360
361    /// Runs `linera wallet request-chain`.
362    pub async fn request_chain(
363        &self,
364        faucet: &Faucet,
365        set_default: bool,
366    ) -> Result<(ChainId, AccountOwner)> {
367        let mut command = self.command().await?;
368        command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
369        if set_default {
370            command.arg("--set-default");
371        }
372        let stdout = command.spawn_and_wait_for_stdout().await?;
373        let mut lines = stdout.split_whitespace();
374        let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
375        let owner = lines.next().context("missing chain owner")?.parse()?;
376        Ok((chain_id, owner))
377    }
378
379    /// Runs `linera wallet publish-and-create`.
380    #[expect(clippy::too_many_arguments)]
381    pub async fn publish_and_create<
382        A: ContractAbi,
383        Parameters: Serialize,
384        InstantiationArgument: Serialize,
385    >(
386        &self,
387        contract: PathBuf,
388        service: PathBuf,
389        vm_runtime: VmRuntime,
390        parameters: &Parameters,
391        argument: &InstantiationArgument,
392        required_application_ids: &[ApplicationId],
393        publisher: impl Into<Option<ChainId>>,
394    ) -> Result<ApplicationId<A>> {
395        let json_parameters = serde_json::to_string(parameters)?;
396        let json_argument = serde_json::to_string(argument)?;
397        let mut command = self.command().await?;
398        let vm_runtime = format!("{}", vm_runtime);
399        command
400            .arg("publish-and-create")
401            .args([contract, service])
402            .args(["--vm-runtime", &vm_runtime.to_lowercase()])
403            .args(publisher.into().iter().map(ChainId::to_string))
404            .args(["--json-parameters", &json_parameters])
405            .args(["--json-argument", &json_argument]);
406        if !required_application_ids.is_empty() {
407            command.arg("--required-application-ids");
408            command.args(
409                required_application_ids
410                    .iter()
411                    .map(ApplicationId::to_string),
412            );
413        }
414        let stdout = command.spawn_and_wait_for_stdout().await?;
415        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
416    }
417
418    /// Runs `linera publish-module`.
419    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
420        &self,
421        contract: PathBuf,
422        service: PathBuf,
423        vm_runtime: VmRuntime,
424        publisher: impl Into<Option<ChainId>>,
425    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
426        let stdout = self
427            .command()
428            .await?
429            .arg("publish-module")
430            .args([contract, service])
431            .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
432            .args(publisher.into().iter().map(ChainId::to_string))
433            .spawn_and_wait_for_stdout()
434            .await?;
435        let module_id: ModuleId = stdout.trim().parse()?;
436        Ok(module_id.with_abi())
437    }
438
439    /// Runs `linera create-application`.
440    pub async fn create_application<
441        Abi: ContractAbi,
442        Parameters: Serialize,
443        InstantiationArgument: Serialize,
444    >(
445        &self,
446        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
447        parameters: &Parameters,
448        argument: &InstantiationArgument,
449        required_application_ids: &[ApplicationId],
450        creator: impl Into<Option<ChainId>>,
451    ) -> Result<ApplicationId<Abi>> {
452        let json_parameters = serde_json::to_string(parameters)?;
453        let json_argument = serde_json::to_string(argument)?;
454        let mut command = self.command().await?;
455        command
456            .arg("create-application")
457            .arg(module_id.forget_abi().to_string())
458            .args(["--json-parameters", &json_parameters])
459            .args(["--json-argument", &json_argument])
460            .args(creator.into().iter().map(ChainId::to_string));
461        if !required_application_ids.is_empty() {
462            command.arg("--required-application-ids");
463            command.args(
464                required_application_ids
465                    .iter()
466                    .map(ApplicationId::to_string),
467            );
468        }
469        let stdout = command.spawn_and_wait_for_stdout().await?;
470        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
471    }
472
473    /// Runs `linera service`.
474    pub async fn run_node_service(
475        &self,
476        port: impl Into<Option<u16>>,
477        process_inbox: ProcessInbox,
478    ) -> Result<NodeService> {
479        self.run_node_service_with_options(port, process_inbox, &[], &[], false)
480            .await
481    }
482
483    /// Runs `linera service` with optional task processor configuration.
484    pub async fn run_node_service_with_options(
485        &self,
486        port: impl Into<Option<u16>>,
487        process_inbox: ProcessInbox,
488        operator_application_ids: &[ApplicationId],
489        operators: &[(String, PathBuf)],
490        read_only: bool,
491    ) -> Result<NodeService> {
492        self.run_node_service_with_all_options(
493            port,
494            process_inbox,
495            operator_application_ids,
496            operators,
497            read_only,
498            &[],
499        )
500        .await
501    }
502
503    /// Runs `linera service` with all available options.
504    pub async fn run_node_service_with_all_options(
505        &self,
506        port: impl Into<Option<u16>>,
507        process_inbox: ProcessInbox,
508        operator_application_ids: &[ApplicationId],
509        operators: &[(String, PathBuf)],
510        read_only: bool,
511        allowed_subscriptions: &[String],
512    ) -> Result<NodeService> {
513        let port = port.into().unwrap_or(8080);
514        let mut command = self.command().await?;
515        command.arg("service");
516        if let ProcessInbox::Skip = process_inbox {
517            command.arg("--listener-skip-process-inbox");
518        }
519        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
520            command.args(var.split_whitespace());
521        }
522        for app_id in operator_application_ids {
523            command.args(["--operator-application-ids", &app_id.to_string()]);
524        }
525        for (name, path) in operators {
526            command.args(["--operators", &format!("{}={}", name, path.display())]);
527        }
528        if read_only {
529            command.arg("--read-only");
530        }
531        for query in allowed_subscriptions {
532            command.args(["--allow-subscription", query]);
533        }
534        let child = command
535            .args(["--port".to_string(), port.to_string()])
536            .spawn_into()?;
537        let client = reqwest_client();
538        for i in 0..10 {
539            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
540            let request = client
541                .get(format!("http://localhost:{}/", port))
542                .send()
543                .await;
544            if request.is_ok() {
545                tracing::info!("Node service has started");
546                return Ok(NodeService::new(port, child));
547            } else {
548                tracing::warn!("Waiting for node service to start");
549            }
550        }
551        bail!("Failed to start node service");
552    }
553
554    /// Runs `linera service` with a controller application.
555    pub async fn run_node_service_with_controller(
556        &self,
557        port: impl Into<Option<u16>>,
558        process_inbox: ProcessInbox,
559        controller_id: &ApplicationId,
560        operators: &[(String, PathBuf)],
561    ) -> Result<NodeService> {
562        let port = port.into().unwrap_or(8080);
563        let mut command = self.command().await?;
564        command.arg("service");
565        if let ProcessInbox::Skip = process_inbox {
566            command.arg("--listener-skip-process-inbox");
567        }
568        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
569            command.args(var.split_whitespace());
570        }
571        command.args(["--controller-id", &controller_id.to_string()]);
572        for (name, path) in operators {
573            command.args(["--operators", &format!("{}={}", name, path.display())]);
574        }
575        let child = command
576            .args(["--port".to_string(), port.to_string()])
577            .spawn_into()?;
578        let client = reqwest_client();
579        for i in 0..10 {
580            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
581            let request = client
582                .get(format!("http://localhost:{}/", port))
583                .send()
584                .await;
585            if request.is_ok() {
586                tracing::info!("Node service has started");
587                return Ok(NodeService::new(port, child));
588            } else {
589                tracing::warn!("Waiting for node service to start");
590            }
591        }
592        bail!("Failed to start node service");
593    }
594
595    /// Runs `linera validator query`
596    pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
597        let mut command = self.command().await?;
598        command.arg("validator").arg("query").arg(address);
599        let stdout = command.spawn_and_wait_for_stdout().await?;
600
601        // Parse the genesis config hash from the output.
602        // It's on a line like "Genesis config hash: <hash>"
603        let hash = stdout
604            .lines()
605            .find_map(|line| {
606                line.strip_prefix("Genesis config hash: ")
607                    .and_then(|hash_str| hash_str.trim().parse().ok())
608            })
609            .context("error while parsing the result of `linera validator query`")?;
610        Ok(hash)
611    }
612
613    /// Runs `linera validator list`.
614    pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
615        let mut command = self.command().await?;
616        command.arg("validator").arg("list");
617        if let Some(chain_id) = chain_id {
618            command.args(["--chain-id", &chain_id.to_string()]);
619        }
620        command.spawn_and_wait_for_stdout().await?;
621        Ok(())
622    }
623
624    /// Runs `linera sync-validator`.
625    pub async fn sync_validator(
626        &self,
627        chain_ids: impl IntoIterator<Item = &ChainId>,
628        validator_address: impl Into<String>,
629    ) -> Result<()> {
630        let mut command = self.command().await?;
631        command
632            .arg("validator")
633            .arg("sync")
634            .arg(validator_address.into());
635        let mut chain_ids = chain_ids.into_iter().peekable();
636        if chain_ids.peek().is_some() {
637            command
638                .arg("--chains")
639                .args(chain_ids.map(ChainId::to_string));
640        }
641        command.spawn_and_wait_for_stdout().await?;
642        Ok(())
643    }
644
645    /// Runs `linera faucet`.
646    pub async fn run_faucet(
647        &self,
648        port: impl Into<Option<u16>>,
649        chain_id: Option<ChainId>,
650        amount: Amount,
651    ) -> Result<FaucetService> {
652        let port = port.into().unwrap_or(8080);
653        let temp_dir = tempfile::tempdir()
654            .context("Failed to create temporary directory for faucet storage")?;
655        let storage_path = temp_dir.path().join("faucet_storage.sqlite");
656        let mut command = self.command().await?;
657        let command = command
658            .arg("faucet")
659            .args(["--port".to_string(), port.to_string()])
660            .args(["--amount".to_string(), amount.to_string()])
661            .args([
662                "--storage-path".to_string(),
663                storage_path.to_string_lossy().to_string(),
664            ]);
665        if let Some(chain_id) = chain_id {
666            command.arg(chain_id.to_string());
667        }
668        let child = command.spawn_into()?;
669        let client = reqwest_client();
670        for i in 0..10 {
671            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
672            let request = client
673                .get(format!("http://localhost:{}/", port))
674                .send()
675                .await;
676            if request.is_ok() {
677                tracing::info!("Faucet has started");
678                return Ok(FaucetService::new(port, child, temp_dir));
679            } else {
680                tracing::debug!("Waiting for faucet to start");
681            }
682        }
683        bail!("Failed to start faucet");
684    }
685
686    /// Runs `linera local-balance`.
687    pub async fn local_balance(&self, account: Account) -> Result<Amount> {
688        let stdout = self
689            .command()
690            .await?
691            .arg("local-balance")
692            .arg(account.to_string())
693            .spawn_and_wait_for_stdout()
694            .await?;
695        let amount = stdout
696            .trim()
697            .parse()
698            .context("error while parsing the result of `linera local-balance`")?;
699        Ok(amount)
700    }
701
702    /// Runs `linera query-balance`.
703    pub async fn query_balance(&self, account: Account) -> Result<Amount> {
704        let stdout = self
705            .command()
706            .await?
707            .arg("query-balance")
708            .arg(account.to_string())
709            .spawn_and_wait_for_stdout()
710            .await?;
711        let amount = stdout
712            .trim()
713            .parse()
714            .context("error while parsing the result of `linera query-balance`")?;
715        Ok(amount)
716    }
717
718    /// Runs `linera sync`.
719    pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
720        self.command()
721            .await?
722            .arg("sync")
723            .arg(chain_id.to_string())
724            .spawn_and_wait_for_stdout()
725            .await?;
726        Ok(())
727    }
728
729    /// Runs `linera process-inbox`.
730    pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
731        self.command()
732            .await?
733            .arg("process-inbox")
734            .arg(chain_id.to_string())
735            .spawn_and_wait_for_stdout()
736            .await?;
737        Ok(())
738    }
739
740    /// Runs `linera transfer`.
741    pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
742        self.command()
743            .await?
744            .arg("transfer")
745            .arg(amount.to_string())
746            .args(["--from", &from.to_string()])
747            .args(["--to", &to.to_string()])
748            .spawn_and_wait_for_stdout()
749            .await?;
750        Ok(())
751    }
752
753    /// Runs `linera transfer` with no logging.
754    pub async fn transfer_with_silent_logs(
755        &self,
756        amount: Amount,
757        from: ChainId,
758        to: ChainId,
759    ) -> Result<()> {
760        self.command()
761            .await?
762            .env("RUST_LOG", "off")
763            .arg("transfer")
764            .arg(amount.to_string())
765            .args(["--from", &from.to_string()])
766            .args(["--to", &to.to_string()])
767            .spawn_and_wait_for_stdout()
768            .await?;
769        Ok(())
770    }
771
772    /// Runs `linera transfer` with owner accounts.
773    pub async fn transfer_with_accounts(
774        &self,
775        amount: Amount,
776        from: Account,
777        to: Account,
778    ) -> Result<()> {
779        self.command()
780            .await?
781            .arg("transfer")
782            .arg(amount.to_string())
783            .args(["--from", &from.to_string()])
784            .args(["--to", &to.to_string()])
785            .spawn_and_wait_for_stdout()
786            .await?;
787        Ok(())
788    }
789
790    fn benchmark_command_internal(command: &mut Command, args: &BenchmarkCommand) -> Result<()> {
791        let mut formatted_args = to_args(&args)?;
792        let subcommand = formatted_args.remove(0);
793        // The subcommand is followed by the flattened options, which are preceded by "options".
794        // So remove that as well.
795        formatted_args.remove(0);
796        let options = formatted_args
797            .chunks_exact(2)
798            .flat_map(|pair| {
799                let option = format!("--{}", pair[0]);
800                match pair[1].as_str() {
801                    "true" => vec![option],
802                    "false" => vec![],
803                    _ => vec![option, pair[1].clone()],
804                }
805            })
806            .collect::<Vec<_>>();
807        command
808            .args([
809                "--max-pending-message-bundles",
810                &args.transactions_per_block().to_string(),
811            ])
812            .arg("benchmark")
813            .arg(subcommand)
814            .args(options);
815        Ok(())
816    }
817
818    async fn benchmark_command_with_envs(
819        &self,
820        args: BenchmarkCommand,
821        envs: &[(&str, &str)],
822    ) -> Result<Command> {
823        let mut command = self
824            .command_with_envs_and_arguments(envs, self.required_command_arguments())
825            .await?;
826        Self::benchmark_command_internal(&mut command, &args)?;
827        Ok(command)
828    }
829
830    async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
831        let mut command = self
832            .command_with_arguments(self.required_command_arguments())
833            .await?;
834        Self::benchmark_command_internal(&mut command, &args)?;
835        Ok(command)
836    }
837
838    /// Runs `linera benchmark`.
839    pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
840        let mut command = self.benchmark_command(args).await?;
841        command.spawn_and_wait_for_stdout().await?;
842        Ok(())
843    }
844
    /// Runs `linera benchmark`, but detached: don't wait for the command to finish, just spawn it
    /// and return the child process, and the handles to the stdout and stderr.
    ///
    /// The child runs with `RUST_LOG=linera=info`, piped standard streams and `kill_on_drop`
    /// enabled. Its output is forwarded line by line to this process's standard output, prefixed
    /// with the child's PID. A stderr line containing `Ready to start benchmark` is not forwarded;
    /// instead a signal is sent through `tx`.
    ///
    /// # Panics
    ///
    /// If the PID or the piped streams of the child are not available. The stderr task panics if
    /// the ready line is seen more than once or if the receiving end of `tx` was dropped.
    pub async fn benchmark_detached(
        &self,
        args: BenchmarkCommand,
        tx: oneshot::Sender<()>,
    ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
        let mut child = self
            .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
            .await?
            .kill_on_drop(true)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()?;

        let pid = child.id().expect("failed to get pid");
        // Forward every stdout line of the child, tagged with its PID.
        let stdout = child.stdout.take().expect("stdout not open");
        let stdout_handle = tokio::spawn(async move {
            let mut lines = BufReader::new(stdout).lines();
            while let Ok(Some(line)) = lines.next_line().await {
                println!("benchmark{{pid={pid}}} {line}");
            }
        });

        // Forward stderr the same way, except for the readiness line, which triggers `tx`.
        let stderr = child.stderr.take().expect("stderr not open");
        let stderr_handle = tokio::spawn(async move {
            let mut lines = BufReader::new(stderr).lines();
            // The sender is consumed by `send`, so keep it in an `Option` and take it out once.
            let mut tx = Some(tx);
            while let Ok(Some(line)) = lines.next_line().await {
                if line.contains("Ready to start benchmark") {
                    tx.take()
                        .expect("Should only send signal once")
                        .send(())
                        .expect("failed to send ready signal to main thread");
                } else {
                    println!("benchmark{{pid={pid}}} {line}");
                }
            }
        });
        Ok((child, stdout_handle, stderr_handle))
    }
887
888    async fn open_chain_internal(
889        &self,
890        from: ChainId,
891        owner: Option<AccountOwner>,
892        initial_balance: Amount,
893        super_owner: bool,
894    ) -> Result<(ChainId, AccountOwner)> {
895        let mut command = self.command().await?;
896        command
897            .arg("open-chain")
898            .args(["--from", &from.to_string()])
899            .args(["--initial-balance", &initial_balance.to_string()]);
900
901        if let Some(owner) = owner {
902            command.args(["--owner", &owner.to_string()]);
903        }
904
905        if super_owner {
906            command.arg("--super-owner");
907        }
908
909        let stdout = command.spawn_and_wait_for_stdout().await?;
910        let mut split = stdout.split('\n');
911        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
912        let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
913        if let Some(owner) = owner {
914            assert_eq!(owner, new_owner);
915        }
916        Ok((chain_id, new_owner))
917    }
918
919    /// Runs `linera open-chain --super-owner`.
920    pub async fn open_chain_super_owner(
921        &self,
922        from: ChainId,
923        owner: Option<AccountOwner>,
924        initial_balance: Amount,
925    ) -> Result<(ChainId, AccountOwner)> {
926        self.open_chain_internal(from, owner, initial_balance, true)
927            .await
928    }
929
930    /// Runs `linera open-chain`.
931    pub async fn open_chain(
932        &self,
933        from: ChainId,
934        owner: Option<AccountOwner>,
935        initial_balance: Amount,
936    ) -> Result<(ChainId, AccountOwner)> {
937        self.open_chain_internal(from, owner, initial_balance, false)
938            .await
939    }
940
941    /// Runs `linera open-chain` then `linera assign`.
942    pub async fn open_and_assign(
943        &self,
944        client: &ClientWrapper,
945        initial_balance: Amount,
946    ) -> Result<ChainId> {
947        let our_chain = self
948            .load_wallet()?
949            .default_chain()
950            .context("no default chain found")?;
951        let owner = client.keygen().await?;
952        let (new_chain, _) = self
953            .open_chain(our_chain, Some(owner), initial_balance)
954            .await?;
955        client.assign(owner, new_chain).await?;
956        Ok(new_chain)
957    }
958
959    pub async fn open_multi_owner_chain(
960        &self,
961        from: ChainId,
962        owners: BTreeMap<AccountOwner, u64>,
963        multi_leader_rounds: u32,
964        balance: Amount,
965        base_timeout_ms: u64,
966    ) -> Result<ChainId> {
967        let mut command = self.command().await?;
968        command
969            .arg("open-multi-owner-chain")
970            .args(["--from", &from.to_string()])
971            .arg("--owners")
972            .arg(serde_json::to_string(&owners)?)
973            .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
974        command
975            .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
976            .args(["--initial-balance", &balance.to_string()]);
977
978        let stdout = command.spawn_and_wait_for_stdout().await?;
979        let mut split = stdout.split('\n');
980        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
981
982        Ok(chain_id)
983    }
984
985    pub async fn change_ownership(
986        &self,
987        chain_id: ChainId,
988        super_owners: Vec<AccountOwner>,
989        owners: Vec<AccountOwner>,
990    ) -> Result<()> {
991        let mut command = self.command().await?;
992        command
993            .arg("change-ownership")
994            .args(["--chain-id", &chain_id.to_string()]);
995        command
996            .arg("--super-owners")
997            .arg(serde_json::to_string(&super_owners)?);
998        command.arg("--owners").arg(serde_json::to_string(
999            &owners
1000                .into_iter()
1001                .zip(std::iter::repeat(100u64))
1002                .collect::<BTreeMap<_, _>>(),
1003        )?);
1004        command.spawn_and_wait_for_stdout().await?;
1005        Ok(())
1006    }
1007
1008    pub async fn change_application_permissions(
1009        &self,
1010        chain_id: ChainId,
1011        application_permissions: ApplicationPermissions,
1012    ) -> Result<()> {
1013        let mut command = self.command().await?;
1014        command
1015            .arg("change-application-permissions")
1016            .args(["--chain-id", &chain_id.to_string()]);
1017        command.arg("--manage-chain").arg(serde_json::to_string(
1018            &application_permissions.manage_chain,
1019        )?);
1020        // TODO: add other fields
1021        command.spawn_and_wait_for_stdout().await?;
1022        Ok(())
1023    }
1024
1025    /// Runs `linera wallet follow-chain CHAIN_ID`.
1026    pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
1027        let mut command = self.command().await?;
1028        command
1029            .args(["wallet", "follow-chain"])
1030            .arg(chain_id.to_string());
1031        if sync {
1032            command.arg("--sync");
1033        }
1034        command.spawn_and_wait_for_stdout().await?;
1035        Ok(())
1036    }
1037
1038    /// Runs `linera wallet forget-chain CHAIN_ID`.
1039    pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
1040        let mut command = self.command().await?;
1041        command
1042            .args(["wallet", "forget-chain"])
1043            .arg(chain_id.to_string());
1044        command.spawn_and_wait_for_stdout().await?;
1045        Ok(())
1046    }
1047
1048    /// Runs `linera wallet set-default CHAIN_ID`.
1049    pub async fn set_default_chain(&self, chain_id: ChainId) -> Result<()> {
1050        let mut command = self.command().await?;
1051        command
1052            .args(["wallet", "set-default"])
1053            .arg(chain_id.to_string());
1054        command.spawn_and_wait_for_stdout().await?;
1055        Ok(())
1056    }
1057
1058    pub async fn retry_pending_block(
1059        &self,
1060        chain_id: Option<ChainId>,
1061    ) -> Result<Option<CryptoHash>> {
1062        let mut command = self.command().await?;
1063        command.arg("retry-pending-block");
1064        if let Some(chain_id) = chain_id {
1065            command.arg(chain_id.to_string());
1066        }
1067        let stdout = command.spawn_and_wait_for_stdout().await?;
1068        let stdout = stdout.trim();
1069        if stdout.is_empty() {
1070            Ok(None)
1071        } else {
1072            Ok(Some(CryptoHash::from_str(stdout)?))
1073        }
1074    }
1075
1076    /// Runs `linera publish-data-blob`.
1077    pub async fn publish_data_blob(
1078        &self,
1079        path: &Path,
1080        chain_id: Option<ChainId>,
1081    ) -> Result<CryptoHash> {
1082        let mut command = self.command().await?;
1083        command.arg("publish-data-blob").arg(path);
1084        if let Some(chain_id) = chain_id {
1085            command.arg(chain_id.to_string());
1086        }
1087        let stdout = command.spawn_and_wait_for_stdout().await?;
1088        let stdout = stdout.trim();
1089        Ok(CryptoHash::from_str(stdout)?)
1090    }
1091
1092    /// Runs `linera read-data-blob`.
1093    pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
1094        let mut command = self.command().await?;
1095        command.arg("read-data-blob").arg(hash.to_string());
1096        if let Some(chain_id) = chain_id {
1097            command.arg(chain_id.to_string());
1098        }
1099        command.spawn_and_wait_for_stdout().await?;
1100        Ok(())
1101    }
1102
1103    pub fn load_wallet(&self) -> Result<Wallet> {
1104        Ok(Wallet::read(&self.wallet_path())?)
1105    }
1106
1107    pub fn load_keystore(&self) -> Result<InMemorySigner> {
1108        util::read_json(self.keystore_path())
1109    }
1110
1111    pub fn wallet_path(&self) -> PathBuf {
1112        self.path_provider.path().join(&self.wallet)
1113    }
1114
1115    pub fn keystore_path(&self) -> PathBuf {
1116        self.path_provider.path().join(&self.keystore)
1117    }
1118
1119    pub fn storage_path(&self) -> &str {
1120        &self.storage
1121    }
1122
1123    pub fn get_owner(&self) -> Option<AccountOwner> {
1124        let wallet = self.load_wallet().ok()?;
1125        wallet
1126            .get(wallet.default_chain()?)
1127            .expect("default chain must be in wallet")
1128            .owner
1129    }
1130
1131    pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1132        self.load_wallet()
1133            .ok()
1134            .is_some_and(|wallet| wallet.get(chain).is_some())
1135    }
1136
1137    pub async fn set_validator(
1138        &self,
1139        validator_key: &(String, String),
1140        port: usize,
1141        votes: usize,
1142    ) -> Result<()> {
1143        let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1144        self.command()
1145            .await?
1146            .arg("validator")
1147            .arg("add")
1148            .args(["--public-key", &validator_key.0])
1149            .args(["--account-key", &validator_key.1])
1150            .args(["--address", &address])
1151            .args(["--votes", &votes.to_string()])
1152            .spawn_and_wait_for_stdout()
1153            .await?;
1154        Ok(())
1155    }
1156
1157    pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1158        self.command()
1159            .await?
1160            .arg("validator")
1161            .arg("remove")
1162            .args(["--public-key", validator_key])
1163            .spawn_and_wait_for_stdout()
1164            .await?;
1165        Ok(())
1166    }
1167
1168    pub async fn change_validators(
1169        &self,
1170        add_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1171        modify_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1172        remove_validators: &[String],
1173    ) -> Result<()> {
1174        use std::str::FromStr;
1175
1176        use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1177
1178        // Build a map that will be serialized to JSON
1179        // Use the exact types that deserialization expects
1180        let mut changes = std::collections::HashMap::new();
1181
1182        // Add/modify validators
1183        for (public_key_str, account_key_str, port, votes) in
1184            add_validators.iter().chain(modify_validators.iter())
1185        {
1186            let public_key = ValidatorPublicKey::from_str(public_key_str)
1187                .with_context(|| format!("Invalid validator public key: {}", public_key_str))?;
1188
1189            let account_key = AccountPublicKey::from_str(account_key_str)
1190                .with_context(|| format!("Invalid account public key: {}", account_key_str))?;
1191
1192            let address = format!("{}:127.0.0.1:{}", self.network.short(), port)
1193                .parse()
1194                .unwrap();
1195
1196            // Create ValidatorChange struct
1197            let change = crate::cli::validator::Change {
1198                account_key,
1199                address,
1200                votes: crate::cli::validator::Votes(
1201                    std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1202                ),
1203            };
1204
1205            changes.insert(public_key, Some(change));
1206        }
1207
1208        // Remove validators (set to None)
1209        for validator_key_str in remove_validators {
1210            let public_key = ValidatorPublicKey::from_str(validator_key_str)
1211                .with_context(|| format!("Invalid validator public key: {}", validator_key_str))?;
1212            changes.insert(public_key, None);
1213        }
1214
1215        // Create temporary file with JSON
1216        let temp_file = tempfile::NamedTempFile::new()
1217            .context("Failed to create temporary file for validator changes")?;
1218        serde_json::to_writer(&temp_file, &changes)
1219            .context("Failed to write validator changes to file")?;
1220        let temp_path = temp_file.path();
1221
1222        self.command()
1223            .await?
1224            .arg("validator")
1225            .arg("update")
1226            .arg(temp_path)
1227            .arg("--yes") // Skip confirmation prompt
1228            .spawn_and_wait_for_stdout()
1229            .await?;
1230
1231        Ok(())
1232    }
1233
1234    pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1235        self.command()
1236            .await?
1237            .arg("revoke-epochs")
1238            .arg(epoch.to_string())
1239            .spawn_and_wait_for_stdout()
1240            .await?;
1241        Ok(())
1242    }
1243
1244    /// Runs `linera keygen`.
1245    pub async fn keygen(&self) -> Result<AccountOwner> {
1246        let stdout = self
1247            .command()
1248            .await?
1249            .arg("keygen")
1250            .spawn_and_wait_for_stdout()
1251            .await?;
1252        AccountOwner::from_str(stdout.as_str().trim())
1253    }
1254
1255    /// Returns the default chain.
1256    pub fn default_chain(&self) -> Option<ChainId> {
1257        self.load_wallet().ok()?.default_chain()
1258    }
1259
1260    /// Runs `linera assign`.
1261    pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1262        let _stdout = self
1263            .command()
1264            .await?
1265            .arg("assign")
1266            .args(["--owner", &owner.to_string()])
1267            .args(["--chain-id", &chain_id.to_string()])
1268            .spawn_and_wait_for_stdout()
1269            .await?;
1270        Ok(())
1271    }
1272
1273    /// Runs `linera set-preferred-owner` for `chain_id`.
1274    pub async fn set_preferred_owner(
1275        &self,
1276        chain_id: ChainId,
1277        owner: Option<AccountOwner>,
1278    ) -> Result<()> {
1279        let mut owner_arg = vec!["--owner".to_string()];
1280        if let Some(owner) = owner {
1281            owner_arg.push(owner.to_string());
1282        };
1283        self.command()
1284            .await?
1285            .arg("set-preferred-owner")
1286            .args(["--chain-id", &chain_id.to_string()])
1287            .args(owner_arg)
1288            .spawn_and_wait_for_stdout()
1289            .await?;
1290        Ok(())
1291    }
1292
1293    pub async fn build_application(
1294        &self,
1295        path: &Path,
1296        name: &str,
1297        is_workspace: bool,
1298    ) -> Result<(PathBuf, PathBuf)> {
1299        Command::new("cargo")
1300            .current_dir(self.path_provider.path())
1301            .arg("build")
1302            .arg("--release")
1303            .args(["--target", "wasm32-unknown-unknown"])
1304            .arg("--manifest-path")
1305            .arg(path.join("Cargo.toml"))
1306            .spawn_and_wait_for_stdout()
1307            .await?;
1308
1309        let release_dir = match is_workspace {
1310            true => path.join("../target/wasm32-unknown-unknown/release"),
1311            false => path.join("target/wasm32-unknown-unknown/release"),
1312        };
1313
1314        let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1315        let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1316
1317        let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1318        let service_size = fs_err::tokio::metadata(&service).await?.len();
1319        tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1320
1321        Ok((contract, service))
1322    }
1323}
1324
1325impl Drop for ClientWrapper {
1326    fn drop(&mut self) {
1327        use std::process::Command as SyncCommand;
1328
1329        if self.on_drop != OnClientDrop::CloseChains {
1330            return;
1331        }
1332
1333        let Ok(binary_path) = self.binary_path.lock() else {
1334            tracing::error!(
1335                "Failed to close chains because a thread panicked with a lock to `binary_path`"
1336            );
1337            return;
1338        };
1339
1340        let Some(binary_path) = binary_path.as_ref() else {
1341            tracing::warn!(
1342                "Assuming no chains need to be closed, because the command binary was never \
1343                resolved and therefore presumably never called"
1344            );
1345            return;
1346        };
1347
1348        let working_directory = self.path_provider.path();
1349        let mut wallet_show_command = SyncCommand::new(binary_path);
1350
1351        for argument in self.command_arguments() {
1352            wallet_show_command.arg(&*argument);
1353        }
1354
1355        let Ok(wallet_show_output) = wallet_show_command
1356            .current_dir(working_directory)
1357            .args(["wallet", "show", "--short", "--owned"])
1358            .output()
1359        else {
1360            tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1361            return;
1362        };
1363
1364        if !wallet_show_output.status.success() {
1365            tracing::warn!("Failed to list chains in the wallet to close them");
1366            return;
1367        }
1368
1369        let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1370            tracing::warn!(
1371                "Failed to close chains because `linera wallet show --short` \
1372                returned a non-UTF-8 output"
1373            );
1374            return;
1375        };
1376
1377        let chain_ids = chain_list_string
1378            .split('\n')
1379            .map(|line| line.trim())
1380            .filter(|line| !line.is_empty());
1381
1382        for chain_id in chain_ids {
1383            let mut close_chain_command = SyncCommand::new(binary_path);
1384
1385            for argument in self.command_arguments() {
1386                close_chain_command.arg(&*argument);
1387            }
1388
1389            close_chain_command.current_dir(working_directory);
1390
1391            match close_chain_command.args(["close-chain", chain_id]).status() {
1392                Ok(status) if status.success() => (),
1393                Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1394                Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1395            }
1396        }
1397    }
1398}
1399
#[cfg(with_testing)]
impl ClientWrapper {
    /// Builds the example application `name` as a workspace member.
    ///
    /// Returns the paths of the contract and the service bytecode, in that order.
    pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
        let example_dir = Self::example_path(name)?;
        self.build_application(&example_dir, name, true).await
    }

    /// Returns `../examples/<name>` relative to the current working directory.
    pub fn example_path(name: &str) -> Result<PathBuf> {
        let current_dir = env::current_dir()?;
        Ok(current_dir.join("../examples/").join(name))
    }
}
1411
/// Shortens `input` for log and error messages.
///
/// Inputs shorter than 1000 bytes are returned unchanged. Longer inputs are cut to at most
/// 1000 bytes and suffixed with `" ..."`.
fn truncate_query_output(input: &str) -> String {
    let max_len = 1000;
    if input.len() < max_len {
        return input.to_string();
    }
    // `max_len` is a byte offset and may fall inside a multi-byte character; slicing there
    // would panic, so back up to the closest character boundary. Offset 0 is always a
    // boundary, which guarantees termination.
    let mut end = max_len;
    while !input.is_char_boundary(end) {
        end -= 1;
    }
    format!("{} ...", &input[..end])
}
1420
1421fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1422    let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1423    let max_len = 200;
1424    if query.len() < max_len {
1425        query
1426    } else {
1427        format!("{} ...", query.get(..max_len).unwrap())
1428    }
1429}
1430
1431/// A running node service.
1432pub struct NodeService {
1433    port: u16,
1434    child: Child,
1435}
1436
1437impl NodeService {
1438    fn new(port: u16, child: Child) -> Self {
1439        Self { port, child }
1440    }
1441
1442    pub async fn terminate(mut self) -> Result<()> {
1443        self.child.kill().await.context("terminating node service")
1444    }
1445
1446    pub fn port(&self) -> u16 {
1447        self.port
1448    }
1449
1450    pub fn ensure_is_running(&mut self) -> Result<()> {
1451        self.child.ensure_is_running()
1452    }
1453
1454    pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1455        let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1456        let mut data = self.query_node(query).await?;
1457        Ok(serde_json::from_value(data["processInbox"].take())?)
1458    }
1459
1460    pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1461        let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1462        let mut data = self.query_node(query).await?;
1463        Ok(serde_json::from_value(data["sync"].take())?)
1464    }
1465
1466    pub async fn transfer(
1467        &self,
1468        chain_id: ChainId,
1469        owner: AccountOwner,
1470        recipient: Account,
1471        amount: Amount,
1472    ) -> Result<CryptoHash> {
1473        let json_owner = owner.to_value();
1474        let json_recipient = recipient.to_value();
1475        let query = format!(
1476            "mutation {{ transfer(\
1477                 chainId: \"{chain_id}\", \
1478                 owner: {json_owner}, \
1479                 recipient: {json_recipient}, \
1480                 amount: \"{amount}\") \
1481             }}"
1482        );
1483        let data = self.query_node(query).await?;
1484        serde_json::from_value(data["transfer"].clone())
1485            .context("missing transfer field in response")
1486    }
1487
1488    pub async fn balance(&self, account: &Account) -> Result<Amount> {
1489        let chain = account.chain_id;
1490        let owner = account.owner;
1491        if matches!(owner, AccountOwner::CHAIN) {
1492            let query = format!(
1493                "query {{ chain(chainId:\"{chain}\") {{
1494                    executionState {{ system {{ balance }} }}
1495                }} }}"
1496            );
1497            let response = self.query_node(query).await?;
1498            let balance = &response["chain"]["executionState"]["system"]["balance"]
1499                .as_str()
1500                .unwrap();
1501            return Ok(Amount::from_str(balance)?);
1502        }
1503        let query = format!(
1504            "query {{ chain(chainId:\"{chain}\") {{
1505                executionState {{ system {{ balances {{
1506                    entry(key:\"{owner}\") {{ value }}
1507                }} }} }}
1508            }} }}"
1509        );
1510        let response = self.query_node(query).await?;
1511        let balances = &response["chain"]["executionState"]["system"]["balances"];
1512        let balance = balances["entry"]["value"].as_str();
1513        match balance {
1514            None => Ok(Amount::ZERO),
1515            Some(amount) => Ok(Amount::from_str(amount)?),
1516        }
1517    }
1518
1519    pub fn make_application<A: ContractAbi>(
1520        &self,
1521        chain_id: &ChainId,
1522        application_id: &ApplicationId<A>,
1523    ) -> Result<ApplicationWrapper<A>> {
1524        let application_id = application_id.forget_abi().to_string();
1525        let link = format!(
1526            "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1527            self.port
1528        );
1529        Ok(ApplicationWrapper::from(link))
1530    }
1531
1532    pub async fn publish_data_blob(
1533        &self,
1534        chain_id: &ChainId,
1535        bytes: Vec<u8>,
1536    ) -> Result<CryptoHash> {
1537        let query = format!(
1538            "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1539            chain_id.to_value(),
1540            bytes.to_value(),
1541        );
1542        let data = self.query_node(query).await?;
1543        serde_json::from_value(data["publishDataBlob"].clone())
1544            .context("missing publishDataBlob field in response")
1545    }
1546
1547    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1548        &self,
1549        chain_id: &ChainId,
1550        contract: PathBuf,
1551        service: PathBuf,
1552        vm_runtime: VmRuntime,
1553    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1554        let contract_code = Bytecode::load_from_file(&contract).await?;
1555        let service_code = Bytecode::load_from_file(&service).await?;
1556        let query = format!(
1557            "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1558            chain_id.to_value(),
1559            contract_code.to_value(),
1560            service_code.to_value(),
1561            vm_runtime.to_value(),
1562        );
1563        let data = self.query_node(query).await?;
1564        let module_str = data["publishModule"]
1565            .as_str()
1566            .context("module ID not found")?;
1567        let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1568        Ok(module_id.with_abi())
1569    }
1570
1571    pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1572        let query = format!(
1573            "query {{ chain(chainId:\"{chain_id}\") {{
1574                executionState {{ system {{ committees }} }}
1575            }} }}"
1576        );
1577        let mut response = self.query_node(query).await?;
1578        let committees = response["chain"]["executionState"]["system"]["committees"].take();
1579        Ok(serde_json::from_value(committees)?)
1580    }
1581
1582    pub async fn events_from_index(
1583        &self,
1584        chain_id: &ChainId,
1585        stream_id: &StreamId,
1586        start_index: u32,
1587    ) -> Result<Vec<IndexAndEvent>> {
1588        let query = format!(
1589            "query {{
1590               eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1591               {{ index event }}
1592             }}",
1593            stream_id.to_value()
1594        );
1595        let mut response = self.query_node(query).await?;
1596        let response = response["eventsFromIndex"].take();
1597        Ok(serde_json::from_value(response)?)
1598    }
1599
1600    pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1601        let n_try = 5;
1602        let query = query.as_ref();
1603        for i in 0..n_try {
1604            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1605            let url = format!("http://localhost:{}/", self.port);
1606            let client = reqwest_client();
1607            let result = client
1608                .post(url)
1609                .json(&json!({ "query": query }))
1610                .send()
1611                .await;
1612            if matches!(result, Err(ref error) if error.is_timeout()) {
1613                tracing::warn!(
1614                    "Timeout when sending query {} to the node service",
1615                    truncate_query_output(query)
1616                );
1617                continue;
1618            }
1619            let response = result.with_context(|| {
1620                format!(
1621                    "query_node: failed to post query={}",
1622                    truncate_query_output(query)
1623                )
1624            })?;
1625            ensure!(
1626                response.status().is_success(),
1627                "Query \"{}\" failed: {}",
1628                truncate_query_output(query),
1629                response
1630                    .text()
1631                    .await
1632                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1633            );
1634            let value: Value = response.json().await.context("invalid JSON")?;
1635            if let Some(errors) = value.get("errors") {
1636                tracing::warn!(
1637                    "Query \"{}\" failed: {}",
1638                    truncate_query_output(query),
1639                    errors
1640                );
1641            } else {
1642                return Ok(value["data"].clone());
1643            }
1644        }
1645        bail!(
1646            "Query \"{}\" failed after {} retries.",
1647            truncate_query_output(query),
1648            n_try
1649        );
1650    }
1651
1652    pub async fn create_application<
1653        Abi: ContractAbi,
1654        Parameters: Serialize,
1655        InstantiationArgument: Serialize,
1656    >(
1657        &self,
1658        chain_id: &ChainId,
1659        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1660        parameters: &Parameters,
1661        argument: &InstantiationArgument,
1662        required_application_ids: &[ApplicationId],
1663    ) -> Result<ApplicationId<Abi>> {
1664        let module_id = module_id.forget_abi();
1665        let json_required_applications_ids = required_application_ids
1666            .iter()
1667            .map(ApplicationId::to_string)
1668            .collect::<Vec<_>>()
1669            .to_value();
1670        // Convert to `serde_json::Value` then `async_graphql::Value` via the trait `InputType`.
1671        let new_parameters = serde_json::to_value(parameters)
1672            .context("could not create parameters JSON")?
1673            .to_value();
1674        let new_argument = serde_json::to_value(argument)
1675            .context("could not create argument JSON")?
1676            .to_value();
1677        let query = format!(
1678            "mutation {{ createApplication(\
1679                 chainId: \"{chain_id}\",
1680                 moduleId: \"{module_id}\", \
1681                 parameters: {new_parameters}, \
1682                 instantiationArgument: {new_argument}, \
1683                 requiredApplicationIds: {json_required_applications_ids}) \
1684             }}"
1685        );
1686        let data = self.query_node(query).await?;
1687        let app_id_str = data["createApplication"]
1688            .as_str()
1689            .context("missing createApplication string in response")?
1690            .trim();
1691        Ok(app_id_str
1692            .parse::<ApplicationId>()
1693            .context("invalid application ID")?
1694            .with_abi())
1695    }
1696
1697    /// Obtains the hash and height of the `chain`'s tip block, as known by this node service.
1698    pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1699        let query = format!(
1700            r#"query {{ block(chainId: "{chain}") {{
1701                hash
1702                block {{ header {{ height }} }}
1703            }} }}"#
1704        );
1705
1706        let mut response = self.query_node(&query).await?;
1707
1708        match (
1709            mem::take(&mut response["block"]["hash"]),
1710            mem::take(&mut response["block"]["block"]["header"]["height"]),
1711        ) {
1712            (Value::Null, Value::Null) => Ok(None),
1713            (Value::String(hash), Value::Number(height)) => Ok(Some((
1714                hash.parse()
1715                    .context("Received an invalid hash {hash:?} for chain tip")?,
1716                BlockHeight(height.as_u64().unwrap()),
1717            ))),
1718            invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1719        }
1720    }
1721
1722    /// Subscribes to the node service and returns a stream of notifications about a chain.
1723    pub async fn notifications(
1724        &self,
1725        chain_id: ChainId,
1726    ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
1727        let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1728        let url = format!("ws://localhost:{}/ws", self.port);
1729        let mut request = url.into_client_request()?;
1730        request.headers_mut().insert(
1731            "Sec-WebSocket-Protocol",
1732            HeaderValue::from_str("graphql-transport-ws")?,
1733        );
1734        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1735        let init_json = json!({
1736          "type": "connection_init",
1737          "payload": {}
1738        });
1739        websocket.send(init_json.to_string().into()).await?;
1740        let text = websocket
1741            .next()
1742            .await
1743            .context("Failed to establish connection")??
1744            .into_text()?;
1745        ensure!(
1746            text == "{\"type\":\"connection_ack\"}",
1747            "Unexpected response: {text}"
1748        );
1749        let query_json = json!({
1750          "id": "1",
1751          "type": "start",
1752          "payload": {
1753            "query": query,
1754            "variables": {},
1755            "operationName": null
1756          }
1757        });
1758        websocket.send(query_json.to_string().into()).await?;
1759        Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1760            |message| async {
1761                let text = message.into_text()?;
1762                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1763                if let Some(errors) = value["payload"].get("errors") {
1764                    bail!("Notification subscription failed: {errors:?}");
1765                }
1766                serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1767                    .context("Failed to deserialize notification")
1768            },
1769        )))
1770    }
1771
1772    /// Subscribes to query results via the `queryResult` GraphQL subscription.
1773    pub async fn query_result(
1774        &self,
1775        name: &str,
1776        chain_id: ChainId,
1777        application_id: &ApplicationId,
1778    ) -> Result<Pin<Box<impl Stream<Item = Result<Value>>>>> {
1779        let query = format!(
1780            r#"subscription {{ queryResult(name: "{name}", chainId: "{chain_id}", applicationId: "{application_id}") }}"#,
1781        );
1782        let url = format!("ws://localhost:{}/ws", self.port);
1783        let mut request = url.into_client_request()?;
1784        request.headers_mut().insert(
1785            "Sec-WebSocket-Protocol",
1786            HeaderValue::from_str("graphql-transport-ws")?,
1787        );
1788        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1789        let init_json = json!({
1790          "type": "connection_init",
1791          "payload": {}
1792        });
1793        websocket.send(init_json.to_string().into()).await?;
1794        let text = websocket
1795            .next()
1796            .await
1797            .context("Failed to establish connection")??
1798            .into_text()?;
1799        ensure!(
1800            text == "{\"type\":\"connection_ack\"}",
1801            "Unexpected response: {text}"
1802        );
1803        let query_json = json!({
1804          "id": "1",
1805          "type": "start",
1806          "payload": {
1807            "query": query,
1808            "variables": {},
1809            "operationName": null
1810          }
1811        });
1812        websocket.send(query_json.to_string().into()).await?;
1813        Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1814            |message| async {
1815                let text = message.into_text()?;
1816                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1817                if let Some(errors) = value["payload"].get("errors") {
1818                    bail!("Query result subscription failed: {errors:?}");
1819                }
1820                Ok(value["payload"]["data"]["queryResult"].clone())
1821            },
1822        )))
1823    }
1824}
1825
1826/// A running faucet service.
1827pub struct FaucetService {
1828    port: u16,
1829    child: Child,
1830    _temp_dir: tempfile::TempDir,
1831}
1832
1833impl FaucetService {
1834    fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
1835        Self {
1836            port,
1837            child,
1838            _temp_dir: temp_dir,
1839        }
1840    }
1841
1842    pub async fn terminate(mut self) -> Result<()> {
1843        self.child
1844            .kill()
1845            .await
1846            .context("terminating faucet service")
1847    }
1848
1849    pub fn ensure_is_running(&mut self) -> Result<()> {
1850        self.child.ensure_is_running()
1851    }
1852
1853    pub fn instance(&self) -> Faucet {
1854        Faucet::new(format!("http://localhost:{}/", self.port))
1855    }
1856}
1857
/// A running `Application` to be queried in GraphQL.
///
/// The type parameter `A` is only a marker: no value of type `A` is stored.
pub struct ApplicationWrapper<A> {
    /// URI that queries are posted to.
    uri: String,
    /// Zero-sized marker tying the wrapper to `A`.
    _phantom: PhantomData<A>,
}
1863
1864impl<A> ApplicationWrapper<A> {
1865    pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1866        let query = query.as_ref();
1867        let value = self.run_json_query(json!({ "query": query })).await?;
1868        Ok(value["data"].clone())
1869    }
1870
1871    pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1872        const MAX_RETRIES: usize = 5;
1873
1874        for i in 0.. {
1875            let client = reqwest_client();
1876            let result = client.post(&self.uri).json(&query).send().await;
1877            let response = match result {
1878                Ok(response) => response,
1879                Err(error) if i < MAX_RETRIES => {
1880                    tracing::warn!(
1881                        "Failed to post query \"{}\": {error}; retrying",
1882                        truncate_query_output_serialize(&query),
1883                    );
1884                    continue;
1885                }
1886                Err(error) => {
1887                    let query = truncate_query_output_serialize(&query);
1888                    return Err(error)
1889                        .with_context(|| format!("run_json_query: failed to post query={query}"));
1890                }
1891            };
1892            ensure!(
1893                response.status().is_success(),
1894                "Query \"{}\" failed: {}",
1895                truncate_query_output_serialize(&query),
1896                response
1897                    .text()
1898                    .await
1899                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1900            );
1901            let value: Value = response.json().await.context("invalid JSON")?;
1902            if let Some(errors) = value.get("errors") {
1903                bail!(
1904                    "Query \"{}\" failed: {}",
1905                    truncate_query_output_serialize(&query),
1906                    errors
1907                );
1908            }
1909            return Ok(value);
1910        }
1911        unreachable!()
1912    }
1913
1914    pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
1915        let query = query.as_ref();
1916        self.run_graphql_query(&format!("query {{ {query} }}"))
1917            .await
1918    }
1919
1920    pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
1921        let query = query.as_ref().trim();
1922        let name = query
1923            .split_once(|ch: char| !ch.is_alphanumeric())
1924            .map_or(query, |(name, _)| name);
1925        let data = self.query(query).await?;
1926        serde_json::from_value(data[name].clone())
1927            .with_context(|| format!("{name} field missing in response"))
1928    }
1929
1930    pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
1931        let mutation = mutation.as_ref();
1932        self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
1933            .await
1934    }
1935
1936    pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
1937        let mut out = String::from("mutation {\n");
1938        for (index, mutation) in mutations.iter().enumerate() {
1939            out = format!("{}  u{}: {}\n", out, index, mutation);
1940        }
1941        out.push_str("}\n");
1942        self.run_graphql_query(&out).await
1943    }
1944}
1945
1946impl<A> From<String> for ApplicationWrapper<A> {
1947    fn from(uri: String) -> ApplicationWrapper<A> {
1948        ApplicationWrapper {
1949            uri,
1950            _phantom: PhantomData,
1951        }
1952    }
1953}
1954
/// Returns how long tests wait for a notification.
///
/// The value is read, in milliseconds, from the environment variable
/// `LINERA_TEST_NOTIFICATION_TIMEOUT_MS`; if the variable is not set, 10 seconds are used.
///
/// # Panics
///
/// Panics if the variable is set but is not valid Unicode or not a valid number.
#[cfg(with_testing)]
fn notification_timeout() -> Duration {
    const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
    const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;

    let millis: u64 = match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
        Err(env::VarError::NotPresent) => NOTIFICATION_TIMEOUT_MS_DEFAULT,
        Err(env::VarError::NotUnicode(_)) => {
            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
        }
        Ok(raw) => match raw.parse() {
            Ok(parsed) => parsed,
            Err(error) => panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}"),
        },
    };
    Duration::from_millis(millis)
}
1972
/// Helpers for waiting on specific notifications.
#[cfg(with_testing)]
pub trait NotificationsExt {
    /// Waits until `f` returns `Some(t)` for a notification, and returns that `t`.
    fn wait_for<T>(
        &mut self,
        f: impl FnMut(Notification) -> Option<T>,
    ) -> impl Future<Output = Result<T>>;

    /// Waits for a `NewEvents` notification and returns its event streams. If
    /// `expected_height` is given, only a notification for that block height is accepted.
    fn wait_for_events(
        &mut self,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
        let expected_height = expected_height.into();
        self.wait_for(move |notification| {
            let Reason::NewEvents {
                height,
                event_streams,
                ..
            } = notification.reason
            else {
                return None;
            };
            expected_height
                .is_none_or(|expected| expected == height)
                .then_some(event_streams)
        })
    }

    /// Waits for a `NewBlock` notification and returns the block hash. If
    /// `expected_height` is given, only a notification for that block height is accepted.
    fn wait_for_block(
        &mut self,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<CryptoHash>> {
        let expected_height = expected_height.into();
        self.wait_for(move |notification| {
            let Reason::NewBlock { height, hash, .. } = notification.reason else {
                return None;
            };
            expected_height
                .is_none_or(|expected| expected == height)
                .then_some(hash)
        })
    }

    /// Waits for a `NewIncomingBundle` notification whose origin is `expected_origin`. If
    /// `expected_height` is given, the notification's height must also be equal to it.
    fn wait_for_bundle(
        &mut self,
        expected_origin: ChainId,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<()>> {
        let expected_height = expected_height.into();
        self.wait_for(move |notification| {
            let Reason::NewIncomingBundle { height, origin } = notification.reason else {
                return None;
            };
            let is_match = origin == expected_origin
                && expected_height.is_none_or(|expected| expected == height);
            is_match.then_some(())
        })
    }
}
2038
#[cfg(with_testing)]
impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
    /// Feeds notifications from the stream to `f` until it returns `Some`.
    ///
    /// A single timer of `notification_timeout()` covers the whole wait. The call fails
    /// when that timer fires, when the stream ends, or when the stream yields an error.
    async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
        let sleep = linera_base::time::timer::sleep(notification_timeout());
        // Created once, outside the loop, so the deadline is not reset by each notification.
        let mut deadline = Box::pin(sleep).fuse();
        loop {
            let received = futures::select! {
                () = deadline => bail!("Timeout waiting for notification"),
                item = self.next().fuse() => item.context("Stream closed")??,
            };
            match f(received) {
                Some(found) => return Ok(found),
                None => continue,
            }
        }
    }
}