1use std::{
5 borrow::Cow,
6 collections::BTreeMap,
7 env,
8 marker::PhantomData,
9 mem,
10 path::{Path, PathBuf},
11 pin::Pin,
12 process::Stdio,
13 str::FromStr,
14 sync,
15 time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24 abi::ContractAbi,
25 command::{resolve_binary, CommandExt},
26 crypto::{CryptoHash, InMemorySigner},
27 data_types::{Amount, ApplicationPermissions, BlockHeight, Bytecode, Epoch},
28 identifiers::{
29 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30 },
31 vm::VmRuntime,
32};
33use linera_client::client_options::ResourceControlPolicyConfig;
34use linera_core::worker::Notification;
35use linera_faucet_client::Faucet;
36use serde::{de::DeserializeOwned, ser::Serialize};
37use serde_command_opts::to_args;
38use serde_json::{json, Value};
39use tempfile::TempDir;
40use tokio::{
41 io::{AsyncBufReadExt, BufReader},
42 process::{Child, Command},
43 sync::oneshot,
44 task::JoinHandle,
45};
46#[cfg(with_testing)]
47use {
48 futures::FutureExt as _,
49 linera_core::worker::Reason,
50 std::{collections::BTreeSet, future::Future},
51};
52
53use crate::{
54 cli::command::{BenchmarkCommand, ResourceControlPolicyOverrides},
55 cli_wrappers::{
56 local_net::{PathProvider, ProcessInbox},
57 Network,
58 },
59 util::{self, ChildExt},
60 Wallet,
61};
62
/// Name of the environment variable whose whitespace-separated contents are
/// appended as extra arguments when spawning the `service` subcommand.
const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
66
67fn reqwest_client() -> reqwest::Client {
68 reqwest::ClientBuilder::new()
69 .timeout(Duration::from_secs(60))
70 .build()
71 .unwrap()
72}
73
/// Wrapper that builds and runs invocations of the `linera` client binary.
pub struct ClientWrapper {
    /// Cached location of the `linera` binary. It is either derived from the
    /// binary directory given at construction or resolved lazily on first use.
    binary_path: sync::Mutex<Option<PathBuf>>,
    /// Optional seed forwarded as `--testing-prng-seed` to the commands that accept it.
    testing_prng_seed: Option<u64>,
    /// Value passed to `--storage` (a `rocksdb:` location under the working directory).
    storage: String,
    /// Wallet file name, passed to `--wallet` and resolved against the working directory.
    wallet: String,
    /// Keystore file name, passed to `--keystore` and resolved against the working directory.
    keystore: String,
    /// Value passed to `--max-pending-message-bundles` for regular commands.
    max_pending_message_bundles: usize,
    /// Network description; its short form prefixes validator addresses.
    network: Network,
    /// Provides the directory in which commands run and where wallet files live.
    pub path_provider: PathProvider,
    /// Selected drop policy. NOTE(review): the code applying it is not visible
    /// in this part of the file.
    on_drop: OnClientDrop,
    /// Additional arguments appended after the required ones on every command.
    extra_args: Vec<String>,
}
87
/// Policy stored in `ClientWrapper::on_drop`.
///
/// NOTE(review): the code that acts on this value is not visible here.
/// Presumably it decides whether the client's chains are closed when the
/// wrapper is dropped. Confirm against the `Drop` implementation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OnClientDrop {
    /// Presumably: close the chains on drop (TODO confirm).
    CloseChains,
    /// Presumably: leave the chains untouched on drop (TODO confirm).
    LeakChains,
}
96
97impl ClientWrapper {
98 pub fn new(
99 path_provider: PathProvider,
100 network: Network,
101 testing_prng_seed: Option<u64>,
102 id: usize,
103 on_drop: OnClientDrop,
104 ) -> Self {
105 Self::new_with_extra_args(
106 path_provider,
107 network,
108 testing_prng_seed,
109 id,
110 on_drop,
111 vec!["--wait-for-outgoing-messages".to_string()],
112 None,
113 )
114 }
115
116 pub fn new_with_extra_args(
117 path_provider: PathProvider,
118 network: Network,
119 testing_prng_seed: Option<u64>,
120 id: usize,
121 on_drop: OnClientDrop,
122 extra_args: Vec<String>,
123 binary_dir: Option<PathBuf>,
124 ) -> Self {
125 let storage = format!(
126 "rocksdb:{}/client_{}.db",
127 path_provider.path().display(),
128 id
129 );
130 let wallet = format!("wallet_{id}.json");
131 let keystore = format!("keystore_{id}.json");
132 let binary_path = binary_dir.map(|dir| dir.join("linera"));
133 Self {
134 binary_path: sync::Mutex::new(binary_path),
135 testing_prng_seed,
136 storage,
137 wallet,
138 keystore,
139 max_pending_message_bundles: 10_000,
140 network,
141 path_provider,
142 on_drop,
143 extra_args,
144 }
145 }
146
147 pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
149 let tmp = TempDir::new()?;
150 let mut command = self.command().await?;
151 command
152 .current_dir(tmp.path())
153 .arg("project")
154 .arg("new")
155 .arg(project_name)
156 .arg("--linera-root")
157 .arg(linera_root)
158 .spawn_and_wait_for_stdout()
159 .await?;
160 Ok(tmp)
161 }
162
163 pub async fn project_publish<T: Serialize>(
165 &self,
166 path: PathBuf,
167 required_application_ids: Vec<String>,
168 publisher: impl Into<Option<ChainId>>,
169 argument: &T,
170 ) -> Result<String> {
171 let json_parameters = serde_json::to_string(&())?;
172 let json_argument = serde_json::to_string(argument)?;
173 let mut command = self.command().await?;
174 command
175 .arg("project")
176 .arg("publish-and-create")
177 .arg(path)
178 .args(publisher.into().iter().map(ChainId::to_string))
179 .args(["--json-parameters", &json_parameters])
180 .args(["--json-argument", &json_argument]);
181 if !required_application_ids.is_empty() {
182 command.arg("--required-application-ids");
183 command.args(required_application_ids);
184 }
185 let stdout = command.spawn_and_wait_for_stdout().await?;
186 Ok(stdout.trim().to_string())
187 }
188
189 pub async fn project_test(&self, path: &Path) -> Result<()> {
191 self.command()
192 .await
193 .context("failed to create project test command")?
194 .current_dir(path)
195 .arg("project")
196 .arg("test")
197 .spawn_and_wait_for_stdout()
198 .await?;
199 Ok(())
200 }
201
202 async fn command_with_envs_and_arguments(
203 &self,
204 envs: &[(&str, &str)],
205 arguments: impl IntoIterator<Item = Cow<'_, str>>,
206 ) -> Result<Command> {
207 let mut command = self.command_binary().await?;
208 command.current_dir(self.path_provider.path());
209 for (key, value) in envs {
210 command.env(key, value);
211 }
212 for argument in arguments {
213 command.arg(&*argument);
214 }
215 Ok(command)
216 }
217
218 async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
219 self.command_with_envs_and_arguments(envs, self.command_arguments())
220 .await
221 }
222
223 async fn command_with_arguments(
224 &self,
225 arguments: impl IntoIterator<Item = Cow<'_, str>>,
226 ) -> Result<Command> {
227 self.command_with_envs_and_arguments(
228 &[(
229 "RUST_LOG",
230 &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
231 )],
232 arguments,
233 )
234 .await
235 }
236
237 async fn command(&self) -> Result<Command> {
238 self.command_with_envs(&[(
239 "RUST_LOG",
240 &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
241 )])
242 .await
243 }
244
245 fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
246 [
247 "--wallet".into(),
248 self.wallet.as_str().into(),
249 "--keystore".into(),
250 self.keystore.as_str().into(),
251 "--storage".into(),
252 self.storage.as_str().into(),
253 "--send-timeout-ms".into(),
254 "500000".into(),
255 "--recv-timeout-ms".into(),
256 "500000".into(),
257 ]
258 .into_iter()
259 .chain(self.extra_args.iter().map(|s| s.as_str().into()))
260 }
261
262 fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
264 self.required_command_arguments().chain([
265 "--max-pending-message-bundles".into(),
266 self.max_pending_message_bundles.to_string().into(),
267 ])
268 }
269
270 async fn command_binary(&self) -> Result<Command> {
274 match self.command_with_cached_binary_path() {
275 Some(command) => Ok(command),
276 None => {
277 let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
278 let command = Command::new(&resolved_path);
279
280 self.set_cached_binary_path(resolved_path);
281
282 Ok(command)
283 }
284 }
285 }
286
287 fn command_with_cached_binary_path(&self) -> Option<Command> {
289 let binary_path = self.binary_path.lock().unwrap();
290
291 binary_path.as_ref().map(Command::new)
292 }
293
294 fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
302 let mut binary_path = self.binary_path.lock().unwrap();
303
304 if binary_path.is_none() {
305 *binary_path = Some(new_binary_path);
306 } else {
307 assert_eq!(*binary_path, Some(new_binary_path));
308 }
309 }
310
311 pub async fn create_genesis_config(
313 &self,
314 num_other_initial_chains: u32,
315 initial_funding: Amount,
316 policy_config: ResourceControlPolicyConfig,
317 http_allow_list: Option<Vec<String>>,
318 ) -> Result<()> {
319 let mut command = self.command().await?;
320 command
321 .args([
322 "create-genesis-config",
323 &num_other_initial_chains.to_string(),
324 ])
325 .args(["--initial-funding", &initial_funding.to_string()])
326 .args(["--committee", "committee.json"])
327 .args(["--genesis", "genesis.json"])
328 .args([
329 "--policy-config",
330 &policy_config.to_string().to_kebab_case(),
331 ]);
332 if let Some(allow_list) = http_allow_list {
333 command
334 .arg("--http-request-allow-list")
335 .arg(allow_list.join(","));
336 }
337 if let Some(seed) = self.testing_prng_seed {
338 command.arg("--testing-prng-seed").arg(seed.to_string());
339 }
340 command.spawn_and_wait_for_stdout().await?;
341 Ok(())
342 }
343
344 pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
347 let mut command = self.command().await?;
348 command.args(["wallet", "init"]);
349 match faucet {
350 None => command.args(["--genesis", "genesis.json"]),
351 Some(faucet) => command.args(["--faucet", faucet.url()]),
352 };
353 if let Some(seed) = self.testing_prng_seed {
354 command.arg("--testing-prng-seed").arg(seed.to_string());
355 }
356 command.spawn_and_wait_for_stdout().await?;
357 Ok(())
358 }
359
360 pub async fn request_chain(
362 &self,
363 faucet: &Faucet,
364 set_default: bool,
365 ) -> Result<(ChainId, AccountOwner)> {
366 let mut command = self.command().await?;
367 command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
368 if set_default {
369 command.arg("--set-default");
370 }
371 let stdout = command.spawn_and_wait_for_stdout().await?;
372 let mut lines = stdout.split_whitespace();
373 let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
374 let owner = lines.next().context("missing chain owner")?.parse()?;
375 Ok((chain_id, owner))
376 }
377
378 #[expect(clippy::too_many_arguments)]
380 pub async fn publish_and_create<
381 A: ContractAbi,
382 Parameters: Serialize,
383 InstantiationArgument: Serialize,
384 >(
385 &self,
386 contract: PathBuf,
387 service: PathBuf,
388 vm_runtime: VmRuntime,
389 parameters: &Parameters,
390 argument: &InstantiationArgument,
391 required_application_ids: &[ApplicationId],
392 publisher: impl Into<Option<ChainId>>,
393 ) -> Result<ApplicationId<A>> {
394 let json_parameters = serde_json::to_string(parameters)?;
395 let json_argument = serde_json::to_string(argument)?;
396 let mut command = self.command().await?;
397 let vm_runtime = format!("{vm_runtime}");
398 command
399 .arg("publish-and-create")
400 .args([contract, service])
401 .args(["--vm-runtime", &vm_runtime.to_lowercase()])
402 .args(publisher.into().iter().map(ChainId::to_string))
403 .args(["--json-parameters", &json_parameters])
404 .args(["--json-argument", &json_argument]);
405 if !required_application_ids.is_empty() {
406 command.arg("--required-application-ids");
407 command.args(
408 required_application_ids
409 .iter()
410 .map(ApplicationId::to_string),
411 );
412 }
413 let stdout = command.spawn_and_wait_for_stdout().await?;
414 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
415 }
416
417 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
419 &self,
420 contract: PathBuf,
421 service: PathBuf,
422 vm_runtime: VmRuntime,
423 publisher: impl Into<Option<ChainId>>,
424 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
425 self.publish_module_with_formats(contract, service, None, vm_runtime, publisher)
426 .await
427 }
428
429 pub async fn publish_module_with_formats<Abi, Parameters, InstantiationArgument>(
431 &self,
432 contract: PathBuf,
433 service: PathBuf,
434 formats: Option<PathBuf>,
435 vm_runtime: VmRuntime,
436 publisher: impl Into<Option<ChainId>>,
437 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
438 let mut command = self.command().await?;
439 command
440 .arg("publish-module")
441 .args([contract, service])
442 .args(["--vm-runtime", &format!("{vm_runtime}").to_lowercase()]);
443 if let Some(formats_path) = formats {
444 command.arg("--formats").arg(formats_path);
445 }
446 let stdout = command
447 .args(publisher.into().iter().map(ChainId::to_string))
448 .spawn_and_wait_for_stdout()
449 .await?;
450 let module_id: ModuleId = stdout.trim().parse()?;
451 Ok(module_id.with_abi())
452 }
453
454 pub async fn create_application<
456 Abi: ContractAbi,
457 Parameters: Serialize,
458 InstantiationArgument: Serialize,
459 >(
460 &self,
461 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
462 parameters: &Parameters,
463 argument: &InstantiationArgument,
464 required_application_ids: &[ApplicationId],
465 creator: impl Into<Option<ChainId>>,
466 ) -> Result<ApplicationId<Abi>> {
467 let json_parameters = serde_json::to_string(parameters)?;
468 let json_argument = serde_json::to_string(argument)?;
469 let mut command = self.command().await?;
470 command
471 .arg("create-application")
472 .arg(module_id.forget_abi().to_string())
473 .args(["--json-parameters", &json_parameters])
474 .args(["--json-argument", &json_argument])
475 .args(creator.into().iter().map(ChainId::to_string));
476 if !required_application_ids.is_empty() {
477 command.arg("--required-application-ids");
478 command.args(
479 required_application_ids
480 .iter()
481 .map(ApplicationId::to_string),
482 );
483 }
484 let stdout = command.spawn_and_wait_for_stdout().await?;
485 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
486 }
487
488 pub async fn run_node_service(
490 &self,
491 port: impl Into<Option<u16>>,
492 process_inbox: ProcessInbox,
493 ) -> Result<NodeService> {
494 self.run_node_service_with_options(port, process_inbox, &[], &[], false)
495 .await
496 }
497
498 pub async fn run_node_service_with_options(
500 &self,
501 port: impl Into<Option<u16>>,
502 process_inbox: ProcessInbox,
503 operator_application_ids: &[ApplicationId],
504 operators: &[(String, PathBuf)],
505 read_only: bool,
506 ) -> Result<NodeService> {
507 self.run_node_service_with_all_options(
508 port,
509 process_inbox,
510 operator_application_ids,
511 operators,
512 read_only,
513 &[],
514 &[],
515 )
516 .await
517 }
518
519 #[expect(clippy::too_many_arguments)]
521 pub async fn run_node_service_with_all_options(
522 &self,
523 port: impl Into<Option<u16>>,
524 process_inbox: ProcessInbox,
525 operator_application_ids: &[ApplicationId],
526 operators: &[(String, PathBuf)],
527 read_only: bool,
528 allowed_subscriptions: &[String],
529 subscription_ttls: &[(String, u64)],
530 ) -> Result<NodeService> {
531 let port = port.into().unwrap_or(8080);
532 let mut command = self.command().await?;
533 command.arg("service");
534 if let ProcessInbox::Skip = process_inbox {
535 command.arg("--listener-skip-process-inbox");
536 }
537 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
538 command.args(var.split_whitespace());
539 }
540 for app_id in operator_application_ids {
541 command.args(["--operator-application-ids", &app_id.to_string()]);
542 }
543 for (name, path) in operators {
544 command.args(["--operators", &format!("{}={}", name, path.display())]);
545 }
546 if read_only {
547 command.arg("--read-only");
548 }
549 for query in allowed_subscriptions {
550 command.args(["--allow-subscription", query]);
551 }
552 for (name, secs) in subscription_ttls {
553 command.args(["--subscription-ttl-secs", &format!("{name}={secs}")]);
554 }
555 let child = command
556 .args(["--port".to_string(), port.to_string()])
557 .spawn_into()?;
558 let client = reqwest_client();
559 for i in 0..10 {
560 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
561 let request = client.get(format!("http://localhost:{port}/")).send().await;
562 if request.is_ok() {
563 tracing::info!("Node service has started");
564 return Ok(NodeService::new(port, child));
565 } else {
566 tracing::warn!("Waiting for node service to start");
567 }
568 }
569 bail!("Failed to start node service");
570 }
571
572 pub async fn run_node_service_with_controller(
574 &self,
575 port: impl Into<Option<u16>>,
576 process_inbox: ProcessInbox,
577 controller_id: &ApplicationId,
578 operators: &[(String, PathBuf)],
579 ) -> Result<NodeService> {
580 let port = port.into().unwrap_or(8080);
581 let mut command = self.command().await?;
582 command.arg("service");
583 if let ProcessInbox::Skip = process_inbox {
584 command.arg("--listener-skip-process-inbox");
585 }
586 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
587 command.args(var.split_whitespace());
588 }
589 command.args(["--controller-id", &controller_id.to_string()]);
590 for (name, path) in operators {
591 command.args(["--operators", &format!("{}={}", name, path.display())]);
592 }
593 let child = command
594 .args(["--port".to_string(), port.to_string()])
595 .spawn_into()?;
596 let client = reqwest_client();
597 for i in 0..10 {
598 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
599 let request = client.get(format!("http://localhost:{port}/")).send().await;
600 if request.is_ok() {
601 tracing::info!("Node service has started");
602 return Ok(NodeService::new(port, child));
603 } else {
604 tracing::warn!("Waiting for node service to start");
605 }
606 }
607 bail!("Failed to start node service");
608 }
609
610 pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
612 let mut command = self.command().await?;
613 command.arg("validator").arg("query").arg(address);
614 let stdout = command.spawn_and_wait_for_stdout().await?;
615
616 let hash = stdout
619 .lines()
620 .find_map(|line| {
621 line.strip_prefix("Genesis config hash: ")
622 .and_then(|hash_str| hash_str.trim().parse().ok())
623 })
624 .context("error while parsing the result of `linera validator query`")?;
625 Ok(hash)
626 }
627
628 pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
630 let mut command = self.command().await?;
631 command.arg("validator").arg("list");
632 if let Some(chain_id) = chain_id {
633 command.args(["--chain-id", &chain_id.to_string()]);
634 }
635 command.spawn_and_wait_for_stdout().await?;
636 Ok(())
637 }
638
639 pub async fn sync_validator(
641 &self,
642 chain_ids: impl IntoIterator<Item = &ChainId>,
643 validator_address: impl Into<String>,
644 ) -> Result<()> {
645 let mut command = self.command().await?;
646 command
647 .arg("validator")
648 .arg("sync")
649 .arg(validator_address.into());
650 let mut chain_ids = chain_ids.into_iter().peekable();
651 if chain_ids.peek().is_some() {
652 command
653 .arg("--chains")
654 .args(chain_ids.map(ChainId::to_string));
655 }
656 command.spawn_and_wait_for_stdout().await?;
657 Ok(())
658 }
659
660 pub async fn validator_benchmark(
662 &self,
663 address: impl Into<String>,
664 chains: impl IntoIterator<Item = &ChainId>,
665 extra_args: &[&str],
666 ) -> Result<String> {
667 let mut command = self.command().await?;
668 command
669 .arg("validator")
670 .arg("benchmark")
671 .arg(address.into());
672 for chain in chains {
673 command.args(["--chain", &chain.to_string()]);
674 }
675 command.args(extra_args);
676 command.spawn_and_wait_for_stdout().await
677 }
678
679 pub async fn run_faucet(
681 &self,
682 port: impl Into<Option<u16>>,
683 chain_id: Option<ChainId>,
684 amount: Amount,
685 ) -> Result<FaucetService> {
686 let port = port.into().unwrap_or(8080);
687 let temp_dir = tempfile::tempdir()
688 .context("Failed to create temporary directory for faucet storage")?;
689 let storage_path = temp_dir.path().join("faucet_storage.sqlite");
690 let mut command = self.command().await?;
691 let command = command
692 .arg("faucet")
693 .args(["--port".to_string(), port.to_string()])
694 .args(["--amount".to_string(), amount.to_string()])
695 .args([
696 "--storage-path".to_string(),
697 storage_path.to_string_lossy().to_string(),
698 ]);
699 if let Some(chain_id) = chain_id {
700 command.arg(chain_id.to_string());
701 }
702 let child = command.spawn_into()?;
703 let client = reqwest_client();
704 for i in 0..10 {
705 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
706 let request = client.get(format!("http://localhost:{port}/")).send().await;
707 if request.is_ok() {
708 tracing::info!("Faucet has started");
709 return Ok(FaucetService::new(port, child, temp_dir));
710 } else {
711 tracing::debug!("Waiting for faucet to start");
712 }
713 }
714 bail!("Failed to start faucet");
715 }
716
717 pub async fn local_balance(&self, account: Account) -> Result<Amount> {
719 let stdout = self
720 .command()
721 .await?
722 .arg("local-balance")
723 .arg(account.to_string())
724 .spawn_and_wait_for_stdout()
725 .await?;
726 let amount = stdout
727 .trim()
728 .parse()
729 .context("error while parsing the result of `linera local-balance`")?;
730 Ok(amount)
731 }
732
733 pub async fn query_balance(&self, account: Account) -> Result<Amount> {
735 let stdout = self
736 .command()
737 .await?
738 .arg("query-balance")
739 .arg(account.to_string())
740 .spawn_and_wait_for_stdout()
741 .await?;
742 let amount = stdout
743 .trim()
744 .parse()
745 .context("error while parsing the result of `linera query-balance`")?;
746 Ok(amount)
747 }
748
749 pub async fn query_application_json<T: DeserializeOwned>(
751 &self,
752 chain_id: ChainId,
753 application_id: ApplicationId,
754 query: impl AsRef<str>,
755 ) -> Result<T> {
756 let query = query.as_ref().trim();
757 let name = query
758 .split_once(|ch: char| !ch.is_alphanumeric())
759 .map_or(query, |(name, _)| name);
760 let stdout = self
761 .command()
762 .await?
763 .arg("query-application")
764 .arg("--chain-id")
765 .arg(chain_id.to_string())
766 .arg("--application-id")
767 .arg(application_id.to_string())
768 .arg(query)
769 .spawn_and_wait_for_stdout()
770 .await?;
771 let data: serde_json::Value =
772 serde_json::from_str(stdout.trim()).context("invalid JSON from query-application")?;
773 serde_json::from_value(data[name].clone())
774 .with_context(|| format!("{name} field missing in query-application response"))
775 }
776
777 pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
779 self.command()
780 .await?
781 .arg("sync")
782 .arg(chain_id.to_string())
783 .spawn_and_wait_for_stdout()
784 .await?;
785 Ok(())
786 }
787
788 pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
790 self.command()
791 .await?
792 .arg("process-inbox")
793 .arg(chain_id.to_string())
794 .spawn_and_wait_for_stdout()
795 .await?;
796 Ok(())
797 }
798
799 pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
801 self.command()
802 .await?
803 .arg("transfer")
804 .arg(amount.to_string())
805 .args(["--from", &from.to_string()])
806 .args(["--to", &to.to_string()])
807 .spawn_and_wait_for_stdout()
808 .await?;
809 Ok(())
810 }
811
812 pub async fn transfer_with_silent_logs(
814 &self,
815 amount: Amount,
816 from: ChainId,
817 to: ChainId,
818 ) -> Result<()> {
819 self.command()
820 .await?
821 .env("RUST_LOG", "off")
822 .arg("transfer")
823 .arg(amount.to_string())
824 .args(["--from", &from.to_string()])
825 .args(["--to", &to.to_string()])
826 .spawn_and_wait_for_stdout()
827 .await?;
828 Ok(())
829 }
830
831 pub async fn transfer_with_accounts(
833 &self,
834 amount: Amount,
835 from: Account,
836 to: Account,
837 ) -> Result<()> {
838 self.command()
839 .await?
840 .arg("transfer")
841 .arg(amount.to_string())
842 .args(["--from", &from.to_string()])
843 .args(["--to", &to.to_string()])
844 .spawn_and_wait_for_stdout()
845 .await?;
846 Ok(())
847 }
848
849 fn benchmark_command_internal(command: &mut Command, args: &BenchmarkCommand) -> Result<()> {
850 let mut formatted_args = to_args(&args)?;
851 let subcommand = formatted_args.remove(0);
852 formatted_args.remove(0);
855 let options = formatted_args
856 .chunks_exact(2)
857 .flat_map(|pair| {
858 let option = format!("--{}", pair[0]);
859 match pair[1].as_str() {
860 "true" => vec![option],
861 "false" => vec![],
862 _ => vec![option, pair[1].clone()],
863 }
864 })
865 .collect::<Vec<_>>();
866 command
867 .args([
868 "--max-pending-message-bundles",
869 &args.transactions_per_block().to_string(),
870 ])
871 .arg("benchmark")
872 .arg(subcommand)
873 .args(options);
874 Ok(())
875 }
876
877 async fn benchmark_command_with_envs(
878 &self,
879 args: BenchmarkCommand,
880 envs: &[(&str, &str)],
881 ) -> Result<Command> {
882 let mut command = self
883 .command_with_envs_and_arguments(envs, self.required_command_arguments())
884 .await?;
885 Self::benchmark_command_internal(&mut command, &args)?;
886 Ok(command)
887 }
888
889 async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
890 let mut command = self
891 .command_with_arguments(self.required_command_arguments())
892 .await?;
893 Self::benchmark_command_internal(&mut command, &args)?;
894 Ok(command)
895 }
896
897 pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
899 let mut command = self.benchmark_command(args).await?;
900 command.spawn_and_wait_for_stdout().await?;
901 Ok(())
902 }
903
904 pub async fn benchmark_detached(
907 &self,
908 args: BenchmarkCommand,
909 tx: oneshot::Sender<()>,
910 ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
911 let mut child = self
912 .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
913 .await?
914 .kill_on_drop(true)
915 .stdin(Stdio::piped())
916 .stdout(Stdio::piped())
917 .stderr(Stdio::piped())
918 .spawn()?;
919
920 let pid = child.id().expect("failed to get pid");
921 let stdout = child.stdout.take().expect("stdout not open");
922 let stdout_handle = tokio::spawn(async move {
923 let mut lines = BufReader::new(stdout).lines();
924 while let Ok(Some(line)) = lines.next_line().await {
925 println!("benchmark{{pid={pid}}} {line}");
926 }
927 });
928
929 let stderr = child.stderr.take().expect("stderr not open");
930 let stderr_handle = tokio::spawn(async move {
931 let mut lines = BufReader::new(stderr).lines();
932 let mut tx = Some(tx);
933 while let Ok(Some(line)) = lines.next_line().await {
934 if line.contains("Ready to start benchmark") {
935 tx.take()
936 .expect("Should only send signal once")
937 .send(())
938 .expect("failed to send ready signal to main thread");
939 } else {
940 println!("benchmark{{pid={pid}}} {line}");
941 }
942 }
943 });
944 Ok((child, stdout_handle, stderr_handle))
945 }
946
947 async fn open_chain_internal(
948 &self,
949 from: ChainId,
950 owner: Option<AccountOwner>,
951 initial_balance: Amount,
952 super_owner: bool,
953 ) -> Result<(ChainId, AccountOwner)> {
954 let mut command = self.command().await?;
955 command
956 .arg("open-chain")
957 .args(["--from", &from.to_string()])
958 .args(["--initial-balance", &initial_balance.to_string()]);
959
960 if let Some(owner) = owner {
961 command.args(["--owner", &owner.to_string()]);
962 }
963
964 if super_owner {
965 command.arg("--super-owner");
966 }
967
968 let stdout = command.spawn_and_wait_for_stdout().await?;
969 let mut split = stdout.split('\n');
970 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
971 let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
972 if let Some(owner) = owner {
973 assert_eq!(owner, new_owner);
974 }
975 Ok((chain_id, new_owner))
976 }
977
978 pub async fn open_chain_super_owner(
980 &self,
981 from: ChainId,
982 owner: Option<AccountOwner>,
983 initial_balance: Amount,
984 ) -> Result<(ChainId, AccountOwner)> {
985 self.open_chain_internal(from, owner, initial_balance, true)
986 .await
987 }
988
989 pub async fn open_chain(
991 &self,
992 from: ChainId,
993 owner: Option<AccountOwner>,
994 initial_balance: Amount,
995 ) -> Result<(ChainId, AccountOwner)> {
996 self.open_chain_internal(from, owner, initial_balance, false)
997 .await
998 }
999
1000 pub async fn open_and_assign(
1002 &self,
1003 client: &ClientWrapper,
1004 initial_balance: Amount,
1005 ) -> Result<ChainId> {
1006 let our_chain = self
1007 .load_wallet()?
1008 .default_chain()
1009 .context("no default chain found")?;
1010 let owner = client.keygen().await?;
1011 let (new_chain, _) = self
1012 .open_chain(our_chain, Some(owner), initial_balance)
1013 .await?;
1014 client.assign(owner, new_chain).await?;
1015 Ok(new_chain)
1016 }
1017
1018 pub async fn open_multi_owner_chain(
1019 &self,
1020 from: ChainId,
1021 owners: BTreeMap<AccountOwner, u64>,
1022 multi_leader_rounds: u32,
1023 balance: Amount,
1024 base_timeout_ms: u64,
1025 ) -> Result<ChainId> {
1026 let mut command = self.command().await?;
1027 command
1028 .arg("open-multi-owner-chain")
1029 .args(["--from", &from.to_string()])
1030 .arg("--owners")
1031 .arg(serde_json::to_string(&owners)?)
1032 .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
1033 command
1034 .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
1035 .args(["--initial-balance", &balance.to_string()]);
1036
1037 let stdout = command.spawn_and_wait_for_stdout().await?;
1038 let mut split = stdout.split('\n');
1039 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
1040
1041 Ok(chain_id)
1042 }
1043
1044 pub async fn change_ownership(
1045 &self,
1046 chain_id: ChainId,
1047 super_owners: Vec<AccountOwner>,
1048 owners: Vec<AccountOwner>,
1049 ) -> Result<()> {
1050 let mut command = self.command().await?;
1051 command
1052 .arg("change-ownership")
1053 .args(["--chain-id", &chain_id.to_string()]);
1054 command
1055 .arg("--super-owners")
1056 .arg(serde_json::to_string(&super_owners)?);
1057 command.arg("--owners").arg(serde_json::to_string(
1058 &owners
1059 .into_iter()
1060 .zip(std::iter::repeat(100u64))
1061 .collect::<BTreeMap<_, _>>(),
1062 )?);
1063 command.spawn_and_wait_for_stdout().await?;
1064 Ok(())
1065 }
1066
1067 pub async fn change_application_permissions(
1068 &self,
1069 chain_id: ChainId,
1070 application_permissions: ApplicationPermissions,
1071 ) -> Result<()> {
1072 let mut command = self.command().await?;
1073 command
1074 .arg("change-application-permissions")
1075 .args(["--chain-id", &chain_id.to_string()]);
1076 command.arg("--manage-chain").arg(serde_json::to_string(
1077 &application_permissions.manage_chain,
1078 )?);
1079 command.spawn_and_wait_for_stdout().await?;
1081 Ok(())
1082 }
1083
1084 pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
1086 let mut command = self.command().await?;
1087 command
1088 .args(["wallet", "follow-chain"])
1089 .arg(chain_id.to_string());
1090 if sync {
1091 command.arg("--sync");
1092 }
1093 command.spawn_and_wait_for_stdout().await?;
1094 Ok(())
1095 }
1096
1097 pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
1099 let mut command = self.command().await?;
1100 command
1101 .args(["wallet", "forget-chain"])
1102 .arg(chain_id.to_string());
1103 command.spawn_and_wait_for_stdout().await?;
1104 Ok(())
1105 }
1106
1107 pub async fn set_default_chain(&self, chain_id: ChainId) -> Result<()> {
1109 let mut command = self.command().await?;
1110 command
1111 .args(["wallet", "set-default"])
1112 .arg(chain_id.to_string());
1113 command.spawn_and_wait_for_stdout().await?;
1114 Ok(())
1115 }
1116
1117 pub async fn retry_pending_block(
1118 &self,
1119 chain_id: Option<ChainId>,
1120 ) -> Result<Option<CryptoHash>> {
1121 let mut command = self.command().await?;
1122 command.arg("retry-pending-block");
1123 if let Some(chain_id) = chain_id {
1124 command.arg(chain_id.to_string());
1125 }
1126 let stdout = command.spawn_and_wait_for_stdout().await?;
1127 let stdout = stdout.trim();
1128 if stdout.is_empty() {
1129 Ok(None)
1130 } else {
1131 Ok(Some(CryptoHash::from_str(stdout)?))
1132 }
1133 }
1134
1135 pub async fn publish_data_blob(
1137 &self,
1138 path: &Path,
1139 chain_id: Option<ChainId>,
1140 ) -> Result<CryptoHash> {
1141 let mut command = self.command().await?;
1142 command.arg("publish-data-blob").arg(path);
1143 if let Some(chain_id) = chain_id {
1144 command.arg(chain_id.to_string());
1145 }
1146 let stdout = command.spawn_and_wait_for_stdout().await?;
1147 let stdout = stdout.trim();
1148 Ok(CryptoHash::from_str(stdout)?)
1149 }
1150
1151 pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
1153 let mut command = self.command().await?;
1154 command.arg("read-data-blob").arg(hash.to_string());
1155 if let Some(chain_id) = chain_id {
1156 command.arg(chain_id.to_string());
1157 }
1158 command.spawn_and_wait_for_stdout().await?;
1159 Ok(())
1160 }
1161
1162 pub fn load_wallet(&self) -> Result<Wallet> {
1163 Ok(Wallet::read(&self.wallet_path())?)
1164 }
1165
1166 pub fn load_keystore(&self) -> Result<InMemorySigner> {
1167 util::read_json(self.keystore_path())
1168 }
1169
1170 pub fn wallet_path(&self) -> PathBuf {
1171 self.path_provider.path().join(&self.wallet)
1172 }
1173
1174 pub fn keystore_path(&self) -> PathBuf {
1175 self.path_provider.path().join(&self.keystore)
1176 }
1177
1178 pub fn storage_path(&self) -> &str {
1179 &self.storage
1180 }
1181
1182 pub fn get_owner(&self) -> Option<AccountOwner> {
1183 let wallet = self.load_wallet().ok()?;
1184 wallet
1185 .get(wallet.default_chain()?)
1186 .expect("default chain must be in wallet")
1187 .owner
1188 }
1189
1190 pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1191 self.load_wallet()
1192 .ok()
1193 .is_some_and(|wallet| wallet.get(chain).is_some())
1194 }
1195
1196 pub async fn set_validator(
1197 &self,
1198 validator_key: &(String, String),
1199 port: usize,
1200 votes: usize,
1201 ) -> Result<()> {
1202 let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1203 self.command()
1204 .await?
1205 .arg("validator")
1206 .arg("add")
1207 .args(["--public-key", &validator_key.0])
1208 .args(["--account-key", &validator_key.1])
1209 .args(["--address", &address])
1210 .args(["--votes", &votes.to_string()])
1211 .spawn_and_wait_for_stdout()
1212 .await?;
1213 Ok(())
1214 }
1215
1216 pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1217 self.command()
1218 .await?
1219 .arg("validator")
1220 .arg("remove")
1221 .args(["--public-key", validator_key])
1222 .spawn_and_wait_for_stdout()
1223 .await?;
1224 Ok(())
1225 }
1226
1227 pub async fn change_validators(
1228 &self,
1229 add_validators: &[(String, String, usize, usize)], modify_validators: &[(String, String, usize, usize)], remove_validators: &[String],
1232 ) -> Result<()> {
1233 use std::str::FromStr;
1234
1235 use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1236
1237 let mut changes = std::collections::HashMap::new();
1240
1241 for (public_key_str, account_key_str, port, votes) in
1243 add_validators.iter().chain(modify_validators.iter())
1244 {
1245 let public_key = ValidatorPublicKey::from_str(public_key_str)
1246 .with_context(|| format!("Invalid validator public key: {public_key_str}"))?;
1247
1248 let account_key = AccountPublicKey::from_str(account_key_str)
1249 .with_context(|| format!("Invalid account public key: {account_key_str}"))?;
1250
1251 let address = format!("{}:127.0.0.1:{}", self.network.short(), port)
1252 .parse()
1253 .unwrap();
1254
1255 let change = crate::cli::validator::Change {
1257 account_key,
1258 address,
1259 votes: crate::cli::validator::Votes(
1260 std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1261 ),
1262 };
1263
1264 changes.insert(public_key, Some(change));
1265 }
1266
1267 for validator_key_str in remove_validators {
1269 let public_key = ValidatorPublicKey::from_str(validator_key_str)
1270 .with_context(|| format!("Invalid validator public key: {validator_key_str}"))?;
1271 changes.insert(public_key, None);
1272 }
1273
1274 let temp_file = tempfile::NamedTempFile::new()
1276 .context("Failed to create temporary file for validator changes")?;
1277 serde_json::to_writer(&temp_file, &changes)
1278 .context("Failed to write validator changes to file")?;
1279 let temp_path = temp_file.path();
1280
1281 self.command()
1282 .await?
1283 .arg("validator")
1284 .arg("update")
1285 .arg(temp_path)
1286 .arg("--yes") .spawn_and_wait_for_stdout()
1288 .await?;
1289
1290 Ok(())
1291 }
1292
1293 pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1294 self.command()
1295 .await?
1296 .arg("revoke-epochs")
1297 .arg(epoch.to_string())
1298 .spawn_and_wait_for_stdout()
1299 .await?;
1300 Ok(())
1301 }
1302
1303 pub async fn set_resource_control_policy(
1304 &self,
1305 overrides: ResourceControlPolicyOverrides,
1306 ) -> Result<()> {
1307 let mut command = self.command().await?;
1308 command.arg("resource-control-policy");
1309 let ResourceControlPolicyOverrides {
1310 wasm_fuel_unit,
1311 evm_fuel_unit,
1312 read_operation,
1313 write_operation,
1314 byte_runtime,
1315 byte_read,
1316 byte_written,
1317 blob_read,
1318 blob_published,
1319 blob_byte_read,
1320 blob_byte_published,
1321 operation,
1322 operation_byte,
1323 message,
1324 message_byte,
1325 service_as_oracle_query,
1326 http_request,
1327 maximum_wasm_fuel_per_block,
1328 maximum_evm_fuel_per_block,
1329 maximum_service_oracle_execution_ms,
1330 maximum_block_size,
1331 maximum_blob_size,
1332 maximum_published_blobs,
1333 maximum_bytecode_size,
1334 maximum_block_proposal_size,
1335 maximum_bytes_read_per_block,
1336 maximum_bytes_written_per_block,
1337 maximum_oracle_response_bytes,
1338 maximum_http_response_bytes,
1339 http_request_timeout_ms,
1340 http_request_allow_list,
1341 free_application_ids,
1342 flags,
1343 } = overrides;
1344 if let Some(value) = wasm_fuel_unit {
1345 command.args(["--wasm-fuel-unit", &value.to_string()]);
1346 }
1347 if let Some(value) = evm_fuel_unit {
1348 command.args(["--evm-fuel-unit", &value.to_string()]);
1349 }
1350 if let Some(value) = read_operation {
1351 command.args(["--read-operation", &value.to_string()]);
1352 }
1353 if let Some(value) = write_operation {
1354 command.args(["--write-operation", &value.to_string()]);
1355 }
1356 if let Some(value) = byte_runtime {
1357 command.args(["--byte-runtime", &value.to_string()]);
1358 }
1359 if let Some(value) = byte_read {
1360 command.args(["--byte-read", &value.to_string()]);
1361 }
1362 if let Some(value) = byte_written {
1363 command.args(["--byte-written", &value.to_string()]);
1364 }
1365 if let Some(value) = blob_read {
1366 command.args(["--blob-read", &value.to_string()]);
1367 }
1368 if let Some(value) = blob_published {
1369 command.args(["--blob-published", &value.to_string()]);
1370 }
1371 if let Some(value) = blob_byte_read {
1372 command.args(["--blob-byte-read", &value.to_string()]);
1373 }
1374 if let Some(value) = blob_byte_published {
1375 command.args(["--blob-byte-published", &value.to_string()]);
1376 }
1377 if let Some(value) = operation {
1378 command.args(["--operation", &value.to_string()]);
1379 }
1380 if let Some(value) = operation_byte {
1381 command.args(["--operation-byte", &value.to_string()]);
1382 }
1383 if let Some(value) = message {
1384 command.args(["--message", &value.to_string()]);
1385 }
1386 if let Some(value) = message_byte {
1387 command.args(["--message-byte", &value.to_string()]);
1388 }
1389 if let Some(value) = service_as_oracle_query {
1390 command.args(["--service-as-oracle-query", &value.to_string()]);
1391 }
1392 if let Some(value) = http_request {
1393 command.args(["--http-request", &value.to_string()]);
1394 }
1395 if let Some(value) = maximum_wasm_fuel_per_block {
1396 command.args(["--maximum-wasm-fuel-per-block", &value.to_string()]);
1397 }
1398 if let Some(value) = maximum_evm_fuel_per_block {
1399 command.args(["--maximum-evm-fuel-per-block", &value.to_string()]);
1400 }
1401 if let Some(value) = maximum_service_oracle_execution_ms {
1402 command.args(["--maximum-service-oracle-execution-ms", &value.to_string()]);
1403 }
1404 if let Some(value) = maximum_block_size {
1405 command.args(["--maximum-block-size", &value.to_string()]);
1406 }
1407 if let Some(value) = maximum_blob_size {
1408 command.args(["--maximum-blob-size", &value.to_string()]);
1409 }
1410 if let Some(value) = maximum_published_blobs {
1411 command.args(["--maximum-published-blobs", &value.to_string()]);
1412 }
1413 if let Some(value) = maximum_bytecode_size {
1414 command.args(["--maximum-bytecode-size", &value.to_string()]);
1415 }
1416 if let Some(value) = maximum_block_proposal_size {
1417 command.args(["--maximum-block-proposal-size", &value.to_string()]);
1418 }
1419 if let Some(value) = maximum_bytes_read_per_block {
1420 command.args(["--maximum-bytes-read-per-block", &value.to_string()]);
1421 }
1422 if let Some(value) = maximum_bytes_written_per_block {
1423 command.args(["--maximum-bytes-written-per-block", &value.to_string()]);
1424 }
1425 if let Some(value) = maximum_oracle_response_bytes {
1426 command.args(["--maximum-oracle-response-bytes", &value.to_string()]);
1427 }
1428 if let Some(value) = maximum_http_response_bytes {
1429 command.args(["--maximum-http-response-bytes", &value.to_string()]);
1430 }
1431 if let Some(value) = http_request_timeout_ms {
1432 command.args(["--http-request-timeout-ms", &value.to_string()]);
1433 }
1434 if let Some(values) = http_request_allow_list {
1435 command.args(["--http-request-allow-list", &values.join(",")]);
1436 }
1437 if let Some(values) = free_application_ids {
1438 command.args(["--free-application-ids", &values.join(",")]);
1439 }
1440 if let Some(values) = flags {
1441 command.args(["--flags", &values.join(",")]);
1442 }
1443 command.spawn_and_wait_for_stdout().await?;
1444 Ok(())
1445 }
1446
1447 pub async fn keygen(&self) -> Result<AccountOwner> {
1449 let stdout = self
1450 .command()
1451 .await?
1452 .arg("keygen")
1453 .spawn_and_wait_for_stdout()
1454 .await?;
1455 AccountOwner::from_str(stdout.as_str().trim())
1456 }
1457
1458 pub fn default_chain(&self) -> Option<ChainId> {
1460 self.load_wallet().ok()?.default_chain()
1461 }
1462
1463 pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1465 let _stdout = self
1466 .command()
1467 .await?
1468 .arg("assign")
1469 .args(["--owner", &owner.to_string()])
1470 .args(["--chain-id", &chain_id.to_string()])
1471 .spawn_and_wait_for_stdout()
1472 .await?;
1473 Ok(())
1474 }
1475
1476 pub async fn set_preferred_owner(
1478 &self,
1479 chain_id: ChainId,
1480 owner: Option<AccountOwner>,
1481 ) -> Result<()> {
1482 let mut owner_arg = vec!["--owner".to_string()];
1483 if let Some(owner) = owner {
1484 owner_arg.push(owner.to_string());
1485 };
1486 self.command()
1487 .await?
1488 .arg("set-preferred-owner")
1489 .args(["--chain-id", &chain_id.to_string()])
1490 .args(owner_arg)
1491 .spawn_and_wait_for_stdout()
1492 .await?;
1493 Ok(())
1494 }
1495
1496 pub async fn build_application(
1497 &self,
1498 path: &Path,
1499 name: &str,
1500 is_workspace: bool,
1501 ) -> Result<(PathBuf, PathBuf)> {
1502 Command::new("cargo")
1503 .current_dir(self.path_provider.path())
1504 .arg("build")
1505 .arg("--release")
1506 .args(["--target", "wasm32-unknown-unknown"])
1507 .arg("--manifest-path")
1508 .arg(path.join("Cargo.toml"))
1509 .spawn_and_wait_for_stdout()
1510 .await?;
1511
1512 let release_dir = match is_workspace {
1513 true => path.join("../target/wasm32-unknown-unknown/release"),
1514 false => path.join("target/wasm32-unknown-unknown/release"),
1515 };
1516
1517 let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1518 let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1519
1520 let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1521 let service_size = fs_err::tokio::metadata(&service).await?.len();
1522 tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1523
1524 Ok((contract, service))
1525 }
1526}
1527
1528impl Drop for ClientWrapper {
1529 fn drop(&mut self) {
1530 use std::process::Command as SyncCommand;
1531
1532 if self.on_drop != OnClientDrop::CloseChains {
1533 return;
1534 }
1535
1536 let Ok(binary_path) = self.binary_path.lock() else {
1537 tracing::error!(
1538 "Failed to close chains because a thread panicked with a lock to `binary_path`"
1539 );
1540 return;
1541 };
1542
1543 let Some(binary_path) = binary_path.as_ref() else {
1544 tracing::warn!(
1545 "Assuming no chains need to be closed, because the command binary was never \
1546 resolved and therefore presumably never called"
1547 );
1548 return;
1549 };
1550
1551 let working_directory = self.path_provider.path();
1552 let mut wallet_show_command = SyncCommand::new(binary_path);
1553
1554 for argument in self.command_arguments() {
1555 wallet_show_command.arg(&*argument);
1556 }
1557
1558 let Ok(wallet_show_output) = wallet_show_command
1559 .current_dir(working_directory)
1560 .args(["wallet", "show", "--short", "--owned"])
1561 .output()
1562 else {
1563 tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1564 return;
1565 };
1566
1567 if !wallet_show_output.status.success() {
1568 tracing::warn!("Failed to list chains in the wallet to close them");
1569 return;
1570 }
1571
1572 let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1573 tracing::warn!(
1574 "Failed to close chains because `linera wallet show --short` \
1575 returned a non-UTF-8 output"
1576 );
1577 return;
1578 };
1579
1580 let chain_ids = chain_list_string
1581 .split('\n')
1582 .map(|line| line.trim())
1583 .filter(|line| !line.is_empty());
1584
1585 for chain_id in chain_ids {
1586 let mut close_chain_command = SyncCommand::new(binary_path);
1587
1588 for argument in self.command_arguments() {
1589 close_chain_command.arg(&*argument);
1590 }
1591
1592 close_chain_command.current_dir(working_directory);
1593
1594 match close_chain_command.args(["close-chain", chain_id]).status() {
1595 Ok(status) if status.success() => (),
1596 Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1597 Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1598 }
1599 }
1600 }
1601}
1602
#[cfg(with_testing)]
impl ClientWrapper {
    /// Builds the example application `name` found under `../examples/` (relative to the
    /// current directory), treating it as a workspace member.
    pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
        let example_dir = Self::example_path(name)?;
        self.build_application(&example_dir, name, true).await
    }

    /// Builds the test fixture application `name` found under
    /// `../linera-sdk/tests/fixtures/` (relative to the current directory), treating it as
    /// a workspace member.
    pub async fn build_test_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
        let fixture_dir = Self::test_example_path(name)?;
        self.build_application(&fixture_dir, name, true).await
    }

    /// Returns `<current dir>/../examples/<name>`.
    ///
    /// # Errors
    ///
    /// Fails if the current directory cannot be determined.
    pub fn example_path(name: &str) -> Result<PathBuf> {
        let mut path = env::current_dir()?;
        path.push("../examples/");
        path.push(name);
        Ok(path)
    }

    /// Returns `<current dir>/../linera-sdk/tests/fixtures/<name>`.
    ///
    /// # Errors
    ///
    /// Fails if the current directory cannot be determined.
    pub fn test_example_path(name: &str) -> Result<PathBuf> {
        let mut path = env::current_dir()?;
        path.push("../linera-sdk/tests/fixtures/");
        path.push(name);
        Ok(path)
    }
}
1625
/// Shortens `input` to at most 1000 bytes for use in log and error messages.
///
/// Inputs of up to 1000 bytes are returned unchanged. Longer inputs are cut at the last
/// UTF-8 character boundary at or before byte 1000 and get a `" ..."` suffix, so the
/// function never panics on multi-byte characters.
fn truncate_query_output(input: &str) -> String {
    const MAX_LEN: usize = 1000;

    if input.len() <= MAX_LEN {
        return input.to_string();
    }

    // Byte `MAX_LEN` may fall inside a multi-byte character; back up to a boundary.
    // Index 0 is always a boundary, so the loop terminates.
    let mut end = MAX_LEN;
    while !input.is_char_boundary(end) {
        end -= 1;
    }
    format!("{} ...", &input[..end])
}
1634
1635fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1636 let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1637 let max_len = 200;
1638 if query.len() < max_len {
1639 query
1640 } else {
1641 format!("{} ...", query.get(..max_len).unwrap())
1642 }
1643}
1644
1645fn log_unexpected_exit(child: &mut Child, service_kind: &str, port: u16) {
1649 match child.try_wait() {
1650 Ok(Some(status)) => {
1651 #[cfg(unix)]
1652 {
1653 use std::os::unix::process::ExitStatusExt as _;
1654 if let Some(signal) = status.signal() {
1655 tracing::error!(
1656 port,
1657 signal,
1658 "The {service_kind} service was killed by signal {signal}",
1659 );
1660 return;
1661 }
1662 }
1663 if !status.success() {
1664 tracing::error!(
1665 port,
1666 %status,
1667 "The {service_kind} service exited unexpectedly with {status}",
1668 );
1669 }
1670 }
1671 Ok(None) => {} Err(error) => {
1673 tracing::warn!(
1674 port,
1675 %error,
1676 "Failed to check {service_kind} service status",
1677 );
1678 }
1679 }
1680}
1681
/// Handle to a running node service process and the local port it is queried on.
pub struct NodeService {
    /// Port used to build the `http://localhost:<port>/` and `ws://localhost:<port>/ws` URLs.
    port: u16,
    /// The service's child process.
    child: Child,
    /// Set by `terminate`; when `true`, dropping the handle skips the exit-status logging.
    terminated: bool,
}
1687
1688impl Drop for NodeService {
1689 fn drop(&mut self) {
1690 if !self.terminated {
1691 log_unexpected_exit(&mut self.child, "node", self.port);
1692 }
1693 }
1694}
1695
1696impl NodeService {
1697 fn new(port: u16, child: Child) -> Self {
1698 Self {
1699 port,
1700 child,
1701 terminated: false,
1702 }
1703 }
1704
1705 pub async fn terminate(mut self) -> Result<()> {
1706 self.terminated = true;
1707 self.child.kill().await.context("terminating node service")
1708 }
1709
1710 pub fn port(&self) -> u16 {
1711 self.port
1712 }
1713
1714 pub fn ensure_is_running(&mut self) -> Result<()> {
1715 self.child.ensure_is_running()
1716 }
1717
1718 pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1719 let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1720 let mut data = self.query_node(query).await?;
1721 Ok(serde_json::from_value(data["processInbox"].take())?)
1722 }
1723
1724 pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1725 let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1726 let mut data = self.query_node(query).await?;
1727 Ok(serde_json::from_value(data["sync"].take())?)
1728 }
1729
1730 pub async fn transfer(
1731 &self,
1732 chain_id: ChainId,
1733 owner: AccountOwner,
1734 recipient: Account,
1735 amount: Amount,
1736 ) -> Result<CryptoHash> {
1737 let json_owner = owner.to_value();
1738 let json_recipient = recipient.to_value();
1739 let query = format!(
1740 "mutation {{ transfer(\
1741 chainId: \"{chain_id}\", \
1742 owner: {json_owner}, \
1743 recipient: {json_recipient}, \
1744 amount: \"{amount}\") \
1745 }}"
1746 );
1747 let data = self.query_node(query).await?;
1748 serde_json::from_value(data["transfer"].clone())
1749 .context("missing transfer field in response")
1750 }
1751
1752 pub async fn balance(&self, account: &Account) -> Result<Amount> {
1753 let chain = account.chain_id;
1754 let owner = account.owner;
1755 if matches!(owner, AccountOwner::CHAIN) {
1756 let query = format!(
1757 "query {{ chain(chainId:\"{chain}\") {{
1758 executionState {{ system {{ balance }} }}
1759 }} }}"
1760 );
1761 let response = self.query_node(query).await?;
1762 let balance = &response["chain"]["executionState"]["system"]["balance"]
1763 .as_str()
1764 .unwrap();
1765 return Ok(Amount::from_str(balance)?);
1766 }
1767 let query = format!(
1768 "query {{ chain(chainId:\"{chain}\") {{
1769 executionState {{ system {{ balances {{
1770 entry(key:\"{owner}\") {{ value }}
1771 }} }} }}
1772 }} }}"
1773 );
1774 let response = self.query_node(query).await?;
1775 let balances = &response["chain"]["executionState"]["system"]["balances"];
1776 let balance = balances["entry"]["value"].as_str();
1777 match balance {
1778 None => Ok(Amount::ZERO),
1779 Some(amount) => Ok(Amount::from_str(amount)?),
1780 }
1781 }
1782
1783 pub fn make_application<A: ContractAbi>(
1784 &self,
1785 chain_id: &ChainId,
1786 application_id: &ApplicationId<A>,
1787 ) -> Result<ApplicationWrapper<A>> {
1788 let application_id = application_id.forget_abi().to_string();
1789 let link = format!(
1790 "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1791 self.port
1792 );
1793 Ok(ApplicationWrapper::from(link))
1794 }
1795
1796 pub async fn publish_data_blob(
1797 &self,
1798 chain_id: &ChainId,
1799 bytes: Vec<u8>,
1800 ) -> Result<CryptoHash> {
1801 let query = format!(
1802 "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1803 chain_id.to_value(),
1804 bytes.to_value(),
1805 );
1806 let data = self.query_node(query).await?;
1807 serde_json::from_value(data["publishDataBlob"].clone())
1808 .context("missing publishDataBlob field in response")
1809 }
1810
1811 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1812 &self,
1813 chain_id: &ChainId,
1814 contract: PathBuf,
1815 service: PathBuf,
1816 vm_runtime: VmRuntime,
1817 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1818 let contract_code = Bytecode::load_from_file(&contract).await?;
1819 let service_code = Bytecode::load_from_file(&service).await?;
1820 let query = format!(
1821 "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1822 chain_id.to_value(),
1823 contract_code.to_value(),
1824 service_code.to_value(),
1825 vm_runtime.to_value(),
1826 );
1827 let data = self.query_node(query).await?;
1828 let module_str = data["publishModule"]
1829 .as_str()
1830 .context("module ID not found")?;
1831 let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1832 Ok(module_id.with_abi())
1833 }
1834
1835 pub async fn query_committee_hash(&self, chain_id: &ChainId) -> Result<Option<CryptoHash>> {
1836 let query = format!(
1837 "query {{ chain(chainId:\"{chain_id}\") {{
1838 executionState {{ system {{ committeeHash }} }}
1839 }} }}"
1840 );
1841 let mut response = self.query_node(query).await?;
1842 let hash = response["chain"]["executionState"]["system"]["committeeHash"].take();
1843 Ok(serde_json::from_value(hash)?)
1844 }
1845
1846 pub async fn query_chain_epoch(&self, chain_id: &ChainId) -> Result<Epoch> {
1847 let query = format!(
1848 "query {{ chain(chainId:\"{chain_id}\") {{
1849 executionState {{ system {{ epoch }} }}
1850 }} }}"
1851 );
1852 let mut response = self.query_node(query).await?;
1853 let epoch = response["chain"]["executionState"]["system"]["epoch"].take();
1854 Ok(serde_json::from_value(epoch)?)
1855 }
1856
1857 pub async fn events_from_index(
1858 &self,
1859 chain_id: &ChainId,
1860 stream_id: &StreamId,
1861 start_index: u32,
1862 ) -> Result<Vec<IndexAndEvent>> {
1863 let query = format!(
1864 "query {{
1865 eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1866 {{ index event }}
1867 }}",
1868 stream_id.to_value()
1869 );
1870 let mut response = self.query_node(query).await?;
1871 let response = response["eventsFromIndex"].take();
1872 Ok(serde_json::from_value(response)?)
1873 }
1874
1875 pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1876 let n_try = 5;
1877 let query = query.as_ref();
1878 for i in 0..n_try {
1879 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1880 let url = format!("http://localhost:{}/", self.port);
1881 let client = reqwest_client();
1882 let result = client
1883 .post(url)
1884 .json(&json!({ "query": query }))
1885 .send()
1886 .await;
1887 if matches!(result, Err(ref error) if error.is_timeout()) {
1888 tracing::warn!(
1889 "Timeout when sending query {} to the node service",
1890 truncate_query_output(query)
1891 );
1892 continue;
1893 }
1894 let response = result.with_context(|| {
1895 format!(
1896 "query_node: failed to post query={}",
1897 truncate_query_output(query)
1898 )
1899 })?;
1900 ensure!(
1901 response.status().is_success(),
1902 "Query \"{}\" failed: {}",
1903 truncate_query_output(query),
1904 response
1905 .text()
1906 .await
1907 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1908 );
1909 let value: Value = response.json().await.context("invalid JSON")?;
1910 if let Some(errors) = value.get("errors") {
1911 tracing::warn!(
1912 "Query \"{}\" failed: {}",
1913 truncate_query_output(query),
1914 errors
1915 );
1916 } else {
1917 return Ok(value["data"].clone());
1918 }
1919 }
1920 bail!(
1921 "Query \"{}\" failed after {} retries.",
1922 truncate_query_output(query),
1923 n_try
1924 );
1925 }
1926
1927 pub async fn create_application<
1928 Abi: ContractAbi,
1929 Parameters: Serialize,
1930 InstantiationArgument: Serialize,
1931 >(
1932 &self,
1933 chain_id: &ChainId,
1934 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1935 parameters: &Parameters,
1936 argument: &InstantiationArgument,
1937 required_application_ids: &[ApplicationId],
1938 ) -> Result<ApplicationId<Abi>> {
1939 let module_id = module_id.forget_abi();
1940 let json_required_applications_ids = required_application_ids
1941 .iter()
1942 .map(ApplicationId::to_string)
1943 .collect::<Vec<_>>()
1944 .to_value();
1945 let new_parameters = serde_json::to_value(parameters)
1947 .context("could not create parameters JSON")?
1948 .to_value();
1949 let new_argument = serde_json::to_value(argument)
1950 .context("could not create argument JSON")?
1951 .to_value();
1952 let query = format!(
1953 "mutation {{ createApplication(\
1954 chainId: \"{chain_id}\",
1955 moduleId: \"{module_id}\", \
1956 parameters: {new_parameters}, \
1957 instantiationArgument: {new_argument}, \
1958 requiredApplicationIds: {json_required_applications_ids}) \
1959 }}"
1960 );
1961 let data = self.query_node(query).await?;
1962 let app_id_str = data["createApplication"]
1963 .as_str()
1964 .context("missing createApplication string in response")?
1965 .trim();
1966 Ok(app_id_str
1967 .parse::<ApplicationId>()
1968 .context("invalid application ID")?
1969 .with_abi())
1970 }
1971
1972 pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1974 let query = format!(
1975 r#"query {{ block(chainId: "{chain}") {{
1976 hash
1977 block {{ header {{ height }} }}
1978 }} }}"#
1979 );
1980
1981 let mut response = self.query_node(&query).await?;
1982
1983 match (
1984 mem::take(&mut response["block"]["hash"]),
1985 mem::take(&mut response["block"]["block"]["header"]["height"]),
1986 ) {
1987 (Value::Null, Value::Null) => Ok(None),
1988 (Value::String(hash), Value::Number(height)) => Ok(Some((
1989 hash.parse()
1990 .context("Received an invalid hash {hash:?} for chain tip")?,
1991 BlockHeight(height.as_u64().unwrap()),
1992 ))),
1993 invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1994 }
1995 }
1996
1997 pub async fn notifications(
1999 &self,
2000 chain_id: ChainId,
2001 ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
2002 let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
2003 let url = format!("ws://localhost:{}/ws", self.port);
2004 let mut request = url.into_client_request()?;
2005 request.headers_mut().insert(
2006 "Sec-WebSocket-Protocol",
2007 HeaderValue::from_str("graphql-transport-ws")?,
2008 );
2009 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
2010 let init_json = json!({
2011 "type": "connection_init",
2012 "payload": {}
2013 });
2014 websocket.send(init_json.to_string().into()).await?;
2015 let text = websocket
2016 .next()
2017 .await
2018 .context("Failed to establish connection")??
2019 .into_text()?;
2020 ensure!(
2021 text == "{\"type\":\"connection_ack\"}",
2022 "Unexpected response: {text}"
2023 );
2024 let query_json = json!({
2025 "id": "1",
2026 "type": "start",
2027 "payload": {
2028 "query": query,
2029 "variables": {},
2030 "operationName": null
2031 }
2032 });
2033 websocket.send(query_json.to_string().into()).await?;
2034 Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
2035 |message| async {
2036 let text = message.into_text()?;
2037 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
2038 if let Some(errors) = value["payload"].get("errors") {
2039 bail!("Notification subscription failed: {errors:?}");
2040 }
2041 serde_json::from_value(value["payload"]["data"]["notifications"].clone())
2042 .context("Failed to deserialize notification")
2043 },
2044 )))
2045 }
2046
2047 pub async fn query_result(
2049 &self,
2050 name: &str,
2051 chain_id: ChainId,
2052 application_id: &ApplicationId,
2053 ) -> Result<Pin<Box<impl Stream<Item = Result<Value>>>>> {
2054 let query = format!(
2055 r#"subscription {{ queryResult(name: "{name}", chainId: "{chain_id}", applicationId: "{application_id}") }}"#,
2056 );
2057 let url = format!("ws://localhost:{}/ws", self.port);
2058 let mut request = url.into_client_request()?;
2059 request.headers_mut().insert(
2060 "Sec-WebSocket-Protocol",
2061 HeaderValue::from_str("graphql-transport-ws")?,
2062 );
2063 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
2064 let init_json = json!({
2065 "type": "connection_init",
2066 "payload": {}
2067 });
2068 websocket.send(init_json.to_string().into()).await?;
2069 let text = websocket
2070 .next()
2071 .await
2072 .context("Failed to establish connection")??
2073 .into_text()?;
2074 ensure!(
2075 text == "{\"type\":\"connection_ack\"}",
2076 "Unexpected response: {text}"
2077 );
2078 let query_json = json!({
2079 "id": "1",
2080 "type": "start",
2081 "payload": {
2082 "query": query,
2083 "variables": {},
2084 "operationName": null
2085 }
2086 });
2087 websocket.send(query_json.to_string().into()).await?;
2088 Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
2089 |message| async {
2090 let text = message.into_text()?;
2091 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
2092 if let Some(errors) = value["payload"].get("errors") {
2093 bail!("Query result subscription failed: {errors:?}");
2094 }
2095 Ok(value["payload"]["data"]["queryResult"].clone())
2096 },
2097 )))
2098 }
2099}
2100
/// Handle to a running faucet service process and the local port it is reached on.
pub struct FaucetService {
    /// Port used to build the `http://localhost:<port>/` URL of the faucet.
    port: u16,
    /// The service's child process.
    child: Child,
    /// Temporary directory stored alongside the service; it is never read here.
    // NOTE(review): presumably held only so the directory outlives the process — confirm.
    _temp_dir: tempfile::TempDir,
    /// Set by `terminate`; when `true`, dropping the handle skips the exit-status logging.
    terminated: bool,
}
2108
2109impl Drop for FaucetService {
2110 fn drop(&mut self) {
2111 if !self.terminated {
2112 log_unexpected_exit(&mut self.child, "faucet", self.port);
2113 }
2114 }
2115}
2116
2117impl FaucetService {
2118 fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
2119 Self {
2120 port,
2121 child,
2122 _temp_dir: temp_dir,
2123 terminated: false,
2124 }
2125 }
2126
2127 pub async fn terminate(mut self) -> Result<()> {
2128 self.terminated = true;
2129 self.child
2130 .kill()
2131 .await
2132 .context("terminating faucet service")
2133 }
2134
2135 pub fn ensure_is_running(&mut self) -> Result<()> {
2136 self.child.ensure_is_running()
2137 }
2138
2139 pub fn instance(&self) -> Faucet {
2140 Faucet::new(format!("http://localhost:{}/", self.port))
2141 }
2142}
2143
/// Client for the HTTP endpoint of one application; `A` is a type-level marker only.
pub struct ApplicationWrapper<A> {
    /// URL that queries are posted to.
    uri: String,
    /// Carries the type parameter `A` without storing a value of it.
    _phantom: PhantomData<A>,
}
2149
2150impl<A> ApplicationWrapper<A> {
2151 pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
2152 let query = query.as_ref();
2153 let value = self.run_json_query(json!({ "query": query })).await?;
2154 Ok(value["data"].clone())
2155 }
2156
2157 pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
2158 const MAX_RETRIES: usize = 5;
2159
2160 for i in 0.. {
2161 let client = reqwest_client();
2162 let result = client.post(&self.uri).json(&query).send().await;
2163 let response = match result {
2164 Ok(response) => response,
2165 Err(error) if i < MAX_RETRIES => {
2166 tracing::warn!(
2167 "Failed to post query \"{}\": {error}; retrying",
2168 truncate_query_output_serialize(&query),
2169 );
2170 continue;
2171 }
2172 Err(error) => {
2173 let query = truncate_query_output_serialize(&query);
2174 return Err(error)
2175 .with_context(|| format!("run_json_query: failed to post query={query}"));
2176 }
2177 };
2178 ensure!(
2179 response.status().is_success(),
2180 "Query \"{}\" failed: {}",
2181 truncate_query_output_serialize(&query),
2182 response
2183 .text()
2184 .await
2185 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
2186 );
2187 let value: Value = response.json().await.context("invalid JSON")?;
2188 if let Some(errors) = value.get("errors") {
2189 bail!(
2190 "Query \"{}\" failed: {}",
2191 truncate_query_output_serialize(&query),
2192 errors
2193 );
2194 }
2195 return Ok(value);
2196 }
2197 unreachable!()
2198 }
2199
2200 pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
2201 let query = query.as_ref();
2202 self.run_graphql_query(&format!("query {{ {query} }}"))
2203 .await
2204 }
2205
2206 pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
2207 let query = query.as_ref().trim();
2208 let name = query
2209 .split_once(|ch: char| !ch.is_alphanumeric())
2210 .map_or(query, |(name, _)| name);
2211 let data = self.query(query).await?;
2212 serde_json::from_value(data[name].clone())
2213 .with_context(|| format!("{name} field missing in response"))
2214 }
2215
2216 pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
2217 let mutation = mutation.as_ref();
2218 self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
2219 .await
2220 }
2221
2222 pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
2223 let mut out = String::from("mutation {\n");
2224 for (index, mutation) in mutations.iter().enumerate() {
2225 out = format!("{out} u{index}: {mutation}\n");
2226 }
2227 out.push_str("}\n");
2228 self.run_graphql_query(&out).await
2229 }
2230}
2231
2232impl<A> From<String> for ApplicationWrapper<A> {
2233 fn from(uri: String) -> ApplicationWrapper<A> {
2234 ApplicationWrapper {
2235 uri,
2236 _phantom: PhantomData,
2237 }
2238 }
2239}
2240
2241#[cfg(with_testing)]
/// Returns how long to wait for a notification before giving up.
///
/// The value is read, in milliseconds, from the `LINERA_TEST_NOTIFICATION_TIMEOUT_MS`
/// environment variable; when the variable is not set, 10 000 ms is used.
///
/// # Panics
///
/// Panics if the variable is set but is not valid Unicode or does not parse as a `u64`.
fn notification_timeout() -> Duration {
    const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
    const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;

    let millis = match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
        Err(env::VarError::NotPresent) => NOTIFICATION_TIMEOUT_MS_DEFAULT,
        Err(env::VarError::NotUnicode(_)) => {
            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
        }
        Ok(raw) => match raw.parse::<u64>() {
            Ok(millis) => millis,
            Err(error) => {
                panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
            }
        },
    };
    Duration::from_millis(millis)
}
2258
#[cfg(with_testing)]
/// Convenience methods for waiting on particular [`Notification`]s.
///
/// Implementors only provide [`NotificationsExt::wait_for`]; the remaining methods are
/// filters built on top of it.
pub trait NotificationsExt {
    /// Feeds notifications to `f` and resolves to the first `Some` value it returns.
    fn wait_for<T>(
        &mut self,
        f: impl FnMut(Notification) -> Option<T>,
    ) -> impl Future<Output = Result<T>>;

    /// Waits for a `NewEvents` notification and resolves to its event streams.
    ///
    /// If `expected_height` is `Some`, notifications for any other height are skipped.
    fn wait_for_events(
        &mut self,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
        let wanted_height = expected_height.into();
        self.wait_for(move |notification| match notification.reason {
            Reason::NewEvents {
                height,
                event_streams,
                ..
            } if wanted_height.is_none_or(|h| h == height) => Some(event_streams),
            _ => None,
        })
    }

    /// Waits for a `NewBlock` notification and resolves to the block's hash.
    ///
    /// If `expected_height` is `Some`, notifications for any other height are skipped.
    fn wait_for_block(
        &mut self,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<CryptoHash>> {
        let wanted_height = expected_height.into();
        self.wait_for(move |notification| match notification.reason {
            Reason::NewBlock { height, hash, .. }
                if wanted_height.is_none_or(|h| h == height) =>
            {
                Some(hash)
            }
            _ => None,
        })
    }

    /// Waits for a `NewIncomingBundle` notification whose origin is `expected_origin`.
    ///
    /// If `expected_height` is `Some`, notifications for any other height are skipped.
    fn wait_for_bundle(
        &mut self,
        expected_origin: ChainId,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<()>> {
        let wanted_height = expected_height.into();
        self.wait_for(move |notification| match notification.reason {
            Reason::NewIncomingBundle { height, origin }
                if wanted_height.is_none_or(|h| h == height) && origin == expected_origin =>
            {
                Some(())
            }
            _ => None,
        })
    }
}
2324
#[cfg(with_testing)]
impl<S> NotificationsExt for Pin<Box<S>>
where
    S: Stream<Item = Result<Notification>>,
{
    /// Pulls notifications from the stream until `f` accepts one.
    ///
    /// A single timer, started when the call begins and lasting `notification_timeout()`,
    /// bounds the whole wait. The call fails if the timer fires first, if the stream ends,
    /// or if the stream yields an error.
    async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
        let sleep = linera_base::time::timer::sleep(notification_timeout());
        // Created once, outside the loop, so the deadline is not reset by each notification.
        let mut deadline = Box::pin(sleep).fuse();
        loop {
            let next_item = futures::select! {
                () = deadline => bail!("Timeout waiting for notification"),
                item = self.next().fuse() => item,
            };
            // `None` means the stream ended; an inner `Err` is a failure reported by the stream.
            let notification = next_item.context("Stream closed")??;
            if let Some(found) = f(notification) {
                return Ok(found);
            }
        }
    }
}