linera_service/cli_wrappers/
wallet.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Cow,
6    collections::BTreeMap,
7    env,
8    marker::PhantomData,
9    mem,
10    path::{Path, PathBuf},
11    pin::Pin,
12    process::Stdio,
13    str::FromStr,
14    sync,
15    time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24    abi::ContractAbi,
25    command::{resolve_binary, CommandExt},
26    crypto::{CryptoHash, InMemorySigner},
27    data_types::{Amount, ApplicationPermissions, BlockHeight, Bytecode, Epoch},
28    identifiers::{
29        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30    },
31    vm::VmRuntime,
32};
33use linera_client::client_options::ResourceControlPolicyConfig;
34use linera_core::worker::Notification;
35use linera_execution::committee::Committee;
36use linera_faucet_client::Faucet;
37use serde::{de::DeserializeOwned, ser::Serialize};
38use serde_command_opts::to_args;
39use serde_json::{json, Value};
40use tempfile::TempDir;
41use tokio::{
42    io::{AsyncBufReadExt, BufReader},
43    process::{Child, Command},
44    sync::oneshot,
45    task::JoinHandle,
46};
47#[cfg(with_testing)]
48use {
49    futures::FutureExt as _,
50    linera_core::worker::Reason,
51    std::{collections::BTreeSet, future::Future},
52};
53
54use crate::{
55    cli::command::BenchmarkCommand,
56    cli_wrappers::{
57        local_net::{PathProvider, ProcessInbox},
58        Network,
59    },
60    util::{self, ChildExt},
61    wallet::Wallet,
62};
63
/// The name of the environment variable that allows specifying additional arguments to be passed
/// to the node-service command of the client.
///
/// When the variable is set, its value is split on whitespace and each piece is appended as a
/// separate argument to the `linera service` invocation.
const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
67
68fn reqwest_client() -> reqwest::Client {
69    reqwest::ClientBuilder::new()
70        .timeout(Duration::from_secs(60))
71        .build()
72        .unwrap()
73}
74
/// Wrapper to run a Linera client command.
///
/// Each method builds a command line for the `linera` binary, always including the wallet,
/// keystore and storage locations held by this wrapper, and spawns it as a child process.
pub struct ClientWrapper {
    /// Path of the `linera` binary, if already known. When `None`, the path is resolved and
    /// cached the first time a command is built.
    binary_path: sync::Mutex<Option<PathBuf>>,
    /// Optional seed forwarded as `--testing-prng-seed` (e.g. by `create_genesis_config` and
    /// `wallet_init`).
    testing_prng_seed: Option<u64>,
    /// Value passed to `--storage`: a `rocksdb:` location under the path provider's directory.
    storage: String,
    /// Wallet file name (`wallet_<id>.json`), passed to `--wallet`.
    wallet: String,
    /// Keystore file name (`keystore_<id>.json`), passed to `--keystore`.
    keystore: String,
    /// Value passed to `--max-pending-message-bundles` on regular (non-benchmark) commands.
    max_pending_message_bundles: usize,
    /// Network configuration supplied at construction.
    // NOTE(review): not read in the visible part of this file — confirm where it is used.
    network: Network,
    /// Provides the directory used as working directory for spawned commands and as the parent
    /// of the storage location.
    pub path_provider: PathProvider,
    /// Action to perform when this wrapper is dropped.
    on_drop: OnClientDrop,
    /// Additional arguments appended after the required ones on every command invocation.
    extra_args: Vec<String>,
}
88
/// Action to perform when the [`ClientWrapper`] is dropped.
///
/// The chosen action is supplied to the [`ClientWrapper`] constructors and stored in the wrapper.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OnClientDrop {
    /// Close all the chains on the wallet.
    CloseChains,
    /// Do not close any chains, leaving them active.
    LeakChains,
}
97
98impl ClientWrapper {
99    pub fn new(
100        path_provider: PathProvider,
101        network: Network,
102        testing_prng_seed: Option<u64>,
103        id: usize,
104        on_drop: OnClientDrop,
105    ) -> Self {
106        Self::new_with_extra_args(
107            path_provider,
108            network,
109            testing_prng_seed,
110            id,
111            on_drop,
112            vec!["--wait-for-outgoing-messages".to_string()],
113            None,
114        )
115    }
116
117    pub fn new_with_extra_args(
118        path_provider: PathProvider,
119        network: Network,
120        testing_prng_seed: Option<u64>,
121        id: usize,
122        on_drop: OnClientDrop,
123        extra_args: Vec<String>,
124        binary_dir: Option<PathBuf>,
125    ) -> Self {
126        let storage = format!(
127            "rocksdb:{}/client_{}.db",
128            path_provider.path().display(),
129            id
130        );
131        let wallet = format!("wallet_{}.json", id);
132        let keystore = format!("keystore_{}.json", id);
133        let binary_path = binary_dir.map(|dir| dir.join("linera"));
134        Self {
135            binary_path: sync::Mutex::new(binary_path),
136            testing_prng_seed,
137            storage,
138            wallet,
139            keystore,
140            max_pending_message_bundles: 10_000,
141            network,
142            path_provider,
143            on_drop,
144            extra_args,
145        }
146    }
147
148    /// Runs `linera project new`.
149    pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
150        let tmp = TempDir::new()?;
151        let mut command = self.command().await?;
152        command
153            .current_dir(tmp.path())
154            .arg("project")
155            .arg("new")
156            .arg(project_name)
157            .arg("--linera-root")
158            .arg(linera_root)
159            .spawn_and_wait_for_stdout()
160            .await?;
161        Ok(tmp)
162    }
163
164    /// Runs `linera project publish`.
165    pub async fn project_publish<T: Serialize>(
166        &self,
167        path: PathBuf,
168        required_application_ids: Vec<String>,
169        publisher: impl Into<Option<ChainId>>,
170        argument: &T,
171    ) -> Result<String> {
172        let json_parameters = serde_json::to_string(&())?;
173        let json_argument = serde_json::to_string(argument)?;
174        let mut command = self.command().await?;
175        command
176            .arg("project")
177            .arg("publish-and-create")
178            .arg(path)
179            .args(publisher.into().iter().map(ChainId::to_string))
180            .args(["--json-parameters", &json_parameters])
181            .args(["--json-argument", &json_argument]);
182        if !required_application_ids.is_empty() {
183            command.arg("--required-application-ids");
184            command.args(required_application_ids);
185        }
186        let stdout = command.spawn_and_wait_for_stdout().await?;
187        Ok(stdout.trim().to_string())
188    }
189
190    /// Runs `linera project test`.
191    pub async fn project_test(&self, path: &Path) -> Result<()> {
192        self.command()
193            .await
194            .context("failed to create project test command")?
195            .current_dir(path)
196            .arg("project")
197            .arg("test")
198            .spawn_and_wait_for_stdout()
199            .await?;
200        Ok(())
201    }
202
203    async fn command_with_envs_and_arguments(
204        &self,
205        envs: &[(&str, &str)],
206        arguments: impl IntoIterator<Item = Cow<'_, str>>,
207    ) -> Result<Command> {
208        let mut command = self.command_binary().await?;
209        command.current_dir(self.path_provider.path());
210        for (key, value) in envs {
211            command.env(key, value);
212        }
213        for argument in arguments {
214            command.arg(&*argument);
215        }
216        Ok(command)
217    }
218
219    async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
220        self.command_with_envs_and_arguments(envs, self.command_arguments())
221            .await
222    }
223
224    async fn command_with_arguments(
225        &self,
226        arguments: impl IntoIterator<Item = Cow<'_, str>>,
227    ) -> Result<Command> {
228        self.command_with_envs_and_arguments(
229            &[(
230                "RUST_LOG",
231                &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
232            )],
233            arguments,
234        )
235        .await
236    }
237
238    async fn command(&self) -> Result<Command> {
239        self.command_with_envs(&[(
240            "RUST_LOG",
241            &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
242        )])
243        .await
244    }
245
246    fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
247        [
248            "--wallet".into(),
249            self.wallet.as_str().into(),
250            "--keystore".into(),
251            self.keystore.as_str().into(),
252            "--storage".into(),
253            self.storage.as_str().into(),
254            "--send-timeout-ms".into(),
255            "500000".into(),
256            "--recv-timeout-ms".into(),
257            "500000".into(),
258        ]
259        .into_iter()
260        .chain(self.extra_args.iter().map(|s| s.as_str().into()))
261    }
262
263    /// Returns an iterator over the arguments that should be added to all command invocations.
264    fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
265        self.required_command_arguments().chain([
266            "--max-pending-message-bundles".into(),
267            self.max_pending_message_bundles.to_string().into(),
268        ])
269    }
270
271    /// Returns the [`Command`] instance configured to run the appropriate binary.
272    ///
273    /// The path is resolved once and cached inside `self` for subsequent usages.
274    async fn command_binary(&self) -> Result<Command> {
275        match self.command_with_cached_binary_path() {
276            Some(command) => Ok(command),
277            None => {
278                let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
279                let command = Command::new(&resolved_path);
280
281                self.set_cached_binary_path(resolved_path);
282
283                Ok(command)
284            }
285        }
286    }
287
288    /// Returns a [`Command`] instance configured with the cached `binary_path`, if available.
289    fn command_with_cached_binary_path(&self) -> Option<Command> {
290        let binary_path = self.binary_path.lock().unwrap();
291
292        binary_path.as_ref().map(Command::new)
293    }
294
295    /// Sets the cached `binary_path` with the `new_binary_path`.
296    ///
297    /// # Panics
298    ///
299    /// If the cache is already set to a different value. In theory the two threads calling
300    /// `command_binary` can race and resolve the binary path twice, but they should always be the
301    /// same path.
302    fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
303        let mut binary_path = self.binary_path.lock().unwrap();
304
305        if binary_path.is_none() {
306            *binary_path = Some(new_binary_path);
307        } else {
308            assert_eq!(*binary_path, Some(new_binary_path));
309        }
310    }
311
312    /// Runs `linera create-genesis-config`.
313    pub async fn create_genesis_config(
314        &self,
315        num_other_initial_chains: u32,
316        initial_funding: Amount,
317        policy_config: ResourceControlPolicyConfig,
318        http_allow_list: Option<Vec<String>>,
319    ) -> Result<()> {
320        let mut command = self.command().await?;
321        command
322            .args([
323                "create-genesis-config",
324                &num_other_initial_chains.to_string(),
325            ])
326            .args(["--initial-funding", &initial_funding.to_string()])
327            .args(["--committee", "committee.json"])
328            .args(["--genesis", "genesis.json"])
329            .args([
330                "--policy-config",
331                &policy_config.to_string().to_kebab_case(),
332            ]);
333        if let Some(allow_list) = http_allow_list {
334            command
335                .arg("--http-request-allow-list")
336                .arg(allow_list.join(","));
337        }
338        if let Some(seed) = self.testing_prng_seed {
339            command.arg("--testing-prng-seed").arg(seed.to_string());
340        }
341        command.spawn_and_wait_for_stdout().await?;
342        Ok(())
343    }
344
345    /// Runs `linera wallet init`. The genesis config is read from `genesis.json`, or from the
346    /// faucet if provided.
347    pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
348        let mut command = self.command().await?;
349        command.args(["wallet", "init"]);
350        match faucet {
351            None => command.args(["--genesis", "genesis.json"]),
352            Some(faucet) => command.args(["--faucet", faucet.url()]),
353        };
354        if let Some(seed) = self.testing_prng_seed {
355            command.arg("--testing-prng-seed").arg(seed.to_string());
356        }
357        command.spawn_and_wait_for_stdout().await?;
358        Ok(())
359    }
360
361    /// Runs `linera wallet request-chain`.
362    pub async fn request_chain(
363        &self,
364        faucet: &Faucet,
365        set_default: bool,
366    ) -> Result<(ChainId, AccountOwner)> {
367        let mut command = self.command().await?;
368        command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
369        if set_default {
370            command.arg("--set-default");
371        }
372        let stdout = command.spawn_and_wait_for_stdout().await?;
373        let mut lines = stdout.split_whitespace();
374        let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
375        let owner = lines.next().context("missing chain owner")?.parse()?;
376        Ok((chain_id, owner))
377    }
378
379    /// Runs `linera wallet publish-and-create`.
380    #[expect(clippy::too_many_arguments)]
381    pub async fn publish_and_create<
382        A: ContractAbi,
383        Parameters: Serialize,
384        InstantiationArgument: Serialize,
385    >(
386        &self,
387        contract: PathBuf,
388        service: PathBuf,
389        vm_runtime: VmRuntime,
390        parameters: &Parameters,
391        argument: &InstantiationArgument,
392        required_application_ids: &[ApplicationId],
393        publisher: impl Into<Option<ChainId>>,
394    ) -> Result<ApplicationId<A>> {
395        let json_parameters = serde_json::to_string(parameters)?;
396        let json_argument = serde_json::to_string(argument)?;
397        let mut command = self.command().await?;
398        let vm_runtime = format!("{}", vm_runtime);
399        command
400            .arg("publish-and-create")
401            .args([contract, service])
402            .args(["--vm-runtime", &vm_runtime.to_lowercase()])
403            .args(publisher.into().iter().map(ChainId::to_string))
404            .args(["--json-parameters", &json_parameters])
405            .args(["--json-argument", &json_argument]);
406        if !required_application_ids.is_empty() {
407            command.arg("--required-application-ids");
408            command.args(
409                required_application_ids
410                    .iter()
411                    .map(ApplicationId::to_string),
412            );
413        }
414        let stdout = command.spawn_and_wait_for_stdout().await?;
415        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
416    }
417
418    /// Runs `linera publish-module`.
419    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
420        &self,
421        contract: PathBuf,
422        service: PathBuf,
423        vm_runtime: VmRuntime,
424        publisher: impl Into<Option<ChainId>>,
425    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
426        let stdout = self
427            .command()
428            .await?
429            .arg("publish-module")
430            .args([contract, service])
431            .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
432            .args(publisher.into().iter().map(ChainId::to_string))
433            .spawn_and_wait_for_stdout()
434            .await?;
435        let module_id: ModuleId = stdout.trim().parse()?;
436        Ok(module_id.with_abi())
437    }
438
439    /// Runs `linera create-application`.
440    pub async fn create_application<
441        Abi: ContractAbi,
442        Parameters: Serialize,
443        InstantiationArgument: Serialize,
444    >(
445        &self,
446        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
447        parameters: &Parameters,
448        argument: &InstantiationArgument,
449        required_application_ids: &[ApplicationId],
450        creator: impl Into<Option<ChainId>>,
451    ) -> Result<ApplicationId<Abi>> {
452        let json_parameters = serde_json::to_string(parameters)?;
453        let json_argument = serde_json::to_string(argument)?;
454        let mut command = self.command().await?;
455        command
456            .arg("create-application")
457            .arg(module_id.forget_abi().to_string())
458            .args(["--json-parameters", &json_parameters])
459            .args(["--json-argument", &json_argument])
460            .args(creator.into().iter().map(ChainId::to_string));
461        if !required_application_ids.is_empty() {
462            command.arg("--required-application-ids");
463            command.args(
464                required_application_ids
465                    .iter()
466                    .map(ApplicationId::to_string),
467            );
468        }
469        let stdout = command.spawn_and_wait_for_stdout().await?;
470        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
471    }
472
473    /// Runs `linera service`.
474    pub async fn run_node_service(
475        &self,
476        port: impl Into<Option<u16>>,
477        process_inbox: ProcessInbox,
478    ) -> Result<NodeService> {
479        self.run_node_service_with_options(port, process_inbox, &[], &[], false)
480            .await
481    }
482
483    /// Runs `linera service` with optional task processor configuration.
484    pub async fn run_node_service_with_options(
485        &self,
486        port: impl Into<Option<u16>>,
487        process_inbox: ProcessInbox,
488        operator_application_ids: &[ApplicationId],
489        operators: &[(String, PathBuf)],
490        read_only: bool,
491    ) -> Result<NodeService> {
492        self.run_node_service_with_all_options(
493            port,
494            process_inbox,
495            operator_application_ids,
496            operators,
497            read_only,
498            &[],
499            &[],
500        )
501        .await
502    }
503
504    /// Runs `linera service` with all available options.
505    #[allow(clippy::too_many_arguments)]
506    pub async fn run_node_service_with_all_options(
507        &self,
508        port: impl Into<Option<u16>>,
509        process_inbox: ProcessInbox,
510        operator_application_ids: &[ApplicationId],
511        operators: &[(String, PathBuf)],
512        read_only: bool,
513        allowed_subscriptions: &[String],
514        subscription_ttls: &[(String, u64)],
515    ) -> Result<NodeService> {
516        let port = port.into().unwrap_or(8080);
517        let mut command = self.command().await?;
518        command.arg("service");
519        if let ProcessInbox::Skip = process_inbox {
520            command.arg("--listener-skip-process-inbox");
521        }
522        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
523            command.args(var.split_whitespace());
524        }
525        for app_id in operator_application_ids {
526            command.args(["--operator-application-ids", &app_id.to_string()]);
527        }
528        for (name, path) in operators {
529            command.args(["--operators", &format!("{}={}", name, path.display())]);
530        }
531        if read_only {
532            command.arg("--read-only");
533        }
534        for query in allowed_subscriptions {
535            command.args(["--allow-subscription", query]);
536        }
537        for (name, secs) in subscription_ttls {
538            command.args(["--subscription-ttl-secs", &format!("{name}={secs}")]);
539        }
540        let child = command
541            .args(["--port".to_string(), port.to_string()])
542            .spawn_into()?;
543        let client = reqwest_client();
544        for i in 0..10 {
545            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
546            let request = client
547                .get(format!("http://localhost:{}/", port))
548                .send()
549                .await;
550            if request.is_ok() {
551                tracing::info!("Node service has started");
552                return Ok(NodeService::new(port, child));
553            } else {
554                tracing::warn!("Waiting for node service to start");
555            }
556        }
557        bail!("Failed to start node service");
558    }
559
560    /// Runs `linera service` with a controller application.
561    pub async fn run_node_service_with_controller(
562        &self,
563        port: impl Into<Option<u16>>,
564        process_inbox: ProcessInbox,
565        controller_id: &ApplicationId,
566        operators: &[(String, PathBuf)],
567    ) -> Result<NodeService> {
568        let port = port.into().unwrap_or(8080);
569        let mut command = self.command().await?;
570        command.arg("service");
571        if let ProcessInbox::Skip = process_inbox {
572            command.arg("--listener-skip-process-inbox");
573        }
574        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
575            command.args(var.split_whitespace());
576        }
577        command.args(["--controller-id", &controller_id.to_string()]);
578        for (name, path) in operators {
579            command.args(["--operators", &format!("{}={}", name, path.display())]);
580        }
581        let child = command
582            .args(["--port".to_string(), port.to_string()])
583            .spawn_into()?;
584        let client = reqwest_client();
585        for i in 0..10 {
586            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
587            let request = client
588                .get(format!("http://localhost:{}/", port))
589                .send()
590                .await;
591            if request.is_ok() {
592                tracing::info!("Node service has started");
593                return Ok(NodeService::new(port, child));
594            } else {
595                tracing::warn!("Waiting for node service to start");
596            }
597        }
598        bail!("Failed to start node service");
599    }
600
601    /// Runs `linera validator query`
602    pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
603        let mut command = self.command().await?;
604        command.arg("validator").arg("query").arg(address);
605        let stdout = command.spawn_and_wait_for_stdout().await?;
606
607        // Parse the genesis config hash from the output.
608        // It's on a line like "Genesis config hash: <hash>"
609        let hash = stdout
610            .lines()
611            .find_map(|line| {
612                line.strip_prefix("Genesis config hash: ")
613                    .and_then(|hash_str| hash_str.trim().parse().ok())
614            })
615            .context("error while parsing the result of `linera validator query`")?;
616        Ok(hash)
617    }
618
619    /// Runs `linera validator list`.
620    pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
621        let mut command = self.command().await?;
622        command.arg("validator").arg("list");
623        if let Some(chain_id) = chain_id {
624            command.args(["--chain-id", &chain_id.to_string()]);
625        }
626        command.spawn_and_wait_for_stdout().await?;
627        Ok(())
628    }
629
630    /// Runs `linera sync-validator`.
631    pub async fn sync_validator(
632        &self,
633        chain_ids: impl IntoIterator<Item = &ChainId>,
634        validator_address: impl Into<String>,
635    ) -> Result<()> {
636        let mut command = self.command().await?;
637        command
638            .arg("validator")
639            .arg("sync")
640            .arg(validator_address.into());
641        let mut chain_ids = chain_ids.into_iter().peekable();
642        if chain_ids.peek().is_some() {
643            command
644                .arg("--chains")
645                .args(chain_ids.map(ChainId::to_string));
646        }
647        command.spawn_and_wait_for_stdout().await?;
648        Ok(())
649    }
650
651    /// Runs `linera faucet`.
652    pub async fn run_faucet(
653        &self,
654        port: impl Into<Option<u16>>,
655        chain_id: Option<ChainId>,
656        amount: Amount,
657    ) -> Result<FaucetService> {
658        let port = port.into().unwrap_or(8080);
659        let temp_dir = tempfile::tempdir()
660            .context("Failed to create temporary directory for faucet storage")?;
661        let storage_path = temp_dir.path().join("faucet_storage.sqlite");
662        let mut command = self.command().await?;
663        let command = command
664            .arg("faucet")
665            .args(["--port".to_string(), port.to_string()])
666            .args(["--amount".to_string(), amount.to_string()])
667            .args([
668                "--storage-path".to_string(),
669                storage_path.to_string_lossy().to_string(),
670            ]);
671        if let Some(chain_id) = chain_id {
672            command.arg(chain_id.to_string());
673        }
674        let child = command.spawn_into()?;
675        let client = reqwest_client();
676        for i in 0..10 {
677            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
678            let request = client
679                .get(format!("http://localhost:{}/", port))
680                .send()
681                .await;
682            if request.is_ok() {
683                tracing::info!("Faucet has started");
684                return Ok(FaucetService::new(port, child, temp_dir));
685            } else {
686                tracing::debug!("Waiting for faucet to start");
687            }
688        }
689        bail!("Failed to start faucet");
690    }
691
692    /// Runs `linera local-balance`.
693    pub async fn local_balance(&self, account: Account) -> Result<Amount> {
694        let stdout = self
695            .command()
696            .await?
697            .arg("local-balance")
698            .arg(account.to_string())
699            .spawn_and_wait_for_stdout()
700            .await?;
701        let amount = stdout
702            .trim()
703            .parse()
704            .context("error while parsing the result of `linera local-balance`")?;
705        Ok(amount)
706    }
707
708    /// Runs `linera query-balance`.
709    pub async fn query_balance(&self, account: Account) -> Result<Amount> {
710        let stdout = self
711            .command()
712            .await?
713            .arg("query-balance")
714            .arg(account.to_string())
715            .spawn_and_wait_for_stdout()
716            .await?;
717        let amount = stdout
718            .trim()
719            .parse()
720            .context("error while parsing the result of `linera query-balance`")?;
721        Ok(amount)
722    }
723
724    /// Runs `linera query-application` and parses the JSON result.
725    pub async fn query_application_json<T: DeserializeOwned>(
726        &self,
727        chain_id: ChainId,
728        application_id: ApplicationId,
729        query: impl AsRef<str>,
730    ) -> Result<T> {
731        let query = query.as_ref().trim();
732        let name = query
733            .split_once(|ch: char| !ch.is_alphanumeric())
734            .map_or(query, |(name, _)| name);
735        let stdout = self
736            .command()
737            .await?
738            .arg("query-application")
739            .arg("--chain-id")
740            .arg(chain_id.to_string())
741            .arg("--application-id")
742            .arg(application_id.to_string())
743            .arg(query)
744            .spawn_and_wait_for_stdout()
745            .await?;
746        let data: serde_json::Value =
747            serde_json::from_str(stdout.trim()).context("invalid JSON from query-application")?;
748        serde_json::from_value(data[name].clone())
749            .with_context(|| format!("{name} field missing in query-application response"))
750    }
751
752    /// Runs `linera sync`.
753    pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
754        self.command()
755            .await?
756            .arg("sync")
757            .arg(chain_id.to_string())
758            .spawn_and_wait_for_stdout()
759            .await?;
760        Ok(())
761    }
762
763    /// Runs `linera process-inbox`.
764    pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
765        self.command()
766            .await?
767            .arg("process-inbox")
768            .arg(chain_id.to_string())
769            .spawn_and_wait_for_stdout()
770            .await?;
771        Ok(())
772    }
773
774    /// Runs `linera transfer`.
775    pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
776        self.command()
777            .await?
778            .arg("transfer")
779            .arg(amount.to_string())
780            .args(["--from", &from.to_string()])
781            .args(["--to", &to.to_string()])
782            .spawn_and_wait_for_stdout()
783            .await?;
784        Ok(())
785    }
786
787    /// Runs `linera transfer` with no logging.
788    pub async fn transfer_with_silent_logs(
789        &self,
790        amount: Amount,
791        from: ChainId,
792        to: ChainId,
793    ) -> Result<()> {
794        self.command()
795            .await?
796            .env("RUST_LOG", "off")
797            .arg("transfer")
798            .arg(amount.to_string())
799            .args(["--from", &from.to_string()])
800            .args(["--to", &to.to_string()])
801            .spawn_and_wait_for_stdout()
802            .await?;
803        Ok(())
804    }
805
806    /// Runs `linera transfer` with owner accounts.
807    pub async fn transfer_with_accounts(
808        &self,
809        amount: Amount,
810        from: Account,
811        to: Account,
812    ) -> Result<()> {
813        self.command()
814            .await?
815            .arg("transfer")
816            .arg(amount.to_string())
817            .args(["--from", &from.to_string()])
818            .args(["--to", &to.to_string()])
819            .spawn_and_wait_for_stdout()
820            .await?;
821        Ok(())
822    }
823
824    fn benchmark_command_internal(command: &mut Command, args: &BenchmarkCommand) -> Result<()> {
825        let mut formatted_args = to_args(&args)?;
826        let subcommand = formatted_args.remove(0);
827        // The subcommand is followed by the flattened options, which are preceded by "options".
828        // So remove that as well.
829        formatted_args.remove(0);
830        let options = formatted_args
831            .chunks_exact(2)
832            .flat_map(|pair| {
833                let option = format!("--{}", pair[0]);
834                match pair[1].as_str() {
835                    "true" => vec![option],
836                    "false" => vec![],
837                    _ => vec![option, pair[1].clone()],
838                }
839            })
840            .collect::<Vec<_>>();
841        command
842            .args([
843                "--max-pending-message-bundles",
844                &args.transactions_per_block().to_string(),
845            ])
846            .arg("benchmark")
847            .arg(subcommand)
848            .args(options);
849        Ok(())
850    }
851
852    async fn benchmark_command_with_envs(
853        &self,
854        args: BenchmarkCommand,
855        envs: &[(&str, &str)],
856    ) -> Result<Command> {
857        let mut command = self
858            .command_with_envs_and_arguments(envs, self.required_command_arguments())
859            .await?;
860        Self::benchmark_command_internal(&mut command, &args)?;
861        Ok(command)
862    }
863
864    async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
865        let mut command = self
866            .command_with_arguments(self.required_command_arguments())
867            .await?;
868        Self::benchmark_command_internal(&mut command, &args)?;
869        Ok(command)
870    }
871
872    /// Runs `linera benchmark`.
873    pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
874        let mut command = self.benchmark_command(args).await?;
875        command.spawn_and_wait_for_stdout().await?;
876        Ok(())
877    }
878
879    /// Runs `linera benchmark`, but detached: don't wait for the command to finish, just spawn it
880    /// and return the child process, and the handles to the stdout and stderr.
881    pub async fn benchmark_detached(
882        &self,
883        args: BenchmarkCommand,
884        tx: oneshot::Sender<()>,
885    ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
886        let mut child = self
887            .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
888            .await?
889            .kill_on_drop(true)
890            .stdin(Stdio::piped())
891            .stdout(Stdio::piped())
892            .stderr(Stdio::piped())
893            .spawn()?;
894
895        let pid = child.id().expect("failed to get pid");
896        let stdout = child.stdout.take().expect("stdout not open");
897        let stdout_handle = tokio::spawn(async move {
898            let mut lines = BufReader::new(stdout).lines();
899            while let Ok(Some(line)) = lines.next_line().await {
900                println!("benchmark{{pid={pid}}} {line}");
901            }
902        });
903
904        let stderr = child.stderr.take().expect("stderr not open");
905        let stderr_handle = tokio::spawn(async move {
906            let mut lines = BufReader::new(stderr).lines();
907            let mut tx = Some(tx);
908            while let Ok(Some(line)) = lines.next_line().await {
909                if line.contains("Ready to start benchmark") {
910                    tx.take()
911                        .expect("Should only send signal once")
912                        .send(())
913                        .expect("failed to send ready signal to main thread");
914                } else {
915                    println!("benchmark{{pid={pid}}} {line}");
916                }
917            }
918        });
919        Ok((child, stdout_handle, stderr_handle))
920    }
921
922    async fn open_chain_internal(
923        &self,
924        from: ChainId,
925        owner: Option<AccountOwner>,
926        initial_balance: Amount,
927        super_owner: bool,
928    ) -> Result<(ChainId, AccountOwner)> {
929        let mut command = self.command().await?;
930        command
931            .arg("open-chain")
932            .args(["--from", &from.to_string()])
933            .args(["--initial-balance", &initial_balance.to_string()]);
934
935        if let Some(owner) = owner {
936            command.args(["--owner", &owner.to_string()]);
937        }
938
939        if super_owner {
940            command.arg("--super-owner");
941        }
942
943        let stdout = command.spawn_and_wait_for_stdout().await?;
944        let mut split = stdout.split('\n');
945        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
946        let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
947        if let Some(owner) = owner {
948            assert_eq!(owner, new_owner);
949        }
950        Ok((chain_id, new_owner))
951    }
952
953    /// Runs `linera open-chain --super-owner`.
954    pub async fn open_chain_super_owner(
955        &self,
956        from: ChainId,
957        owner: Option<AccountOwner>,
958        initial_balance: Amount,
959    ) -> Result<(ChainId, AccountOwner)> {
960        self.open_chain_internal(from, owner, initial_balance, true)
961            .await
962    }
963
964    /// Runs `linera open-chain`.
965    pub async fn open_chain(
966        &self,
967        from: ChainId,
968        owner: Option<AccountOwner>,
969        initial_balance: Amount,
970    ) -> Result<(ChainId, AccountOwner)> {
971        self.open_chain_internal(from, owner, initial_balance, false)
972            .await
973    }
974
975    /// Runs `linera open-chain` then `linera assign`.
976    pub async fn open_and_assign(
977        &self,
978        client: &ClientWrapper,
979        initial_balance: Amount,
980    ) -> Result<ChainId> {
981        let our_chain = self
982            .load_wallet()?
983            .default_chain()
984            .context("no default chain found")?;
985        let owner = client.keygen().await?;
986        let (new_chain, _) = self
987            .open_chain(our_chain, Some(owner), initial_balance)
988            .await?;
989        client.assign(owner, new_chain).await?;
990        Ok(new_chain)
991    }
992
993    pub async fn open_multi_owner_chain(
994        &self,
995        from: ChainId,
996        owners: BTreeMap<AccountOwner, u64>,
997        multi_leader_rounds: u32,
998        balance: Amount,
999        base_timeout_ms: u64,
1000    ) -> Result<ChainId> {
1001        let mut command = self.command().await?;
1002        command
1003            .arg("open-multi-owner-chain")
1004            .args(["--from", &from.to_string()])
1005            .arg("--owners")
1006            .arg(serde_json::to_string(&owners)?)
1007            .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
1008        command
1009            .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
1010            .args(["--initial-balance", &balance.to_string()]);
1011
1012        let stdout = command.spawn_and_wait_for_stdout().await?;
1013        let mut split = stdout.split('\n');
1014        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
1015
1016        Ok(chain_id)
1017    }
1018
1019    pub async fn change_ownership(
1020        &self,
1021        chain_id: ChainId,
1022        super_owners: Vec<AccountOwner>,
1023        owners: Vec<AccountOwner>,
1024    ) -> Result<()> {
1025        let mut command = self.command().await?;
1026        command
1027            .arg("change-ownership")
1028            .args(["--chain-id", &chain_id.to_string()]);
1029        command
1030            .arg("--super-owners")
1031            .arg(serde_json::to_string(&super_owners)?);
1032        command.arg("--owners").arg(serde_json::to_string(
1033            &owners
1034                .into_iter()
1035                .zip(std::iter::repeat(100u64))
1036                .collect::<BTreeMap<_, _>>(),
1037        )?);
1038        command.spawn_and_wait_for_stdout().await?;
1039        Ok(())
1040    }
1041
1042    pub async fn change_application_permissions(
1043        &self,
1044        chain_id: ChainId,
1045        application_permissions: ApplicationPermissions,
1046    ) -> Result<()> {
1047        let mut command = self.command().await?;
1048        command
1049            .arg("change-application-permissions")
1050            .args(["--chain-id", &chain_id.to_string()]);
1051        command.arg("--manage-chain").arg(serde_json::to_string(
1052            &application_permissions.manage_chain,
1053        )?);
1054        // TODO: add other fields
1055        command.spawn_and_wait_for_stdout().await?;
1056        Ok(())
1057    }
1058
1059    /// Runs `linera wallet follow-chain CHAIN_ID`.
1060    pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
1061        let mut command = self.command().await?;
1062        command
1063            .args(["wallet", "follow-chain"])
1064            .arg(chain_id.to_string());
1065        if sync {
1066            command.arg("--sync");
1067        }
1068        command.spawn_and_wait_for_stdout().await?;
1069        Ok(())
1070    }
1071
1072    /// Runs `linera wallet forget-chain CHAIN_ID`.
1073    pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
1074        let mut command = self.command().await?;
1075        command
1076            .args(["wallet", "forget-chain"])
1077            .arg(chain_id.to_string());
1078        command.spawn_and_wait_for_stdout().await?;
1079        Ok(())
1080    }
1081
1082    /// Runs `linera wallet set-default CHAIN_ID`.
1083    pub async fn set_default_chain(&self, chain_id: ChainId) -> Result<()> {
1084        let mut command = self.command().await?;
1085        command
1086            .args(["wallet", "set-default"])
1087            .arg(chain_id.to_string());
1088        command.spawn_and_wait_for_stdout().await?;
1089        Ok(())
1090    }
1091
1092    pub async fn retry_pending_block(
1093        &self,
1094        chain_id: Option<ChainId>,
1095    ) -> Result<Option<CryptoHash>> {
1096        let mut command = self.command().await?;
1097        command.arg("retry-pending-block");
1098        if let Some(chain_id) = chain_id {
1099            command.arg(chain_id.to_string());
1100        }
1101        let stdout = command.spawn_and_wait_for_stdout().await?;
1102        let stdout = stdout.trim();
1103        if stdout.is_empty() {
1104            Ok(None)
1105        } else {
1106            Ok(Some(CryptoHash::from_str(stdout)?))
1107        }
1108    }
1109
1110    /// Runs `linera publish-data-blob`.
1111    pub async fn publish_data_blob(
1112        &self,
1113        path: &Path,
1114        chain_id: Option<ChainId>,
1115    ) -> Result<CryptoHash> {
1116        let mut command = self.command().await?;
1117        command.arg("publish-data-blob").arg(path);
1118        if let Some(chain_id) = chain_id {
1119            command.arg(chain_id.to_string());
1120        }
1121        let stdout = command.spawn_and_wait_for_stdout().await?;
1122        let stdout = stdout.trim();
1123        Ok(CryptoHash::from_str(stdout)?)
1124    }
1125
1126    /// Runs `linera read-data-blob`.
1127    pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
1128        let mut command = self.command().await?;
1129        command.arg("read-data-blob").arg(hash.to_string());
1130        if let Some(chain_id) = chain_id {
1131            command.arg(chain_id.to_string());
1132        }
1133        command.spawn_and_wait_for_stdout().await?;
1134        Ok(())
1135    }
1136
1137    pub fn load_wallet(&self) -> Result<Wallet> {
1138        Ok(Wallet::read(&self.wallet_path())?)
1139    }
1140
1141    pub fn load_keystore(&self) -> Result<InMemorySigner> {
1142        util::read_json(self.keystore_path())
1143    }
1144
1145    pub fn wallet_path(&self) -> PathBuf {
1146        self.path_provider.path().join(&self.wallet)
1147    }
1148
1149    pub fn keystore_path(&self) -> PathBuf {
1150        self.path_provider.path().join(&self.keystore)
1151    }
1152
1153    pub fn storage_path(&self) -> &str {
1154        &self.storage
1155    }
1156
1157    pub fn get_owner(&self) -> Option<AccountOwner> {
1158        let wallet = self.load_wallet().ok()?;
1159        wallet
1160            .get(wallet.default_chain()?)
1161            .expect("default chain must be in wallet")
1162            .owner
1163    }
1164
1165    pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1166        self.load_wallet()
1167            .ok()
1168            .is_some_and(|wallet| wallet.get(chain).is_some())
1169    }
1170
1171    pub async fn set_validator(
1172        &self,
1173        validator_key: &(String, String),
1174        port: usize,
1175        votes: usize,
1176    ) -> Result<()> {
1177        let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1178        self.command()
1179            .await?
1180            .arg("validator")
1181            .arg("add")
1182            .args(["--public-key", &validator_key.0])
1183            .args(["--account-key", &validator_key.1])
1184            .args(["--address", &address])
1185            .args(["--votes", &votes.to_string()])
1186            .spawn_and_wait_for_stdout()
1187            .await?;
1188        Ok(())
1189    }
1190
1191    pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1192        self.command()
1193            .await?
1194            .arg("validator")
1195            .arg("remove")
1196            .args(["--public-key", validator_key])
1197            .spawn_and_wait_for_stdout()
1198            .await?;
1199        Ok(())
1200    }
1201
1202    pub async fn change_validators(
1203        &self,
1204        add_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1205        modify_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1206        remove_validators: &[String],
1207    ) -> Result<()> {
1208        use std::str::FromStr;
1209
1210        use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1211
1212        // Build a map that will be serialized to JSON
1213        // Use the exact types that deserialization expects
1214        let mut changes = std::collections::HashMap::new();
1215
1216        // Add/modify validators
1217        for (public_key_str, account_key_str, port, votes) in
1218            add_validators.iter().chain(modify_validators.iter())
1219        {
1220            let public_key = ValidatorPublicKey::from_str(public_key_str)
1221                .with_context(|| format!("Invalid validator public key: {}", public_key_str))?;
1222
1223            let account_key = AccountPublicKey::from_str(account_key_str)
1224                .with_context(|| format!("Invalid account public key: {}", account_key_str))?;
1225
1226            let address = format!("{}:127.0.0.1:{}", self.network.short(), port)
1227                .parse()
1228                .unwrap();
1229
1230            // Create ValidatorChange struct
1231            let change = crate::cli::validator::Change {
1232                account_key,
1233                address,
1234                votes: crate::cli::validator::Votes(
1235                    std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1236                ),
1237            };
1238
1239            changes.insert(public_key, Some(change));
1240        }
1241
1242        // Remove validators (set to None)
1243        for validator_key_str in remove_validators {
1244            let public_key = ValidatorPublicKey::from_str(validator_key_str)
1245                .with_context(|| format!("Invalid validator public key: {}", validator_key_str))?;
1246            changes.insert(public_key, None);
1247        }
1248
1249        // Create temporary file with JSON
1250        let temp_file = tempfile::NamedTempFile::new()
1251            .context("Failed to create temporary file for validator changes")?;
1252        serde_json::to_writer(&temp_file, &changes)
1253            .context("Failed to write validator changes to file")?;
1254        let temp_path = temp_file.path();
1255
1256        self.command()
1257            .await?
1258            .arg("validator")
1259            .arg("update")
1260            .arg(temp_path)
1261            .arg("--yes") // Skip confirmation prompt
1262            .spawn_and_wait_for_stdout()
1263            .await?;
1264
1265        Ok(())
1266    }
1267
1268    pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1269        self.command()
1270            .await?
1271            .arg("revoke-epochs")
1272            .arg(epoch.to_string())
1273            .spawn_and_wait_for_stdout()
1274            .await?;
1275        Ok(())
1276    }
1277
1278    /// Runs `linera keygen`.
1279    pub async fn keygen(&self) -> Result<AccountOwner> {
1280        let stdout = self
1281            .command()
1282            .await?
1283            .arg("keygen")
1284            .spawn_and_wait_for_stdout()
1285            .await?;
1286        AccountOwner::from_str(stdout.as_str().trim())
1287    }
1288
1289    /// Returns the default chain.
1290    pub fn default_chain(&self) -> Option<ChainId> {
1291        self.load_wallet().ok()?.default_chain()
1292    }
1293
1294    /// Runs `linera assign`.
1295    pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1296        let _stdout = self
1297            .command()
1298            .await?
1299            .arg("assign")
1300            .args(["--owner", &owner.to_string()])
1301            .args(["--chain-id", &chain_id.to_string()])
1302            .spawn_and_wait_for_stdout()
1303            .await?;
1304        Ok(())
1305    }
1306
1307    /// Runs `linera set-preferred-owner` for `chain_id`.
1308    pub async fn set_preferred_owner(
1309        &self,
1310        chain_id: ChainId,
1311        owner: Option<AccountOwner>,
1312    ) -> Result<()> {
1313        let mut owner_arg = vec!["--owner".to_string()];
1314        if let Some(owner) = owner {
1315            owner_arg.push(owner.to_string());
1316        };
1317        self.command()
1318            .await?
1319            .arg("set-preferred-owner")
1320            .args(["--chain-id", &chain_id.to_string()])
1321            .args(owner_arg)
1322            .spawn_and_wait_for_stdout()
1323            .await?;
1324        Ok(())
1325    }
1326
1327    pub async fn build_application(
1328        &self,
1329        path: &Path,
1330        name: &str,
1331        is_workspace: bool,
1332    ) -> Result<(PathBuf, PathBuf)> {
1333        Command::new("cargo")
1334            .current_dir(self.path_provider.path())
1335            .arg("build")
1336            .arg("--release")
1337            .args(["--target", "wasm32-unknown-unknown"])
1338            .arg("--manifest-path")
1339            .arg(path.join("Cargo.toml"))
1340            .spawn_and_wait_for_stdout()
1341            .await?;
1342
1343        let release_dir = match is_workspace {
1344            true => path.join("../target/wasm32-unknown-unknown/release"),
1345            false => path.join("target/wasm32-unknown-unknown/release"),
1346        };
1347
1348        let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1349        let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1350
1351        let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1352        let service_size = fs_err::tokio::metadata(&service).await?.len();
1353        tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1354
1355        Ok((contract, service))
1356    }
1357}
1358
1359impl Drop for ClientWrapper {
1360    fn drop(&mut self) {
1361        use std::process::Command as SyncCommand;
1362
1363        if self.on_drop != OnClientDrop::CloseChains {
1364            return;
1365        }
1366
1367        let Ok(binary_path) = self.binary_path.lock() else {
1368            tracing::error!(
1369                "Failed to close chains because a thread panicked with a lock to `binary_path`"
1370            );
1371            return;
1372        };
1373
1374        let Some(binary_path) = binary_path.as_ref() else {
1375            tracing::warn!(
1376                "Assuming no chains need to be closed, because the command binary was never \
1377                resolved and therefore presumably never called"
1378            );
1379            return;
1380        };
1381
1382        let working_directory = self.path_provider.path();
1383        let mut wallet_show_command = SyncCommand::new(binary_path);
1384
1385        for argument in self.command_arguments() {
1386            wallet_show_command.arg(&*argument);
1387        }
1388
1389        let Ok(wallet_show_output) = wallet_show_command
1390            .current_dir(working_directory)
1391            .args(["wallet", "show", "--short", "--owned"])
1392            .output()
1393        else {
1394            tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1395            return;
1396        };
1397
1398        if !wallet_show_output.status.success() {
1399            tracing::warn!("Failed to list chains in the wallet to close them");
1400            return;
1401        }
1402
1403        let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1404            tracing::warn!(
1405                "Failed to close chains because `linera wallet show --short` \
1406                returned a non-UTF-8 output"
1407            );
1408            return;
1409        };
1410
1411        let chain_ids = chain_list_string
1412            .split('\n')
1413            .map(|line| line.trim())
1414            .filter(|line| !line.is_empty());
1415
1416        for chain_id in chain_ids {
1417            let mut close_chain_command = SyncCommand::new(binary_path);
1418
1419            for argument in self.command_arguments() {
1420                close_chain_command.arg(&*argument);
1421            }
1422
1423            close_chain_command.current_dir(working_directory);
1424
1425            match close_chain_command.args(["close-chain", chain_id]).status() {
1426                Ok(status) if status.success() => (),
1427                Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1428                Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1429            }
1430        }
1431    }
1432}
1433
#[cfg(with_testing)]
impl ClientWrapper {
    /// Builds the example application `name`, located by `example_path`, as a member of a
    /// workspace. Returns the paths of the contract and service Wasm files.
    pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
        let example_dir = Self::example_path(name)?;
        let is_workspace = true;
        self.build_application(&example_dir, name, is_workspace)
            .await
    }

    /// Returns `../examples/<name>` relative to the current working directory.
    pub fn example_path(name: &str) -> Result<PathBuf> {
        let working_directory = env::current_dir()?;
        Ok(working_directory.join("../examples/").join(name))
    }
}
1445
/// Shortens `input` for log and error messages.
///
/// Inputs of at most 1000 bytes are returned unchanged. Longer inputs are cut at the last
/// `char` boundary that does not exceed 1000 bytes and get the suffix `" ..."`.
///
/// Cutting on a `char` boundary matters because slicing a `str` in the middle of a
/// multi-byte character is not possible; a plain `get(..1000).unwrap()` panics there.
fn truncate_query_output(input: &str) -> String {
    let max_len = 1000;
    if input.len() <= max_len {
        return input.to_string();
    }
    // Index 0 is always a boundary, so this loop terminates.
    let mut end = max_len;
    while !input.is_char_boundary(end) {
        end -= 1;
    }
    format!("{} ...", &input[..end])
}
1454
1455fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1456    let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1457    let max_len = 200;
1458    if query.len() < max_len {
1459        query
1460    } else {
1461        format!("{} ...", query.get(..max_len).unwrap())
1462    }
1463}
1464
1465/// A running node service.
1466/// Logs a warning if a child process has already exited when its wrapper is dropped.
1467/// On Unix, includes the signal number if the process was killed (e.g. signal 9 = OOM).
1468fn log_unexpected_exit(child: &mut Child, service_kind: &str, port: u16) {
1469    match child.try_wait() {
1470        Ok(Some(status)) => {
1471            #[cfg(unix)]
1472            {
1473                use std::os::unix::process::ExitStatusExt as _;
1474                if let Some(signal) = status.signal() {
1475                    tracing::error!(
1476                        port,
1477                        signal,
1478                        "The {service_kind} service was killed by signal {signal}",
1479                    );
1480                    return;
1481                }
1482            }
1483            if !status.success() {
1484                tracing::error!(
1485                    port,
1486                    %status,
1487                    "The {service_kind} service exited unexpectedly with {status}",
1488                );
1489            }
1490        }
1491        Ok(None) => {} // Still running — normal case when terminate() was called.
1492        Err(error) => {
1493            tracing::warn!(
1494                port,
1495                %error,
1496                "Failed to check {service_kind} service status",
1497            );
1498        }
1499    }
1500}
1501
/// A running node service.
///
/// Wraps the child process of the service together with the port used to reach it at
/// `http://localhost:<port>/`.
pub struct NodeService {
    /// Port used to build the URLs of the requests sent to the service.
    port: u16,
    /// Handle of the service's process.
    child: Child,
    /// Set by `terminate()`, so that dropping the wrapper does not check the process
    /// status for an unexpected exit.
    terminated: bool,
}
1507
1508impl Drop for NodeService {
1509    fn drop(&mut self) {
1510        if !self.terminated {
1511            log_unexpected_exit(&mut self.child, "node", self.port);
1512        }
1513    }
1514}
1515
1516impl NodeService {
1517    fn new(port: u16, child: Child) -> Self {
1518        Self {
1519            port,
1520            child,
1521            terminated: false,
1522        }
1523    }
1524
1525    pub async fn terminate(mut self) -> Result<()> {
1526        self.terminated = true;
1527        self.child.kill().await.context("terminating node service")
1528    }
1529
1530    pub fn port(&self) -> u16 {
1531        self.port
1532    }
1533
1534    pub fn ensure_is_running(&mut self) -> Result<()> {
1535        self.child.ensure_is_running()
1536    }
1537
1538    pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1539        let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1540        let mut data = self.query_node(query).await?;
1541        Ok(serde_json::from_value(data["processInbox"].take())?)
1542    }
1543
1544    pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1545        let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1546        let mut data = self.query_node(query).await?;
1547        Ok(serde_json::from_value(data["sync"].take())?)
1548    }
1549
1550    pub async fn transfer(
1551        &self,
1552        chain_id: ChainId,
1553        owner: AccountOwner,
1554        recipient: Account,
1555        amount: Amount,
1556    ) -> Result<CryptoHash> {
1557        let json_owner = owner.to_value();
1558        let json_recipient = recipient.to_value();
1559        let query = format!(
1560            "mutation {{ transfer(\
1561                 chainId: \"{chain_id}\", \
1562                 owner: {json_owner}, \
1563                 recipient: {json_recipient}, \
1564                 amount: \"{amount}\") \
1565             }}"
1566        );
1567        let data = self.query_node(query).await?;
1568        serde_json::from_value(data["transfer"].clone())
1569            .context("missing transfer field in response")
1570    }
1571
1572    pub async fn balance(&self, account: &Account) -> Result<Amount> {
1573        let chain = account.chain_id;
1574        let owner = account.owner;
1575        if matches!(owner, AccountOwner::CHAIN) {
1576            let query = format!(
1577                "query {{ chain(chainId:\"{chain}\") {{
1578                    executionState {{ system {{ balance }} }}
1579                }} }}"
1580            );
1581            let response = self.query_node(query).await?;
1582            let balance = &response["chain"]["executionState"]["system"]["balance"]
1583                .as_str()
1584                .unwrap();
1585            return Ok(Amount::from_str(balance)?);
1586        }
1587        let query = format!(
1588            "query {{ chain(chainId:\"{chain}\") {{
1589                executionState {{ system {{ balances {{
1590                    entry(key:\"{owner}\") {{ value }}
1591                }} }} }}
1592            }} }}"
1593        );
1594        let response = self.query_node(query).await?;
1595        let balances = &response["chain"]["executionState"]["system"]["balances"];
1596        let balance = balances["entry"]["value"].as_str();
1597        match balance {
1598            None => Ok(Amount::ZERO),
1599            Some(amount) => Ok(Amount::from_str(amount)?),
1600        }
1601    }
1602
1603    pub fn make_application<A: ContractAbi>(
1604        &self,
1605        chain_id: &ChainId,
1606        application_id: &ApplicationId<A>,
1607    ) -> Result<ApplicationWrapper<A>> {
1608        let application_id = application_id.forget_abi().to_string();
1609        let link = format!(
1610            "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1611            self.port
1612        );
1613        Ok(ApplicationWrapper::from(link))
1614    }
1615
1616    pub async fn publish_data_blob(
1617        &self,
1618        chain_id: &ChainId,
1619        bytes: Vec<u8>,
1620    ) -> Result<CryptoHash> {
1621        let query = format!(
1622            "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1623            chain_id.to_value(),
1624            bytes.to_value(),
1625        );
1626        let data = self.query_node(query).await?;
1627        serde_json::from_value(data["publishDataBlob"].clone())
1628            .context("missing publishDataBlob field in response")
1629    }
1630
1631    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1632        &self,
1633        chain_id: &ChainId,
1634        contract: PathBuf,
1635        service: PathBuf,
1636        vm_runtime: VmRuntime,
1637    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1638        let contract_code = Bytecode::load_from_file(&contract).await?;
1639        let service_code = Bytecode::load_from_file(&service).await?;
1640        let query = format!(
1641            "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1642            chain_id.to_value(),
1643            contract_code.to_value(),
1644            service_code.to_value(),
1645            vm_runtime.to_value(),
1646        );
1647        let data = self.query_node(query).await?;
1648        let module_str = data["publishModule"]
1649            .as_str()
1650            .context("module ID not found")?;
1651        let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1652        Ok(module_id.with_abi())
1653    }
1654
1655    pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1656        let query = format!(
1657            "query {{ chain(chainId:\"{chain_id}\") {{
1658                executionState {{ system {{ committees }} }}
1659            }} }}"
1660        );
1661        let mut response = self.query_node(query).await?;
1662        let committees = response["chain"]["executionState"]["system"]["committees"].take();
1663        Ok(serde_json::from_value(committees)?)
1664    }
1665
1666    pub async fn events_from_index(
1667        &self,
1668        chain_id: &ChainId,
1669        stream_id: &StreamId,
1670        start_index: u32,
1671    ) -> Result<Vec<IndexAndEvent>> {
1672        let query = format!(
1673            "query {{
1674               eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1675               {{ index event }}
1676             }}",
1677            stream_id.to_value()
1678        );
1679        let mut response = self.query_node(query).await?;
1680        let response = response["eventsFromIndex"].take();
1681        Ok(serde_json::from_value(response)?)
1682    }
1683
1684    pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1685        let n_try = 5;
1686        let query = query.as_ref();
1687        for i in 0..n_try {
1688            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1689            let url = format!("http://localhost:{}/", self.port);
1690            let client = reqwest_client();
1691            let result = client
1692                .post(url)
1693                .json(&json!({ "query": query }))
1694                .send()
1695                .await;
1696            if matches!(result, Err(ref error) if error.is_timeout()) {
1697                tracing::warn!(
1698                    "Timeout when sending query {} to the node service",
1699                    truncate_query_output(query)
1700                );
1701                continue;
1702            }
1703            let response = result.with_context(|| {
1704                format!(
1705                    "query_node: failed to post query={}",
1706                    truncate_query_output(query)
1707                )
1708            })?;
1709            ensure!(
1710                response.status().is_success(),
1711                "Query \"{}\" failed: {}",
1712                truncate_query_output(query),
1713                response
1714                    .text()
1715                    .await
1716                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1717            );
1718            let value: Value = response.json().await.context("invalid JSON")?;
1719            if let Some(errors) = value.get("errors") {
1720                tracing::warn!(
1721                    "Query \"{}\" failed: {}",
1722                    truncate_query_output(query),
1723                    errors
1724                );
1725            } else {
1726                return Ok(value["data"].clone());
1727            }
1728        }
1729        bail!(
1730            "Query \"{}\" failed after {} retries.",
1731            truncate_query_output(query),
1732            n_try
1733        );
1734    }
1735
1736    pub async fn create_application<
1737        Abi: ContractAbi,
1738        Parameters: Serialize,
1739        InstantiationArgument: Serialize,
1740    >(
1741        &self,
1742        chain_id: &ChainId,
1743        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1744        parameters: &Parameters,
1745        argument: &InstantiationArgument,
1746        required_application_ids: &[ApplicationId],
1747    ) -> Result<ApplicationId<Abi>> {
1748        let module_id = module_id.forget_abi();
1749        let json_required_applications_ids = required_application_ids
1750            .iter()
1751            .map(ApplicationId::to_string)
1752            .collect::<Vec<_>>()
1753            .to_value();
1754        // Convert to `serde_json::Value` then `async_graphql::Value` via the trait `InputType`.
1755        let new_parameters = serde_json::to_value(parameters)
1756            .context("could not create parameters JSON")?
1757            .to_value();
1758        let new_argument = serde_json::to_value(argument)
1759            .context("could not create argument JSON")?
1760            .to_value();
1761        let query = format!(
1762            "mutation {{ createApplication(\
1763                 chainId: \"{chain_id}\",
1764                 moduleId: \"{module_id}\", \
1765                 parameters: {new_parameters}, \
1766                 instantiationArgument: {new_argument}, \
1767                 requiredApplicationIds: {json_required_applications_ids}) \
1768             }}"
1769        );
1770        let data = self.query_node(query).await?;
1771        let app_id_str = data["createApplication"]
1772            .as_str()
1773            .context("missing createApplication string in response")?
1774            .trim();
1775        Ok(app_id_str
1776            .parse::<ApplicationId>()
1777            .context("invalid application ID")?
1778            .with_abi())
1779    }
1780
1781    /// Obtains the hash and height of the `chain`'s tip block, as known by this node service.
1782    pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1783        let query = format!(
1784            r#"query {{ block(chainId: "{chain}") {{
1785                hash
1786                block {{ header {{ height }} }}
1787            }} }}"#
1788        );
1789
1790        let mut response = self.query_node(&query).await?;
1791
1792        match (
1793            mem::take(&mut response["block"]["hash"]),
1794            mem::take(&mut response["block"]["block"]["header"]["height"]),
1795        ) {
1796            (Value::Null, Value::Null) => Ok(None),
1797            (Value::String(hash), Value::Number(height)) => Ok(Some((
1798                hash.parse()
1799                    .context("Received an invalid hash {hash:?} for chain tip")?,
1800                BlockHeight(height.as_u64().unwrap()),
1801            ))),
1802            invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1803        }
1804    }
1805
1806    /// Subscribes to the node service and returns a stream of notifications about a chain.
1807    pub async fn notifications(
1808        &self,
1809        chain_id: ChainId,
1810    ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
1811        let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1812        let url = format!("ws://localhost:{}/ws", self.port);
1813        let mut request = url.into_client_request()?;
1814        request.headers_mut().insert(
1815            "Sec-WebSocket-Protocol",
1816            HeaderValue::from_str("graphql-transport-ws")?,
1817        );
1818        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1819        let init_json = json!({
1820          "type": "connection_init",
1821          "payload": {}
1822        });
1823        websocket.send(init_json.to_string().into()).await?;
1824        let text = websocket
1825            .next()
1826            .await
1827            .context("Failed to establish connection")??
1828            .into_text()?;
1829        ensure!(
1830            text == "{\"type\":\"connection_ack\"}",
1831            "Unexpected response: {text}"
1832        );
1833        let query_json = json!({
1834          "id": "1",
1835          "type": "start",
1836          "payload": {
1837            "query": query,
1838            "variables": {},
1839            "operationName": null
1840          }
1841        });
1842        websocket.send(query_json.to_string().into()).await?;
1843        Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1844            |message| async {
1845                let text = message.into_text()?;
1846                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1847                if let Some(errors) = value["payload"].get("errors") {
1848                    bail!("Notification subscription failed: {errors:?}");
1849                }
1850                serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1851                    .context("Failed to deserialize notification")
1852            },
1853        )))
1854    }
1855
1856    /// Subscribes to query results via the `queryResult` GraphQL subscription.
1857    pub async fn query_result(
1858        &self,
1859        name: &str,
1860        chain_id: ChainId,
1861        application_id: &ApplicationId,
1862    ) -> Result<Pin<Box<impl Stream<Item = Result<Value>>>>> {
1863        let query = format!(
1864            r#"subscription {{ queryResult(name: "{name}", chainId: "{chain_id}", applicationId: "{application_id}") }}"#,
1865        );
1866        let url = format!("ws://localhost:{}/ws", self.port);
1867        let mut request = url.into_client_request()?;
1868        request.headers_mut().insert(
1869            "Sec-WebSocket-Protocol",
1870            HeaderValue::from_str("graphql-transport-ws")?,
1871        );
1872        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1873        let init_json = json!({
1874          "type": "connection_init",
1875          "payload": {}
1876        });
1877        websocket.send(init_json.to_string().into()).await?;
1878        let text = websocket
1879            .next()
1880            .await
1881            .context("Failed to establish connection")??
1882            .into_text()?;
1883        ensure!(
1884            text == "{\"type\":\"connection_ack\"}",
1885            "Unexpected response: {text}"
1886        );
1887        let query_json = json!({
1888          "id": "1",
1889          "type": "start",
1890          "payload": {
1891            "query": query,
1892            "variables": {},
1893            "operationName": null
1894          }
1895        });
1896        websocket.send(query_json.to_string().into()).await?;
1897        Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1898            |message| async {
1899                let text = message.into_text()?;
1900                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1901                if let Some(errors) = value["payload"].get("errors") {
1902                    bail!("Query result subscription failed: {errors:?}");
1903                }
1904                Ok(value["payload"]["data"]["queryResult"].clone())
1905            },
1906        )))
1907    }
1908}
1909
/// A running faucet service.
pub struct FaucetService {
    /// Port used by `instance` to build the `http://localhost:{port}/` URL; also handed to
    /// `log_unexpected_exit` when the service is dropped without being terminated.
    port: u16,
    /// Handle of the faucet process; `terminate` kills it.
    child: Child,
    /// Never read by this type.
    // NOTE(review): presumably held so the temporary directory outlives the service — confirm.
    _temp_dir: tempfile::TempDir,
    /// Set to `true` by `terminate`; while `false`, dropping the service calls
    /// `log_unexpected_exit`.
    terminated: bool,
}
1917
1918impl Drop for FaucetService {
1919    fn drop(&mut self) {
1920        if !self.terminated {
1921            log_unexpected_exit(&mut self.child, "faucet", self.port);
1922        }
1923    }
1924}
1925
1926impl FaucetService {
1927    fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
1928        Self {
1929            port,
1930            child,
1931            _temp_dir: temp_dir,
1932            terminated: false,
1933        }
1934    }
1935
1936    pub async fn terminate(mut self) -> Result<()> {
1937        self.terminated = true;
1938        self.child
1939            .kill()
1940            .await
1941            .context("terminating faucet service")
1942    }
1943
1944    pub fn ensure_is_running(&mut self) -> Result<()> {
1945        self.child.ensure_is_running()
1946    }
1947
1948    pub fn instance(&self) -> Faucet {
1949        Faucet::new(format!("http://localhost:{}/", self.port))
1950    }
1951}
1952
/// A running `Application` to be queried in GraphQL.
pub struct ApplicationWrapper<A> {
    /// URI that `run_json_query` posts its requests to.
    uri: String,
    /// Ties the wrapper to `A` without storing a value of that type.
    _phantom: PhantomData<A>,
}
1958
1959impl<A> ApplicationWrapper<A> {
1960    pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1961        let query = query.as_ref();
1962        let value = self.run_json_query(json!({ "query": query })).await?;
1963        Ok(value["data"].clone())
1964    }
1965
1966    pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1967        const MAX_RETRIES: usize = 5;
1968
1969        for i in 0.. {
1970            let client = reqwest_client();
1971            let result = client.post(&self.uri).json(&query).send().await;
1972            let response = match result {
1973                Ok(response) => response,
1974                Err(error) if i < MAX_RETRIES => {
1975                    tracing::warn!(
1976                        "Failed to post query \"{}\": {error}; retrying",
1977                        truncate_query_output_serialize(&query),
1978                    );
1979                    continue;
1980                }
1981                Err(error) => {
1982                    let query = truncate_query_output_serialize(&query);
1983                    return Err(error)
1984                        .with_context(|| format!("run_json_query: failed to post query={query}"));
1985                }
1986            };
1987            ensure!(
1988                response.status().is_success(),
1989                "Query \"{}\" failed: {}",
1990                truncate_query_output_serialize(&query),
1991                response
1992                    .text()
1993                    .await
1994                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1995            );
1996            let value: Value = response.json().await.context("invalid JSON")?;
1997            if let Some(errors) = value.get("errors") {
1998                bail!(
1999                    "Query \"{}\" failed: {}",
2000                    truncate_query_output_serialize(&query),
2001                    errors
2002                );
2003            }
2004            return Ok(value);
2005        }
2006        unreachable!()
2007    }
2008
2009    pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
2010        let query = query.as_ref();
2011        self.run_graphql_query(&format!("query {{ {query} }}"))
2012            .await
2013    }
2014
2015    pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
2016        let query = query.as_ref().trim();
2017        let name = query
2018            .split_once(|ch: char| !ch.is_alphanumeric())
2019            .map_or(query, |(name, _)| name);
2020        let data = self.query(query).await?;
2021        serde_json::from_value(data[name].clone())
2022            .with_context(|| format!("{name} field missing in response"))
2023    }
2024
2025    pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
2026        let mutation = mutation.as_ref();
2027        self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
2028            .await
2029    }
2030
2031    pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
2032        let mut out = String::from("mutation {\n");
2033        for (index, mutation) in mutations.iter().enumerate() {
2034            out = format!("{}  u{}: {}\n", out, index, mutation);
2035        }
2036        out.push_str("}\n");
2037        self.run_graphql_query(&out).await
2038    }
2039}
2040
2041impl<A> From<String> for ApplicationWrapper<A> {
2042    fn from(uri: String) -> ApplicationWrapper<A> {
2043        ApplicationWrapper {
2044            uri,
2045            _phantom: PhantomData,
2046        }
2047    }
2048}
2049
/// Returns how long tests wait for a notification: the number of milliseconds given in the
/// `LINERA_TEST_NOTIFICATION_TIMEOUT_MS` environment variable, or 10 seconds if it is unset.
///
/// # Panics
///
/// Panics if the variable is set but is not valid Unicode or does not parse as a number.
#[cfg(with_testing)]
fn notification_timeout() -> Duration {
    const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
    const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;

    let millis = match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(millis) => millis,
            Err(error) => {
                panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
            }
        },
        Err(env::VarError::NotPresent) => NOTIFICATION_TIMEOUT_MS_DEFAULT,
        Err(env::VarError::NotUnicode(_)) => {
            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
        }
    };
    Duration::from_millis(millis)
}
2067
#[cfg(with_testing)]
pub trait NotificationsExt {
    /// Waits for a notification for which `f` returns `Some(t)`, and returns `t`.
    fn wait_for<T>(
        &mut self,
        f: impl FnMut(Notification) -> Option<T>,
    ) -> impl Future<Output = Result<T>>;

    /// Waits for a `NewEvents` notification and returns its event streams. If
    /// `expected_height` is given, only a notification for that block height is accepted.
    fn wait_for_events(
        &mut self,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
        let wanted_height = expected_height.into();
        self.wait_for(move |notification| {
            let Reason::NewEvents {
                height,
                event_streams,
                ..
            } = notification.reason
            else {
                return None;
            };
            wanted_height
                .is_none_or(|h| h == height)
                .then_some(event_streams)
        })
    }

    /// Waits for a `NewBlock` notification and returns its block hash. If `expected_height`
    /// is given, only a notification for that block height is accepted.
    fn wait_for_block(
        &mut self,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<CryptoHash>> {
        let wanted_height = expected_height.into();
        self.wait_for(move |notification| {
            let Reason::NewBlock {
                height, block_hash, ..
            } = notification.reason
            else {
                return None;
            };
            wanted_height
                .is_none_or(|h| h == height)
                .then_some(block_hash)
        })
    }

    /// Waits for a `NewIncomingBundle` notification whose origin is `expected_origin`. If
    /// `expected_height` is given, the notification's height must match it as well.
    fn wait_for_bundle(
        &mut self,
        expected_origin: ChainId,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<()>> {
        let wanted_height = expected_height.into();
        self.wait_for(move |notification| {
            let Reason::NewIncomingBundle { height, origin } = notification.reason else {
                return None;
            };
            let matches = wanted_height.is_none_or(|h| h == height) && origin == expected_origin;
            matches.then_some(())
        })
    }
}
2136
#[cfg(with_testing)]
impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
    /// Pulls notifications from the stream until `f` returns `Some(t)`, then returns `t`.
    ///
    /// Fails if the stream ends, if the stream yields an error, or if the timer of
    /// `notification_timeout()` started at the beginning of the call fires first.
    async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
        // The timer is created once, outside the loop, so it bounds the whole wait rather than
        // each individual notification.
        let mut timeout = Box::pin(linera_base::time::timer::sleep(notification_timeout())).fuse();
        loop {
            let notification = futures::select! {
                () = timeout => bail!("Timeout waiting for notification"),
                // The first `?` reports a closed stream (`None`), the second a stream error.
                notification = self.next().fuse() => notification.context("Stream closed")??,
            };
            // Notifications rejected by `f` are dropped and the wait continues.
            if let Some(t) = f(notification) {
                return Ok(t);
            }
        }
    }
}