linera_service/cli_wrappers/
wallet.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Cow,
6    collections::BTreeMap,
7    env,
8    marker::PhantomData,
9    mem,
10    path::{Path, PathBuf},
11    pin::Pin,
12    process::Stdio,
13    str::FromStr,
14    sync,
15    time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24    abi::ContractAbi,
25    command::{resolve_binary, CommandExt},
26    crypto::{CryptoHash, InMemorySigner},
27    data_types::{Amount, ApplicationPermissions, BlockHeight, Bytecode, Epoch},
28    identifiers::{
29        Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30    },
31    vm::VmRuntime,
32};
33use linera_client::client_options::ResourceControlPolicyConfig;
34use linera_core::worker::Notification;
35use linera_faucet_client::Faucet;
36use serde::{de::DeserializeOwned, ser::Serialize};
37use serde_command_opts::to_args;
38use serde_json::{json, Value};
39use tempfile::TempDir;
40use tokio::{
41    io::{AsyncBufReadExt, BufReader},
42    process::{Child, Command},
43    sync::oneshot,
44    task::JoinHandle,
45};
46#[cfg(with_testing)]
47use {
48    futures::FutureExt as _,
49    linera_core::worker::Reason,
50    std::{collections::BTreeSet, future::Future},
51};
52
53use crate::{
54    cli::command::BenchmarkCommand,
55    cli_wrappers::{
56        local_net::{PathProvider, ProcessInbox},
57        Network,
58    },
59    util::{self, ChildExt},
60    Wallet,
61};
62
63/// The name of the environment variable that allows specifying additional arguments to be passed
64/// to the node-service command of the client.
65const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
66
67fn reqwest_client() -> reqwest::Client {
68    reqwest::ClientBuilder::new()
69        .timeout(Duration::from_secs(60))
70        .build()
71        .unwrap()
72}
73
74/// Wrapper to run a Linera client command.
75pub struct ClientWrapper {
76    binary_path: sync::Mutex<Option<PathBuf>>,
77    testing_prng_seed: Option<u64>,
78    storage: String,
79    wallet: String,
80    keystore: String,
81    max_pending_message_bundles: usize,
82    network: Network,
83    pub path_provider: PathProvider,
84    on_drop: OnClientDrop,
85    extra_args: Vec<String>,
86}
87
88/// Action to perform when the [`ClientWrapper`] is dropped.
89#[derive(Clone, Copy, Debug, Eq, PartialEq)]
90pub enum OnClientDrop {
91    /// Close all the chains on the wallet.
92    CloseChains,
93    /// Do not close any chains, leaving them active.
94    LeakChains,
95}
96
97impl ClientWrapper {
98    pub fn new(
99        path_provider: PathProvider,
100        network: Network,
101        testing_prng_seed: Option<u64>,
102        id: usize,
103        on_drop: OnClientDrop,
104    ) -> Self {
105        Self::new_with_extra_args(
106            path_provider,
107            network,
108            testing_prng_seed,
109            id,
110            on_drop,
111            vec!["--wait-for-outgoing-messages".to_string()],
112            None,
113        )
114    }
115
116    pub fn new_with_extra_args(
117        path_provider: PathProvider,
118        network: Network,
119        testing_prng_seed: Option<u64>,
120        id: usize,
121        on_drop: OnClientDrop,
122        extra_args: Vec<String>,
123        binary_dir: Option<PathBuf>,
124    ) -> Self {
125        let storage = format!(
126            "rocksdb:{}/client_{}.db",
127            path_provider.path().display(),
128            id
129        );
130        let wallet = format!("wallet_{}.json", id);
131        let keystore = format!("keystore_{}.json", id);
132        let binary_path = binary_dir.map(|dir| dir.join("linera"));
133        Self {
134            binary_path: sync::Mutex::new(binary_path),
135            testing_prng_seed,
136            storage,
137            wallet,
138            keystore,
139            max_pending_message_bundles: 10_000,
140            network,
141            path_provider,
142            on_drop,
143            extra_args,
144        }
145    }
146
147    /// Runs `linera project new`.
148    pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
149        let tmp = TempDir::new()?;
150        let mut command = self.command().await?;
151        command
152            .current_dir(tmp.path())
153            .arg("project")
154            .arg("new")
155            .arg(project_name)
156            .arg("--linera-root")
157            .arg(linera_root)
158            .spawn_and_wait_for_stdout()
159            .await?;
160        Ok(tmp)
161    }
162
163    /// Runs `linera project publish`.
164    pub async fn project_publish<T: Serialize>(
165        &self,
166        path: PathBuf,
167        required_application_ids: Vec<String>,
168        publisher: impl Into<Option<ChainId>>,
169        argument: &T,
170    ) -> Result<String> {
171        let json_parameters = serde_json::to_string(&())?;
172        let json_argument = serde_json::to_string(argument)?;
173        let mut command = self.command().await?;
174        command
175            .arg("project")
176            .arg("publish-and-create")
177            .arg(path)
178            .args(publisher.into().iter().map(ChainId::to_string))
179            .args(["--json-parameters", &json_parameters])
180            .args(["--json-argument", &json_argument]);
181        if !required_application_ids.is_empty() {
182            command.arg("--required-application-ids");
183            command.args(required_application_ids);
184        }
185        let stdout = command.spawn_and_wait_for_stdout().await?;
186        Ok(stdout.trim().to_string())
187    }
188
189    /// Runs `linera project test`.
190    pub async fn project_test(&self, path: &Path) -> Result<()> {
191        self.command()
192            .await
193            .context("failed to create project test command")?
194            .current_dir(path)
195            .arg("project")
196            .arg("test")
197            .spawn_and_wait_for_stdout()
198            .await?;
199        Ok(())
200    }
201
202    async fn command_with_envs_and_arguments(
203        &self,
204        envs: &[(&str, &str)],
205        arguments: impl IntoIterator<Item = Cow<'_, str>>,
206    ) -> Result<Command> {
207        let mut command = self.command_binary().await?;
208        command.current_dir(self.path_provider.path());
209        for (key, value) in envs {
210            command.env(key, value);
211        }
212        for argument in arguments {
213            command.arg(&*argument);
214        }
215        Ok(command)
216    }
217
218    async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
219        self.command_with_envs_and_arguments(envs, self.command_arguments())
220            .await
221    }
222
223    async fn command_with_arguments(
224        &self,
225        arguments: impl IntoIterator<Item = Cow<'_, str>>,
226    ) -> Result<Command> {
227        self.command_with_envs_and_arguments(
228            &[(
229                "RUST_LOG",
230                &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
231            )],
232            arguments,
233        )
234        .await
235    }
236
237    async fn command(&self) -> Result<Command> {
238        self.command_with_envs(&[(
239            "RUST_LOG",
240            &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
241        )])
242        .await
243    }
244
245    fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
246        [
247            "--wallet".into(),
248            self.wallet.as_str().into(),
249            "--keystore".into(),
250            self.keystore.as_str().into(),
251            "--storage".into(),
252            self.storage.as_str().into(),
253            "--send-timeout-ms".into(),
254            "500000".into(),
255            "--recv-timeout-ms".into(),
256            "500000".into(),
257        ]
258        .into_iter()
259        .chain(self.extra_args.iter().map(|s| s.as_str().into()))
260    }
261
262    /// Returns an iterator over the arguments that should be added to all command invocations.
263    fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
264        self.required_command_arguments().chain([
265            "--max-pending-message-bundles".into(),
266            self.max_pending_message_bundles.to_string().into(),
267        ])
268    }
269
270    /// Returns the [`Command`] instance configured to run the appropriate binary.
271    ///
272    /// The path is resolved once and cached inside `self` for subsequent usages.
273    async fn command_binary(&self) -> Result<Command> {
274        match self.command_with_cached_binary_path() {
275            Some(command) => Ok(command),
276            None => {
277                let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
278                let command = Command::new(&resolved_path);
279
280                self.set_cached_binary_path(resolved_path);
281
282                Ok(command)
283            }
284        }
285    }
286
287    /// Returns a [`Command`] instance configured with the cached `binary_path`, if available.
288    fn command_with_cached_binary_path(&self) -> Option<Command> {
289        let binary_path = self.binary_path.lock().unwrap();
290
291        binary_path.as_ref().map(Command::new)
292    }
293
294    /// Sets the cached `binary_path` with the `new_binary_path`.
295    ///
296    /// # Panics
297    ///
298    /// If the cache is already set to a different value. In theory the two threads calling
299    /// `command_binary` can race and resolve the binary path twice, but they should always be the
300    /// same path.
301    fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
302        let mut binary_path = self.binary_path.lock().unwrap();
303
304        if binary_path.is_none() {
305            *binary_path = Some(new_binary_path);
306        } else {
307            assert_eq!(*binary_path, Some(new_binary_path));
308        }
309    }
310
311    /// Runs `linera create-genesis-config`.
312    pub async fn create_genesis_config(
313        &self,
314        num_other_initial_chains: u32,
315        initial_funding: Amount,
316        policy_config: ResourceControlPolicyConfig,
317        http_allow_list: Option<Vec<String>>,
318    ) -> Result<()> {
319        let mut command = self.command().await?;
320        command
321            .args([
322                "create-genesis-config",
323                &num_other_initial_chains.to_string(),
324            ])
325            .args(["--initial-funding", &initial_funding.to_string()])
326            .args(["--committee", "committee.json"])
327            .args(["--genesis", "genesis.json"])
328            .args([
329                "--policy-config",
330                &policy_config.to_string().to_kebab_case(),
331            ]);
332        if let Some(allow_list) = http_allow_list {
333            command
334                .arg("--http-request-allow-list")
335                .arg(allow_list.join(","));
336        }
337        if let Some(seed) = self.testing_prng_seed {
338            command.arg("--testing-prng-seed").arg(seed.to_string());
339        }
340        command.spawn_and_wait_for_stdout().await?;
341        Ok(())
342    }
343
344    /// Runs `linera wallet init`. The genesis config is read from `genesis.json`, or from the
345    /// faucet if provided.
346    pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
347        let mut command = self.command().await?;
348        command.args(["wallet", "init"]);
349        match faucet {
350            None => command.args(["--genesis", "genesis.json"]),
351            Some(faucet) => command.args(["--faucet", faucet.url()]),
352        };
353        if let Some(seed) = self.testing_prng_seed {
354            command.arg("--testing-prng-seed").arg(seed.to_string());
355        }
356        command.spawn_and_wait_for_stdout().await?;
357        Ok(())
358    }
359
360    /// Runs `linera wallet request-chain`.
361    pub async fn request_chain(
362        &self,
363        faucet: &Faucet,
364        set_default: bool,
365    ) -> Result<(ChainId, AccountOwner)> {
366        let mut command = self.command().await?;
367        command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
368        if set_default {
369            command.arg("--set-default");
370        }
371        let stdout = command.spawn_and_wait_for_stdout().await?;
372        let mut lines = stdout.split_whitespace();
373        let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
374        let owner = lines.next().context("missing chain owner")?.parse()?;
375        Ok((chain_id, owner))
376    }
377
378    /// Runs `linera wallet publish-and-create`.
379    #[expect(clippy::too_many_arguments)]
380    pub async fn publish_and_create<
381        A: ContractAbi,
382        Parameters: Serialize,
383        InstantiationArgument: Serialize,
384    >(
385        &self,
386        contract: PathBuf,
387        service: PathBuf,
388        vm_runtime: VmRuntime,
389        parameters: &Parameters,
390        argument: &InstantiationArgument,
391        required_application_ids: &[ApplicationId],
392        publisher: impl Into<Option<ChainId>>,
393    ) -> Result<ApplicationId<A>> {
394        let json_parameters = serde_json::to_string(parameters)?;
395        let json_argument = serde_json::to_string(argument)?;
396        let mut command = self.command().await?;
397        let vm_runtime = format!("{}", vm_runtime);
398        command
399            .arg("publish-and-create")
400            .args([contract, service])
401            .args(["--vm-runtime", &vm_runtime.to_lowercase()])
402            .args(publisher.into().iter().map(ChainId::to_string))
403            .args(["--json-parameters", &json_parameters])
404            .args(["--json-argument", &json_argument]);
405        if !required_application_ids.is_empty() {
406            command.arg("--required-application-ids");
407            command.args(
408                required_application_ids
409                    .iter()
410                    .map(ApplicationId::to_string),
411            );
412        }
413        let stdout = command.spawn_and_wait_for_stdout().await?;
414        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
415    }
416
417    /// Runs `linera publish-module`.
418    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
419        &self,
420        contract: PathBuf,
421        service: PathBuf,
422        vm_runtime: VmRuntime,
423        publisher: impl Into<Option<ChainId>>,
424    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
425        let stdout = self
426            .command()
427            .await?
428            .arg("publish-module")
429            .args([contract, service])
430            .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
431            .args(publisher.into().iter().map(ChainId::to_string))
432            .spawn_and_wait_for_stdout()
433            .await?;
434        let module_id: ModuleId = stdout.trim().parse()?;
435        Ok(module_id.with_abi())
436    }
437
438    /// Runs `linera create-application`.
439    pub async fn create_application<
440        Abi: ContractAbi,
441        Parameters: Serialize,
442        InstantiationArgument: Serialize,
443    >(
444        &self,
445        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
446        parameters: &Parameters,
447        argument: &InstantiationArgument,
448        required_application_ids: &[ApplicationId],
449        creator: impl Into<Option<ChainId>>,
450    ) -> Result<ApplicationId<Abi>> {
451        let json_parameters = serde_json::to_string(parameters)?;
452        let json_argument = serde_json::to_string(argument)?;
453        let mut command = self.command().await?;
454        command
455            .arg("create-application")
456            .arg(module_id.forget_abi().to_string())
457            .args(["--json-parameters", &json_parameters])
458            .args(["--json-argument", &json_argument])
459            .args(creator.into().iter().map(ChainId::to_string));
460        if !required_application_ids.is_empty() {
461            command.arg("--required-application-ids");
462            command.args(
463                required_application_ids
464                    .iter()
465                    .map(ApplicationId::to_string),
466            );
467        }
468        let stdout = command.spawn_and_wait_for_stdout().await?;
469        Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
470    }
471
472    /// Runs `linera service`.
473    pub async fn run_node_service(
474        &self,
475        port: impl Into<Option<u16>>,
476        process_inbox: ProcessInbox,
477    ) -> Result<NodeService> {
478        self.run_node_service_with_options(port, process_inbox, &[], &[], false)
479            .await
480    }
481
482    /// Runs `linera service` with optional task processor configuration.
483    pub async fn run_node_service_with_options(
484        &self,
485        port: impl Into<Option<u16>>,
486        process_inbox: ProcessInbox,
487        operator_application_ids: &[ApplicationId],
488        operators: &[(String, PathBuf)],
489        read_only: bool,
490    ) -> Result<NodeService> {
491        self.run_node_service_with_all_options(
492            port,
493            process_inbox,
494            operator_application_ids,
495            operators,
496            read_only,
497            &[],
498            &[],
499        )
500        .await
501    }
502
503    /// Runs `linera service` with all available options.
504    #[allow(clippy::too_many_arguments)]
505    pub async fn run_node_service_with_all_options(
506        &self,
507        port: impl Into<Option<u16>>,
508        process_inbox: ProcessInbox,
509        operator_application_ids: &[ApplicationId],
510        operators: &[(String, PathBuf)],
511        read_only: bool,
512        allowed_subscriptions: &[String],
513        subscription_ttls: &[(String, u64)],
514    ) -> Result<NodeService> {
515        let port = port.into().unwrap_or(8080);
516        let mut command = self.command().await?;
517        command.arg("service");
518        if let ProcessInbox::Skip = process_inbox {
519            command.arg("--listener-skip-process-inbox");
520        }
521        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
522            command.args(var.split_whitespace());
523        }
524        for app_id in operator_application_ids {
525            command.args(["--operator-application-ids", &app_id.to_string()]);
526        }
527        for (name, path) in operators {
528            command.args(["--operators", &format!("{}={}", name, path.display())]);
529        }
530        if read_only {
531            command.arg("--read-only");
532        }
533        for query in allowed_subscriptions {
534            command.args(["--allow-subscription", query]);
535        }
536        for (name, secs) in subscription_ttls {
537            command.args(["--subscription-ttl-secs", &format!("{name}={secs}")]);
538        }
539        let child = command
540            .args(["--port".to_string(), port.to_string()])
541            .spawn_into()?;
542        let client = reqwest_client();
543        for i in 0..10 {
544            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
545            let request = client
546                .get(format!("http://localhost:{}/", port))
547                .send()
548                .await;
549            if request.is_ok() {
550                tracing::info!("Node service has started");
551                return Ok(NodeService::new(port, child));
552            } else {
553                tracing::warn!("Waiting for node service to start");
554            }
555        }
556        bail!("Failed to start node service");
557    }
558
559    /// Runs `linera service` with a controller application.
560    pub async fn run_node_service_with_controller(
561        &self,
562        port: impl Into<Option<u16>>,
563        process_inbox: ProcessInbox,
564        controller_id: &ApplicationId,
565        operators: &[(String, PathBuf)],
566    ) -> Result<NodeService> {
567        let port = port.into().unwrap_or(8080);
568        let mut command = self.command().await?;
569        command.arg("service");
570        if let ProcessInbox::Skip = process_inbox {
571            command.arg("--listener-skip-process-inbox");
572        }
573        if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
574            command.args(var.split_whitespace());
575        }
576        command.args(["--controller-id", &controller_id.to_string()]);
577        for (name, path) in operators {
578            command.args(["--operators", &format!("{}={}", name, path.display())]);
579        }
580        let child = command
581            .args(["--port".to_string(), port.to_string()])
582            .spawn_into()?;
583        let client = reqwest_client();
584        for i in 0..10 {
585            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
586            let request = client
587                .get(format!("http://localhost:{}/", port))
588                .send()
589                .await;
590            if request.is_ok() {
591                tracing::info!("Node service has started");
592                return Ok(NodeService::new(port, child));
593            } else {
594                tracing::warn!("Waiting for node service to start");
595            }
596        }
597        bail!("Failed to start node service");
598    }
599
600    /// Runs `linera validator query`
601    pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
602        let mut command = self.command().await?;
603        command.arg("validator").arg("query").arg(address);
604        let stdout = command.spawn_and_wait_for_stdout().await?;
605
606        // Parse the genesis config hash from the output.
607        // It's on a line like "Genesis config hash: <hash>"
608        let hash = stdout
609            .lines()
610            .find_map(|line| {
611                line.strip_prefix("Genesis config hash: ")
612                    .and_then(|hash_str| hash_str.trim().parse().ok())
613            })
614            .context("error while parsing the result of `linera validator query`")?;
615        Ok(hash)
616    }
617
618    /// Runs `linera validator list`.
619    pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
620        let mut command = self.command().await?;
621        command.arg("validator").arg("list");
622        if let Some(chain_id) = chain_id {
623            command.args(["--chain-id", &chain_id.to_string()]);
624        }
625        command.spawn_and_wait_for_stdout().await?;
626        Ok(())
627    }
628
629    /// Runs `linera sync-validator`.
630    pub async fn sync_validator(
631        &self,
632        chain_ids: impl IntoIterator<Item = &ChainId>,
633        validator_address: impl Into<String>,
634    ) -> Result<()> {
635        let mut command = self.command().await?;
636        command
637            .arg("validator")
638            .arg("sync")
639            .arg(validator_address.into());
640        let mut chain_ids = chain_ids.into_iter().peekable();
641        if chain_ids.peek().is_some() {
642            command
643                .arg("--chains")
644                .args(chain_ids.map(ChainId::to_string));
645        }
646        command.spawn_and_wait_for_stdout().await?;
647        Ok(())
648    }
649
650    /// Runs `linera faucet`.
651    pub async fn run_faucet(
652        &self,
653        port: impl Into<Option<u16>>,
654        chain_id: Option<ChainId>,
655        amount: Amount,
656    ) -> Result<FaucetService> {
657        let port = port.into().unwrap_or(8080);
658        let temp_dir = tempfile::tempdir()
659            .context("Failed to create temporary directory for faucet storage")?;
660        let storage_path = temp_dir.path().join("faucet_storage.sqlite");
661        let mut command = self.command().await?;
662        let command = command
663            .arg("faucet")
664            .args(["--port".to_string(), port.to_string()])
665            .args(["--amount".to_string(), amount.to_string()])
666            .args([
667                "--storage-path".to_string(),
668                storage_path.to_string_lossy().to_string(),
669            ]);
670        if let Some(chain_id) = chain_id {
671            command.arg(chain_id.to_string());
672        }
673        let child = command.spawn_into()?;
674        let client = reqwest_client();
675        for i in 0..10 {
676            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
677            let request = client
678                .get(format!("http://localhost:{}/", port))
679                .send()
680                .await;
681            if request.is_ok() {
682                tracing::info!("Faucet has started");
683                return Ok(FaucetService::new(port, child, temp_dir));
684            } else {
685                tracing::debug!("Waiting for faucet to start");
686            }
687        }
688        bail!("Failed to start faucet");
689    }
690
691    /// Runs `linera local-balance`.
692    pub async fn local_balance(&self, account: Account) -> Result<Amount> {
693        let stdout = self
694            .command()
695            .await?
696            .arg("local-balance")
697            .arg(account.to_string())
698            .spawn_and_wait_for_stdout()
699            .await?;
700        let amount = stdout
701            .trim()
702            .parse()
703            .context("error while parsing the result of `linera local-balance`")?;
704        Ok(amount)
705    }
706
707    /// Runs `linera query-balance`.
708    pub async fn query_balance(&self, account: Account) -> Result<Amount> {
709        let stdout = self
710            .command()
711            .await?
712            .arg("query-balance")
713            .arg(account.to_string())
714            .spawn_and_wait_for_stdout()
715            .await?;
716        let amount = stdout
717            .trim()
718            .parse()
719            .context("error while parsing the result of `linera query-balance`")?;
720        Ok(amount)
721    }
722
723    /// Runs `linera query-application` and parses the JSON result.
724    pub async fn query_application_json<T: DeserializeOwned>(
725        &self,
726        chain_id: ChainId,
727        application_id: ApplicationId,
728        query: impl AsRef<str>,
729    ) -> Result<T> {
730        let query = query.as_ref().trim();
731        let name = query
732            .split_once(|ch: char| !ch.is_alphanumeric())
733            .map_or(query, |(name, _)| name);
734        let stdout = self
735            .command()
736            .await?
737            .arg("query-application")
738            .arg("--chain-id")
739            .arg(chain_id.to_string())
740            .arg("--application-id")
741            .arg(application_id.to_string())
742            .arg(query)
743            .spawn_and_wait_for_stdout()
744            .await?;
745        let data: serde_json::Value =
746            serde_json::from_str(stdout.trim()).context("invalid JSON from query-application")?;
747        serde_json::from_value(data[name].clone())
748            .with_context(|| format!("{name} field missing in query-application response"))
749    }
750
751    /// Runs `linera sync`.
752    pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
753        self.command()
754            .await?
755            .arg("sync")
756            .arg(chain_id.to_string())
757            .spawn_and_wait_for_stdout()
758            .await?;
759        Ok(())
760    }
761
762    /// Runs `linera process-inbox`.
763    pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
764        self.command()
765            .await?
766            .arg("process-inbox")
767            .arg(chain_id.to_string())
768            .spawn_and_wait_for_stdout()
769            .await?;
770        Ok(())
771    }
772
773    /// Runs `linera transfer`.
774    pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
775        self.command()
776            .await?
777            .arg("transfer")
778            .arg(amount.to_string())
779            .args(["--from", &from.to_string()])
780            .args(["--to", &to.to_string()])
781            .spawn_and_wait_for_stdout()
782            .await?;
783        Ok(())
784    }
785
786    /// Runs `linera transfer` with no logging.
787    pub async fn transfer_with_silent_logs(
788        &self,
789        amount: Amount,
790        from: ChainId,
791        to: ChainId,
792    ) -> Result<()> {
793        self.command()
794            .await?
795            .env("RUST_LOG", "off")
796            .arg("transfer")
797            .arg(amount.to_string())
798            .args(["--from", &from.to_string()])
799            .args(["--to", &to.to_string()])
800            .spawn_and_wait_for_stdout()
801            .await?;
802        Ok(())
803    }
804
805    /// Runs `linera transfer` with owner accounts.
806    pub async fn transfer_with_accounts(
807        &self,
808        amount: Amount,
809        from: Account,
810        to: Account,
811    ) -> Result<()> {
812        self.command()
813            .await?
814            .arg("transfer")
815            .arg(amount.to_string())
816            .args(["--from", &from.to_string()])
817            .args(["--to", &to.to_string()])
818            .spawn_and_wait_for_stdout()
819            .await?;
820        Ok(())
821    }
822
823    fn benchmark_command_internal(command: &mut Command, args: &BenchmarkCommand) -> Result<()> {
824        let mut formatted_args = to_args(&args)?;
825        let subcommand = formatted_args.remove(0);
826        // The subcommand is followed by the flattened options, which are preceded by "options".
827        // So remove that as well.
828        formatted_args.remove(0);
829        let options = formatted_args
830            .chunks_exact(2)
831            .flat_map(|pair| {
832                let option = format!("--{}", pair[0]);
833                match pair[1].as_str() {
834                    "true" => vec![option],
835                    "false" => vec![],
836                    _ => vec![option, pair[1].clone()],
837                }
838            })
839            .collect::<Vec<_>>();
840        command
841            .args([
842                "--max-pending-message-bundles",
843                &args.transactions_per_block().to_string(),
844            ])
845            .arg("benchmark")
846            .arg(subcommand)
847            .args(options);
848        Ok(())
849    }
850
851    async fn benchmark_command_with_envs(
852        &self,
853        args: BenchmarkCommand,
854        envs: &[(&str, &str)],
855    ) -> Result<Command> {
856        let mut command = self
857            .command_with_envs_and_arguments(envs, self.required_command_arguments())
858            .await?;
859        Self::benchmark_command_internal(&mut command, &args)?;
860        Ok(command)
861    }
862
863    async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
864        let mut command = self
865            .command_with_arguments(self.required_command_arguments())
866            .await?;
867        Self::benchmark_command_internal(&mut command, &args)?;
868        Ok(command)
869    }
870
871    /// Runs `linera benchmark`.
872    pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
873        let mut command = self.benchmark_command(args).await?;
874        command.spawn_and_wait_for_stdout().await?;
875        Ok(())
876    }
877
878    /// Runs `linera benchmark`, but detached: don't wait for the command to finish, just spawn it
879    /// and return the child process, and the handles to the stdout and stderr.
880    pub async fn benchmark_detached(
881        &self,
882        args: BenchmarkCommand,
883        tx: oneshot::Sender<()>,
884    ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
885        let mut child = self
886            .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
887            .await?
888            .kill_on_drop(true)
889            .stdin(Stdio::piped())
890            .stdout(Stdio::piped())
891            .stderr(Stdio::piped())
892            .spawn()?;
893
894        let pid = child.id().expect("failed to get pid");
895        let stdout = child.stdout.take().expect("stdout not open");
896        let stdout_handle = tokio::spawn(async move {
897            let mut lines = BufReader::new(stdout).lines();
898            while let Ok(Some(line)) = lines.next_line().await {
899                println!("benchmark{{pid={pid}}} {line}");
900            }
901        });
902
903        let stderr = child.stderr.take().expect("stderr not open");
904        let stderr_handle = tokio::spawn(async move {
905            let mut lines = BufReader::new(stderr).lines();
906            let mut tx = Some(tx);
907            while let Ok(Some(line)) = lines.next_line().await {
908                if line.contains("Ready to start benchmark") {
909                    tx.take()
910                        .expect("Should only send signal once")
911                        .send(())
912                        .expect("failed to send ready signal to main thread");
913                } else {
914                    println!("benchmark{{pid={pid}}} {line}");
915                }
916            }
917        });
918        Ok((child, stdout_handle, stderr_handle))
919    }
920
921    async fn open_chain_internal(
922        &self,
923        from: ChainId,
924        owner: Option<AccountOwner>,
925        initial_balance: Amount,
926        super_owner: bool,
927    ) -> Result<(ChainId, AccountOwner)> {
928        let mut command = self.command().await?;
929        command
930            .arg("open-chain")
931            .args(["--from", &from.to_string()])
932            .args(["--initial-balance", &initial_balance.to_string()]);
933
934        if let Some(owner) = owner {
935            command.args(["--owner", &owner.to_string()]);
936        }
937
938        if super_owner {
939            command.arg("--super-owner");
940        }
941
942        let stdout = command.spawn_and_wait_for_stdout().await?;
943        let mut split = stdout.split('\n');
944        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
945        let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
946        if let Some(owner) = owner {
947            assert_eq!(owner, new_owner);
948        }
949        Ok((chain_id, new_owner))
950    }
951
952    /// Runs `linera open-chain --super-owner`.
953    pub async fn open_chain_super_owner(
954        &self,
955        from: ChainId,
956        owner: Option<AccountOwner>,
957        initial_balance: Amount,
958    ) -> Result<(ChainId, AccountOwner)> {
959        self.open_chain_internal(from, owner, initial_balance, true)
960            .await
961    }
962
963    /// Runs `linera open-chain`.
964    pub async fn open_chain(
965        &self,
966        from: ChainId,
967        owner: Option<AccountOwner>,
968        initial_balance: Amount,
969    ) -> Result<(ChainId, AccountOwner)> {
970        self.open_chain_internal(from, owner, initial_balance, false)
971            .await
972    }
973
974    /// Runs `linera open-chain` then `linera assign`.
975    pub async fn open_and_assign(
976        &self,
977        client: &ClientWrapper,
978        initial_balance: Amount,
979    ) -> Result<ChainId> {
980        let our_chain = self
981            .load_wallet()?
982            .default_chain()
983            .context("no default chain found")?;
984        let owner = client.keygen().await?;
985        let (new_chain, _) = self
986            .open_chain(our_chain, Some(owner), initial_balance)
987            .await?;
988        client.assign(owner, new_chain).await?;
989        Ok(new_chain)
990    }
991
992    pub async fn open_multi_owner_chain(
993        &self,
994        from: ChainId,
995        owners: BTreeMap<AccountOwner, u64>,
996        multi_leader_rounds: u32,
997        balance: Amount,
998        base_timeout_ms: u64,
999    ) -> Result<ChainId> {
1000        let mut command = self.command().await?;
1001        command
1002            .arg("open-multi-owner-chain")
1003            .args(["--from", &from.to_string()])
1004            .arg("--owners")
1005            .arg(serde_json::to_string(&owners)?)
1006            .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
1007        command
1008            .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
1009            .args(["--initial-balance", &balance.to_string()]);
1010
1011        let stdout = command.spawn_and_wait_for_stdout().await?;
1012        let mut split = stdout.split('\n');
1013        let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
1014
1015        Ok(chain_id)
1016    }
1017
1018    pub async fn change_ownership(
1019        &self,
1020        chain_id: ChainId,
1021        super_owners: Vec<AccountOwner>,
1022        owners: Vec<AccountOwner>,
1023    ) -> Result<()> {
1024        let mut command = self.command().await?;
1025        command
1026            .arg("change-ownership")
1027            .args(["--chain-id", &chain_id.to_string()]);
1028        command
1029            .arg("--super-owners")
1030            .arg(serde_json::to_string(&super_owners)?);
1031        command.arg("--owners").arg(serde_json::to_string(
1032            &owners
1033                .into_iter()
1034                .zip(std::iter::repeat(100u64))
1035                .collect::<BTreeMap<_, _>>(),
1036        )?);
1037        command.spawn_and_wait_for_stdout().await?;
1038        Ok(())
1039    }
1040
1041    pub async fn change_application_permissions(
1042        &self,
1043        chain_id: ChainId,
1044        application_permissions: ApplicationPermissions,
1045    ) -> Result<()> {
1046        let mut command = self.command().await?;
1047        command
1048            .arg("change-application-permissions")
1049            .args(["--chain-id", &chain_id.to_string()]);
1050        command.arg("--manage-chain").arg(serde_json::to_string(
1051            &application_permissions.manage_chain,
1052        )?);
1053        // TODO: add other fields
1054        command.spawn_and_wait_for_stdout().await?;
1055        Ok(())
1056    }
1057
1058    /// Runs `linera wallet follow-chain CHAIN_ID`.
1059    pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
1060        let mut command = self.command().await?;
1061        command
1062            .args(["wallet", "follow-chain"])
1063            .arg(chain_id.to_string());
1064        if sync {
1065            command.arg("--sync");
1066        }
1067        command.spawn_and_wait_for_stdout().await?;
1068        Ok(())
1069    }
1070
1071    /// Runs `linera wallet forget-chain CHAIN_ID`.
1072    pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
1073        let mut command = self.command().await?;
1074        command
1075            .args(["wallet", "forget-chain"])
1076            .arg(chain_id.to_string());
1077        command.spawn_and_wait_for_stdout().await?;
1078        Ok(())
1079    }
1080
1081    /// Runs `linera wallet set-default CHAIN_ID`.
1082    pub async fn set_default_chain(&self, chain_id: ChainId) -> Result<()> {
1083        let mut command = self.command().await?;
1084        command
1085            .args(["wallet", "set-default"])
1086            .arg(chain_id.to_string());
1087        command.spawn_and_wait_for_stdout().await?;
1088        Ok(())
1089    }
1090
1091    pub async fn retry_pending_block(
1092        &self,
1093        chain_id: Option<ChainId>,
1094    ) -> Result<Option<CryptoHash>> {
1095        let mut command = self.command().await?;
1096        command.arg("retry-pending-block");
1097        if let Some(chain_id) = chain_id {
1098            command.arg(chain_id.to_string());
1099        }
1100        let stdout = command.spawn_and_wait_for_stdout().await?;
1101        let stdout = stdout.trim();
1102        if stdout.is_empty() {
1103            Ok(None)
1104        } else {
1105            Ok(Some(CryptoHash::from_str(stdout)?))
1106        }
1107    }
1108
1109    /// Runs `linera publish-data-blob`.
1110    pub async fn publish_data_blob(
1111        &self,
1112        path: &Path,
1113        chain_id: Option<ChainId>,
1114    ) -> Result<CryptoHash> {
1115        let mut command = self.command().await?;
1116        command.arg("publish-data-blob").arg(path);
1117        if let Some(chain_id) = chain_id {
1118            command.arg(chain_id.to_string());
1119        }
1120        let stdout = command.spawn_and_wait_for_stdout().await?;
1121        let stdout = stdout.trim();
1122        Ok(CryptoHash::from_str(stdout)?)
1123    }
1124
1125    /// Runs `linera read-data-blob`.
1126    pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
1127        let mut command = self.command().await?;
1128        command.arg("read-data-blob").arg(hash.to_string());
1129        if let Some(chain_id) = chain_id {
1130            command.arg(chain_id.to_string());
1131        }
1132        command.spawn_and_wait_for_stdout().await?;
1133        Ok(())
1134    }
1135
1136    pub fn load_wallet(&self) -> Result<Wallet> {
1137        Ok(Wallet::read(&self.wallet_path())?)
1138    }
1139
1140    pub fn load_keystore(&self) -> Result<InMemorySigner> {
1141        util::read_json(self.keystore_path())
1142    }
1143
1144    pub fn wallet_path(&self) -> PathBuf {
1145        self.path_provider.path().join(&self.wallet)
1146    }
1147
1148    pub fn keystore_path(&self) -> PathBuf {
1149        self.path_provider.path().join(&self.keystore)
1150    }
1151
1152    pub fn storage_path(&self) -> &str {
1153        &self.storage
1154    }
1155
1156    pub fn get_owner(&self) -> Option<AccountOwner> {
1157        let wallet = self.load_wallet().ok()?;
1158        wallet
1159            .get(wallet.default_chain()?)
1160            .expect("default chain must be in wallet")
1161            .owner
1162    }
1163
1164    pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1165        self.load_wallet()
1166            .ok()
1167            .is_some_and(|wallet| wallet.get(chain).is_some())
1168    }
1169
1170    pub async fn set_validator(
1171        &self,
1172        validator_key: &(String, String),
1173        port: usize,
1174        votes: usize,
1175    ) -> Result<()> {
1176        let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1177        self.command()
1178            .await?
1179            .arg("validator")
1180            .arg("add")
1181            .args(["--public-key", &validator_key.0])
1182            .args(["--account-key", &validator_key.1])
1183            .args(["--address", &address])
1184            .args(["--votes", &votes.to_string()])
1185            .spawn_and_wait_for_stdout()
1186            .await?;
1187        Ok(())
1188    }
1189
1190    pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1191        self.command()
1192            .await?
1193            .arg("validator")
1194            .arg("remove")
1195            .args(["--public-key", validator_key])
1196            .spawn_and_wait_for_stdout()
1197            .await?;
1198        Ok(())
1199    }
1200
1201    pub async fn change_validators(
1202        &self,
1203        add_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1204        modify_validators: &[(String, String, usize, usize)], // (public_key, account_key, port, votes)
1205        remove_validators: &[String],
1206    ) -> Result<()> {
1207        use std::str::FromStr;
1208
1209        use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1210
1211        // Build a map that will be serialized to JSON
1212        // Use the exact types that deserialization expects
1213        let mut changes = std::collections::HashMap::new();
1214
1215        // Add/modify validators
1216        for (public_key_str, account_key_str, port, votes) in
1217            add_validators.iter().chain(modify_validators.iter())
1218        {
1219            let public_key = ValidatorPublicKey::from_str(public_key_str)
1220                .with_context(|| format!("Invalid validator public key: {}", public_key_str))?;
1221
1222            let account_key = AccountPublicKey::from_str(account_key_str)
1223                .with_context(|| format!("Invalid account public key: {}", account_key_str))?;
1224
1225            let address = format!("{}:127.0.0.1:{}", self.network.short(), port)
1226                .parse()
1227                .unwrap();
1228
1229            // Create ValidatorChange struct
1230            let change = crate::cli::validator::Change {
1231                account_key,
1232                address,
1233                votes: crate::cli::validator::Votes(
1234                    std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1235                ),
1236            };
1237
1238            changes.insert(public_key, Some(change));
1239        }
1240
1241        // Remove validators (set to None)
1242        for validator_key_str in remove_validators {
1243            let public_key = ValidatorPublicKey::from_str(validator_key_str)
1244                .with_context(|| format!("Invalid validator public key: {}", validator_key_str))?;
1245            changes.insert(public_key, None);
1246        }
1247
1248        // Create temporary file with JSON
1249        let temp_file = tempfile::NamedTempFile::new()
1250            .context("Failed to create temporary file for validator changes")?;
1251        serde_json::to_writer(&temp_file, &changes)
1252            .context("Failed to write validator changes to file")?;
1253        let temp_path = temp_file.path();
1254
1255        self.command()
1256            .await?
1257            .arg("validator")
1258            .arg("update")
1259            .arg(temp_path)
1260            .arg("--yes") // Skip confirmation prompt
1261            .spawn_and_wait_for_stdout()
1262            .await?;
1263
1264        Ok(())
1265    }
1266
1267    pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1268        self.command()
1269            .await?
1270            .arg("revoke-epochs")
1271            .arg(epoch.to_string())
1272            .spawn_and_wait_for_stdout()
1273            .await?;
1274        Ok(())
1275    }
1276
1277    /// Runs `linera keygen`.
1278    pub async fn keygen(&self) -> Result<AccountOwner> {
1279        let stdout = self
1280            .command()
1281            .await?
1282            .arg("keygen")
1283            .spawn_and_wait_for_stdout()
1284            .await?;
1285        AccountOwner::from_str(stdout.as_str().trim())
1286    }
1287
1288    /// Returns the default chain.
1289    pub fn default_chain(&self) -> Option<ChainId> {
1290        self.load_wallet().ok()?.default_chain()
1291    }
1292
1293    /// Runs `linera assign`.
1294    pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1295        let _stdout = self
1296            .command()
1297            .await?
1298            .arg("assign")
1299            .args(["--owner", &owner.to_string()])
1300            .args(["--chain-id", &chain_id.to_string()])
1301            .spawn_and_wait_for_stdout()
1302            .await?;
1303        Ok(())
1304    }
1305
1306    /// Runs `linera set-preferred-owner` for `chain_id`.
1307    pub async fn set_preferred_owner(
1308        &self,
1309        chain_id: ChainId,
1310        owner: Option<AccountOwner>,
1311    ) -> Result<()> {
1312        let mut owner_arg = vec!["--owner".to_string()];
1313        if let Some(owner) = owner {
1314            owner_arg.push(owner.to_string());
1315        };
1316        self.command()
1317            .await?
1318            .arg("set-preferred-owner")
1319            .args(["--chain-id", &chain_id.to_string()])
1320            .args(owner_arg)
1321            .spawn_and_wait_for_stdout()
1322            .await?;
1323        Ok(())
1324    }
1325
1326    pub async fn build_application(
1327        &self,
1328        path: &Path,
1329        name: &str,
1330        is_workspace: bool,
1331    ) -> Result<(PathBuf, PathBuf)> {
1332        Command::new("cargo")
1333            .current_dir(self.path_provider.path())
1334            .arg("build")
1335            .arg("--release")
1336            .args(["--target", "wasm32-unknown-unknown"])
1337            .arg("--manifest-path")
1338            .arg(path.join("Cargo.toml"))
1339            .spawn_and_wait_for_stdout()
1340            .await?;
1341
1342        let release_dir = match is_workspace {
1343            true => path.join("../target/wasm32-unknown-unknown/release"),
1344            false => path.join("target/wasm32-unknown-unknown/release"),
1345        };
1346
1347        let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1348        let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1349
1350        let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1351        let service_size = fs_err::tokio::metadata(&service).await?.len();
1352        tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1353
1354        Ok((contract, service))
1355    }
1356}
1357
1358impl Drop for ClientWrapper {
1359    fn drop(&mut self) {
1360        use std::process::Command as SyncCommand;
1361
1362        if self.on_drop != OnClientDrop::CloseChains {
1363            return;
1364        }
1365
1366        let Ok(binary_path) = self.binary_path.lock() else {
1367            tracing::error!(
1368                "Failed to close chains because a thread panicked with a lock to `binary_path`"
1369            );
1370            return;
1371        };
1372
1373        let Some(binary_path) = binary_path.as_ref() else {
1374            tracing::warn!(
1375                "Assuming no chains need to be closed, because the command binary was never \
1376                resolved and therefore presumably never called"
1377            );
1378            return;
1379        };
1380
1381        let working_directory = self.path_provider.path();
1382        let mut wallet_show_command = SyncCommand::new(binary_path);
1383
1384        for argument in self.command_arguments() {
1385            wallet_show_command.arg(&*argument);
1386        }
1387
1388        let Ok(wallet_show_output) = wallet_show_command
1389            .current_dir(working_directory)
1390            .args(["wallet", "show", "--short", "--owned"])
1391            .output()
1392        else {
1393            tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1394            return;
1395        };
1396
1397        if !wallet_show_output.status.success() {
1398            tracing::warn!("Failed to list chains in the wallet to close them");
1399            return;
1400        }
1401
1402        let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1403            tracing::warn!(
1404                "Failed to close chains because `linera wallet show --short` \
1405                returned a non-UTF-8 output"
1406            );
1407            return;
1408        };
1409
1410        let chain_ids = chain_list_string
1411            .split('\n')
1412            .map(|line| line.trim())
1413            .filter(|line| !line.is_empty());
1414
1415        for chain_id in chain_ids {
1416            let mut close_chain_command = SyncCommand::new(binary_path);
1417
1418            for argument in self.command_arguments() {
1419                close_chain_command.arg(&*argument);
1420            }
1421
1422            close_chain_command.current_dir(working_directory);
1423
1424            match close_chain_command.args(["close-chain", chain_id]).status() {
1425                Ok(status) if status.success() => (),
1426                Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1427                Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1428            }
1429        }
1430    }
1431}
1432
1433#[cfg(with_testing)]
1434impl ClientWrapper {
1435    pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1436        self.build_application(Self::example_path(name)?.as_path(), name, true)
1437            .await
1438    }
1439
1440    pub fn example_path(name: &str) -> Result<PathBuf> {
1441        Ok(env::current_dir()?.join("../examples/").join(name))
1442    }
1443}
1444
1445fn truncate_query_output(input: &str) -> String {
1446    let max_len = 1000;
1447    if input.len() < max_len {
1448        input.to_string()
1449    } else {
1450        format!("{} ...", input.get(..max_len).unwrap())
1451    }
1452}
1453
1454fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1455    let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1456    let max_len = 200;
1457    if query.len() < max_len {
1458        query
1459    } else {
1460        format!("{} ...", query.get(..max_len).unwrap())
1461    }
1462}
1463
1464/// A running node service.
1465/// Logs a warning if a child process has already exited when its wrapper is dropped.
1466/// On Unix, includes the signal number if the process was killed (e.g. signal 9 = OOM).
1467fn log_unexpected_exit(child: &mut Child, service_kind: &str, port: u16) {
1468    match child.try_wait() {
1469        Ok(Some(status)) => {
1470            #[cfg(unix)]
1471            {
1472                use std::os::unix::process::ExitStatusExt as _;
1473                if let Some(signal) = status.signal() {
1474                    tracing::error!(
1475                        port,
1476                        signal,
1477                        "The {service_kind} service was killed by signal {signal}",
1478                    );
1479                    return;
1480                }
1481            }
1482            if !status.success() {
1483                tracing::error!(
1484                    port,
1485                    %status,
1486                    "The {service_kind} service exited unexpectedly with {status}",
1487                );
1488            }
1489        }
1490        Ok(None) => {} // Still running — normal case when terminate() was called.
1491        Err(error) => {
1492            tracing::warn!(
1493                port,
1494                %error,
1495                "Failed to check {service_kind} service status",
1496            );
1497        }
1498    }
1499}
1500
1501pub struct NodeService {
1502    port: u16,
1503    child: Child,
1504    terminated: bool,
1505}
1506
1507impl Drop for NodeService {
1508    fn drop(&mut self) {
1509        if !self.terminated {
1510            log_unexpected_exit(&mut self.child, "node", self.port);
1511        }
1512    }
1513}
1514
1515impl NodeService {
1516    fn new(port: u16, child: Child) -> Self {
1517        Self {
1518            port,
1519            child,
1520            terminated: false,
1521        }
1522    }
1523
1524    pub async fn terminate(mut self) -> Result<()> {
1525        self.terminated = true;
1526        self.child.kill().await.context("terminating node service")
1527    }
1528
1529    pub fn port(&self) -> u16 {
1530        self.port
1531    }
1532
1533    pub fn ensure_is_running(&mut self) -> Result<()> {
1534        self.child.ensure_is_running()
1535    }
1536
1537    pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1538        let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1539        let mut data = self.query_node(query).await?;
1540        Ok(serde_json::from_value(data["processInbox"].take())?)
1541    }
1542
1543    pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1544        let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1545        let mut data = self.query_node(query).await?;
1546        Ok(serde_json::from_value(data["sync"].take())?)
1547    }
1548
1549    pub async fn transfer(
1550        &self,
1551        chain_id: ChainId,
1552        owner: AccountOwner,
1553        recipient: Account,
1554        amount: Amount,
1555    ) -> Result<CryptoHash> {
1556        let json_owner = owner.to_value();
1557        let json_recipient = recipient.to_value();
1558        let query = format!(
1559            "mutation {{ transfer(\
1560                 chainId: \"{chain_id}\", \
1561                 owner: {json_owner}, \
1562                 recipient: {json_recipient}, \
1563                 amount: \"{amount}\") \
1564             }}"
1565        );
1566        let data = self.query_node(query).await?;
1567        serde_json::from_value(data["transfer"].clone())
1568            .context("missing transfer field in response")
1569    }
1570
1571    pub async fn balance(&self, account: &Account) -> Result<Amount> {
1572        let chain = account.chain_id;
1573        let owner = account.owner;
1574        if matches!(owner, AccountOwner::CHAIN) {
1575            let query = format!(
1576                "query {{ chain(chainId:\"{chain}\") {{
1577                    executionState {{ system {{ balance }} }}
1578                }} }}"
1579            );
1580            let response = self.query_node(query).await?;
1581            let balance = &response["chain"]["executionState"]["system"]["balance"]
1582                .as_str()
1583                .unwrap();
1584            return Ok(Amount::from_str(balance)?);
1585        }
1586        let query = format!(
1587            "query {{ chain(chainId:\"{chain}\") {{
1588                executionState {{ system {{ balances {{
1589                    entry(key:\"{owner}\") {{ value }}
1590                }} }} }}
1591            }} }}"
1592        );
1593        let response = self.query_node(query).await?;
1594        let balances = &response["chain"]["executionState"]["system"]["balances"];
1595        let balance = balances["entry"]["value"].as_str();
1596        match balance {
1597            None => Ok(Amount::ZERO),
1598            Some(amount) => Ok(Amount::from_str(amount)?),
1599        }
1600    }
1601
1602    pub fn make_application<A: ContractAbi>(
1603        &self,
1604        chain_id: &ChainId,
1605        application_id: &ApplicationId<A>,
1606    ) -> Result<ApplicationWrapper<A>> {
1607        let application_id = application_id.forget_abi().to_string();
1608        let link = format!(
1609            "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1610            self.port
1611        );
1612        Ok(ApplicationWrapper::from(link))
1613    }
1614
1615    pub async fn publish_data_blob(
1616        &self,
1617        chain_id: &ChainId,
1618        bytes: Vec<u8>,
1619    ) -> Result<CryptoHash> {
1620        let query = format!(
1621            "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1622            chain_id.to_value(),
1623            bytes.to_value(),
1624        );
1625        let data = self.query_node(query).await?;
1626        serde_json::from_value(data["publishDataBlob"].clone())
1627            .context("missing publishDataBlob field in response")
1628    }
1629
1630    pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1631        &self,
1632        chain_id: &ChainId,
1633        contract: PathBuf,
1634        service: PathBuf,
1635        vm_runtime: VmRuntime,
1636    ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1637        let contract_code = Bytecode::load_from_file(&contract).await?;
1638        let service_code = Bytecode::load_from_file(&service).await?;
1639        let query = format!(
1640            "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1641            chain_id.to_value(),
1642            contract_code.to_value(),
1643            service_code.to_value(),
1644            vm_runtime.to_value(),
1645        );
1646        let data = self.query_node(query).await?;
1647        let module_str = data["publishModule"]
1648            .as_str()
1649            .context("module ID not found")?;
1650        let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1651        Ok(module_id.with_abi())
1652    }
1653
1654    pub async fn query_committees(
1655        &self,
1656        chain_id: &ChainId,
1657    ) -> Result<BTreeMap<Epoch, CryptoHash>> {
1658        let query = format!(
1659            "query {{ chain(chainId:\"{chain_id}\") {{
1660                executionState {{ system {{ committees }} }}
1661            }} }}"
1662        );
1663        let mut response = self.query_node(query).await?;
1664        let committees = response["chain"]["executionState"]["system"]["committees"].take();
1665        Ok(serde_json::from_value(committees)?)
1666    }
1667
1668    pub async fn events_from_index(
1669        &self,
1670        chain_id: &ChainId,
1671        stream_id: &StreamId,
1672        start_index: u32,
1673    ) -> Result<Vec<IndexAndEvent>> {
1674        let query = format!(
1675            "query {{
1676               eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1677               {{ index event }}
1678             }}",
1679            stream_id.to_value()
1680        );
1681        let mut response = self.query_node(query).await?;
1682        let response = response["eventsFromIndex"].take();
1683        Ok(serde_json::from_value(response)?)
1684    }
1685
1686    pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1687        let n_try = 5;
1688        let query = query.as_ref();
1689        for i in 0..n_try {
1690            linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1691            let url = format!("http://localhost:{}/", self.port);
1692            let client = reqwest_client();
1693            let result = client
1694                .post(url)
1695                .json(&json!({ "query": query }))
1696                .send()
1697                .await;
1698            if matches!(result, Err(ref error) if error.is_timeout()) {
1699                tracing::warn!(
1700                    "Timeout when sending query {} to the node service",
1701                    truncate_query_output(query)
1702                );
1703                continue;
1704            }
1705            let response = result.with_context(|| {
1706                format!(
1707                    "query_node: failed to post query={}",
1708                    truncate_query_output(query)
1709                )
1710            })?;
1711            ensure!(
1712                response.status().is_success(),
1713                "Query \"{}\" failed: {}",
1714                truncate_query_output(query),
1715                response
1716                    .text()
1717                    .await
1718                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1719            );
1720            let value: Value = response.json().await.context("invalid JSON")?;
1721            if let Some(errors) = value.get("errors") {
1722                tracing::warn!(
1723                    "Query \"{}\" failed: {}",
1724                    truncate_query_output(query),
1725                    errors
1726                );
1727            } else {
1728                return Ok(value["data"].clone());
1729            }
1730        }
1731        bail!(
1732            "Query \"{}\" failed after {} retries.",
1733            truncate_query_output(query),
1734            n_try
1735        );
1736    }
1737
1738    pub async fn create_application<
1739        Abi: ContractAbi,
1740        Parameters: Serialize,
1741        InstantiationArgument: Serialize,
1742    >(
1743        &self,
1744        chain_id: &ChainId,
1745        module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1746        parameters: &Parameters,
1747        argument: &InstantiationArgument,
1748        required_application_ids: &[ApplicationId],
1749    ) -> Result<ApplicationId<Abi>> {
1750        let module_id = module_id.forget_abi();
1751        let json_required_applications_ids = required_application_ids
1752            .iter()
1753            .map(ApplicationId::to_string)
1754            .collect::<Vec<_>>()
1755            .to_value();
1756        // Convert to `serde_json::Value` then `async_graphql::Value` via the trait `InputType`.
1757        let new_parameters = serde_json::to_value(parameters)
1758            .context("could not create parameters JSON")?
1759            .to_value();
1760        let new_argument = serde_json::to_value(argument)
1761            .context("could not create argument JSON")?
1762            .to_value();
1763        let query = format!(
1764            "mutation {{ createApplication(\
1765                 chainId: \"{chain_id}\",
1766                 moduleId: \"{module_id}\", \
1767                 parameters: {new_parameters}, \
1768                 instantiationArgument: {new_argument}, \
1769                 requiredApplicationIds: {json_required_applications_ids}) \
1770             }}"
1771        );
1772        let data = self.query_node(query).await?;
1773        let app_id_str = data["createApplication"]
1774            .as_str()
1775            .context("missing createApplication string in response")?
1776            .trim();
1777        Ok(app_id_str
1778            .parse::<ApplicationId>()
1779            .context("invalid application ID")?
1780            .with_abi())
1781    }
1782
1783    /// Obtains the hash and height of the `chain`'s tip block, as known by this node service.
1784    pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1785        let query = format!(
1786            r#"query {{ block(chainId: "{chain}") {{
1787                hash
1788                block {{ header {{ height }} }}
1789            }} }}"#
1790        );
1791
1792        let mut response = self.query_node(&query).await?;
1793
1794        match (
1795            mem::take(&mut response["block"]["hash"]),
1796            mem::take(&mut response["block"]["block"]["header"]["height"]),
1797        ) {
1798            (Value::Null, Value::Null) => Ok(None),
1799            (Value::String(hash), Value::Number(height)) => Ok(Some((
1800                hash.parse()
1801                    .context("Received an invalid hash {hash:?} for chain tip")?,
1802                BlockHeight(height.as_u64().unwrap()),
1803            ))),
1804            invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1805        }
1806    }
1807
1808    /// Subscribes to the node service and returns a stream of notifications about a chain.
1809    pub async fn notifications(
1810        &self,
1811        chain_id: ChainId,
1812    ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
1813        let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1814        let url = format!("ws://localhost:{}/ws", self.port);
1815        let mut request = url.into_client_request()?;
1816        request.headers_mut().insert(
1817            "Sec-WebSocket-Protocol",
1818            HeaderValue::from_str("graphql-transport-ws")?,
1819        );
1820        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1821        let init_json = json!({
1822          "type": "connection_init",
1823          "payload": {}
1824        });
1825        websocket.send(init_json.to_string().into()).await?;
1826        let text = websocket
1827            .next()
1828            .await
1829            .context("Failed to establish connection")??
1830            .into_text()?;
1831        ensure!(
1832            text == "{\"type\":\"connection_ack\"}",
1833            "Unexpected response: {text}"
1834        );
1835        let query_json = json!({
1836          "id": "1",
1837          "type": "start",
1838          "payload": {
1839            "query": query,
1840            "variables": {},
1841            "operationName": null
1842          }
1843        });
1844        websocket.send(query_json.to_string().into()).await?;
1845        Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1846            |message| async {
1847                let text = message.into_text()?;
1848                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1849                if let Some(errors) = value["payload"].get("errors") {
1850                    bail!("Notification subscription failed: {errors:?}");
1851                }
1852                serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1853                    .context("Failed to deserialize notification")
1854            },
1855        )))
1856    }
1857
1858    /// Subscribes to query results via the `queryResult` GraphQL subscription.
1859    pub async fn query_result(
1860        &self,
1861        name: &str,
1862        chain_id: ChainId,
1863        application_id: &ApplicationId,
1864    ) -> Result<Pin<Box<impl Stream<Item = Result<Value>>>>> {
1865        let query = format!(
1866            r#"subscription {{ queryResult(name: "{name}", chainId: "{chain_id}", applicationId: "{application_id}") }}"#,
1867        );
1868        let url = format!("ws://localhost:{}/ws", self.port);
1869        let mut request = url.into_client_request()?;
1870        request.headers_mut().insert(
1871            "Sec-WebSocket-Protocol",
1872            HeaderValue::from_str("graphql-transport-ws")?,
1873        );
1874        let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1875        let init_json = json!({
1876          "type": "connection_init",
1877          "payload": {}
1878        });
1879        websocket.send(init_json.to_string().into()).await?;
1880        let text = websocket
1881            .next()
1882            .await
1883            .context("Failed to establish connection")??
1884            .into_text()?;
1885        ensure!(
1886            text == "{\"type\":\"connection_ack\"}",
1887            "Unexpected response: {text}"
1888        );
1889        let query_json = json!({
1890          "id": "1",
1891          "type": "start",
1892          "payload": {
1893            "query": query,
1894            "variables": {},
1895            "operationName": null
1896          }
1897        });
1898        websocket.send(query_json.to_string().into()).await?;
1899        Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1900            |message| async {
1901                let text = message.into_text()?;
1902                let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1903                if let Some(errors) = value["payload"].get("errors") {
1904                    bail!("Query result subscription failed: {errors:?}");
1905                }
1906                Ok(value["payload"]["data"]["queryResult"].clone())
1907            },
1908        )))
1909    }
1910}
1911
1912/// A running faucet service.
1913pub struct FaucetService {
1914    port: u16,
1915    child: Child,
1916    _temp_dir: tempfile::TempDir,
1917    terminated: bool,
1918}
1919
1920impl Drop for FaucetService {
1921    fn drop(&mut self) {
1922        if !self.terminated {
1923            log_unexpected_exit(&mut self.child, "faucet", self.port);
1924        }
1925    }
1926}
1927
1928impl FaucetService {
1929    fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
1930        Self {
1931            port,
1932            child,
1933            _temp_dir: temp_dir,
1934            terminated: false,
1935        }
1936    }
1937
1938    pub async fn terminate(mut self) -> Result<()> {
1939        self.terminated = true;
1940        self.child
1941            .kill()
1942            .await
1943            .context("terminating faucet service")
1944    }
1945
1946    pub fn ensure_is_running(&mut self) -> Result<()> {
1947        self.child.ensure_is_running()
1948    }
1949
1950    pub fn instance(&self) -> Faucet {
1951        Faucet::new(format!("http://localhost:{}/", self.port))
1952    }
1953}
1954
1955/// A running `Application` to be queried in GraphQL.
1956pub struct ApplicationWrapper<A> {
1957    uri: String,
1958    _phantom: PhantomData<A>,
1959}
1960
1961impl<A> ApplicationWrapper<A> {
1962    pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1963        let query = query.as_ref();
1964        let value = self.run_json_query(json!({ "query": query })).await?;
1965        Ok(value["data"].clone())
1966    }
1967
1968    pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1969        const MAX_RETRIES: usize = 5;
1970
1971        for i in 0.. {
1972            let client = reqwest_client();
1973            let result = client.post(&self.uri).json(&query).send().await;
1974            let response = match result {
1975                Ok(response) => response,
1976                Err(error) if i < MAX_RETRIES => {
1977                    tracing::warn!(
1978                        "Failed to post query \"{}\": {error}; retrying",
1979                        truncate_query_output_serialize(&query),
1980                    );
1981                    continue;
1982                }
1983                Err(error) => {
1984                    let query = truncate_query_output_serialize(&query);
1985                    return Err(error)
1986                        .with_context(|| format!("run_json_query: failed to post query={query}"));
1987                }
1988            };
1989            ensure!(
1990                response.status().is_success(),
1991                "Query \"{}\" failed: {}",
1992                truncate_query_output_serialize(&query),
1993                response
1994                    .text()
1995                    .await
1996                    .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1997            );
1998            let value: Value = response.json().await.context("invalid JSON")?;
1999            if let Some(errors) = value.get("errors") {
2000                bail!(
2001                    "Query \"{}\" failed: {}",
2002                    truncate_query_output_serialize(&query),
2003                    errors
2004                );
2005            }
2006            return Ok(value);
2007        }
2008        unreachable!()
2009    }
2010
2011    pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
2012        let query = query.as_ref();
2013        self.run_graphql_query(&format!("query {{ {query} }}"))
2014            .await
2015    }
2016
2017    pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
2018        let query = query.as_ref().trim();
2019        let name = query
2020            .split_once(|ch: char| !ch.is_alphanumeric())
2021            .map_or(query, |(name, _)| name);
2022        let data = self.query(query).await?;
2023        serde_json::from_value(data[name].clone())
2024            .with_context(|| format!("{name} field missing in response"))
2025    }
2026
2027    pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
2028        let mutation = mutation.as_ref();
2029        self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
2030            .await
2031    }
2032
2033    pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
2034        let mut out = String::from("mutation {\n");
2035        for (index, mutation) in mutations.iter().enumerate() {
2036            out = format!("{}  u{}: {}\n", out, index, mutation);
2037        }
2038        out.push_str("}\n");
2039        self.run_graphql_query(&out).await
2040    }
2041}
2042
2043impl<A> From<String> for ApplicationWrapper<A> {
2044    fn from(uri: String) -> ApplicationWrapper<A> {
2045        ApplicationWrapper {
2046            uri,
2047            _phantom: PhantomData,
2048        }
2049    }
2050}
2051
2052/// Returns the timeout for tests that wait for notifications, either read from the env
2053/// variable `LINERA_TEST_NOTIFICATION_TIMEOUT_MS`, or the default value of 10 seconds.
2054#[cfg(with_testing)]
2055fn notification_timeout() -> Duration {
2056    const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
2057    const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;
2058
2059    match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
2060        Ok(var) => Duration::from_millis(var.parse().unwrap_or_else(|error| {
2061            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
2062        })),
2063        Err(env::VarError::NotPresent) => Duration::from_millis(NOTIFICATION_TIMEOUT_MS_DEFAULT),
2064        Err(env::VarError::NotUnicode(_)) => {
2065            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
2066        }
2067    }
2068}
2069
2070#[cfg(with_testing)]
2071pub trait NotificationsExt {
2072    /// Waits for a notification for which `f` returns `Some(t)`, and returns `t`.
2073    fn wait_for<T>(
2074        &mut self,
2075        f: impl FnMut(Notification) -> Option<T>,
2076    ) -> impl Future<Output = Result<T>>;
2077
2078    /// Waits for a `NewEvents` notification for the given block height. If no height is specified,
2079    /// any height is accepted.
2080    fn wait_for_events(
2081        &mut self,
2082        expected_height: impl Into<Option<BlockHeight>>,
2083    ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
2084        let expected_height = expected_height.into();
2085        self.wait_for(move |notification| {
2086            if let Reason::NewEvents {
2087                height,
2088                event_streams,
2089                ..
2090            } = notification.reason
2091            {
2092                if expected_height.is_none_or(|h| h == height) {
2093                    return Some(event_streams);
2094                }
2095            }
2096            None
2097        })
2098    }
2099
2100    /// Waits for a `NewBlock` notification for the given block height. If no height is specified,
2101    /// any height is accepted.
2102    fn wait_for_block(
2103        &mut self,
2104        expected_height: impl Into<Option<BlockHeight>>,
2105    ) -> impl Future<Output = Result<CryptoHash>> {
2106        let expected_height = expected_height.into();
2107        self.wait_for(move |notification| {
2108            if let Reason::NewBlock { height, hash, .. } = notification.reason {
2109                if expected_height.is_none_or(|h| h == height) {
2110                    return Some(hash);
2111                }
2112            }
2113            None
2114        })
2115    }
2116
2117    /// Waits for a `NewIncomingBundle` notification for the given sender chain and sender block
2118    /// height. If no height is specified, any height is accepted.
2119    fn wait_for_bundle(
2120        &mut self,
2121        expected_origin: ChainId,
2122        expected_height: impl Into<Option<BlockHeight>>,
2123    ) -> impl Future<Output = Result<()>> {
2124        let expected_height = expected_height.into();
2125        self.wait_for(move |notification| {
2126            if let Reason::NewIncomingBundle { height, origin } = notification.reason {
2127                if expected_height.is_none_or(|h| h == height) && origin == expected_origin {
2128                    return Some(());
2129                }
2130            }
2131            None
2132        })
2133    }
2134}
2135
2136#[cfg(with_testing)]
2137impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
2138    async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
2139        let mut timeout = Box::pin(linera_base::time::timer::sleep(notification_timeout())).fuse();
2140        loop {
2141            let notification = futures::select! {
2142                () = timeout => bail!("Timeout waiting for notification"),
2143                notification = self.next().fuse() => notification.context("Stream closed")??,
2144            };
2145            if let Some(t) = f(notification) {
2146                return Ok(t);
2147            }
2148        }
2149    }
2150}