1use std::{
5 borrow::Cow,
6 collections::BTreeMap,
7 env,
8 marker::PhantomData,
9 mem,
10 path::{Path, PathBuf},
11 pin::Pin,
12 process::Stdio,
13 str::FromStr,
14 sync,
15 time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24 abi::ContractAbi,
25 command::{resolve_binary, CommandExt},
26 crypto::{CryptoHash, InMemorySigner},
27 data_types::{Amount, ApplicationPermissions, BlockHeight, Bytecode, Epoch},
28 identifiers::{
29 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30 },
31 vm::VmRuntime,
32};
33use linera_client::client_options::ResourceControlPolicyConfig;
34use linera_core::worker::Notification;
35use linera_faucet_client::Faucet;
36use serde::{de::DeserializeOwned, ser::Serialize};
37use serde_command_opts::to_args;
38use serde_json::{json, Value};
39use tempfile::TempDir;
40use tokio::{
41 io::{AsyncBufReadExt, BufReader},
42 process::{Child, Command},
43 sync::oneshot,
44 task::JoinHandle,
45};
46#[cfg(with_testing)]
47use {
48 futures::FutureExt as _,
49 linera_core::worker::Reason,
50 std::{collections::BTreeSet, future::Future},
51};
52
53use crate::{
54 cli::command::{BenchmarkCommand, ResourceControlPolicyOverrides},
55 cli_wrappers::{
56 local_net::{PathProvider, ProcessInbox},
57 Network,
58 },
59 util::{self, ChildExt},
60 Wallet,
61};
62
63const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
66
67fn reqwest_client() -> reqwest::Client {
68 reqwest::ClientBuilder::new()
69 .timeout(Duration::from_secs(60))
70 .build()
71 .unwrap()
72}
73
74pub struct ClientWrapper {
76 binary_path: sync::Mutex<Option<PathBuf>>,
77 testing_prng_seed: Option<u64>,
78 storage: String,
79 wallet: String,
80 keystore: String,
81 max_pending_message_bundles: usize,
82 network: Network,
83 pub path_provider: PathProvider,
85 on_drop: OnClientDrop,
86 extra_args: Vec<String>,
87}
88
89#[derive(Clone, Copy, Debug, Eq, PartialEq)]
91pub enum OnClientDrop {
92 CloseChains,
94 LeakChains,
96}
97
98impl ClientWrapper {
99 pub fn new(
101 path_provider: PathProvider,
102 network: Network,
103 testing_prng_seed: Option<u64>,
104 id: usize,
105 on_drop: OnClientDrop,
106 ) -> Self {
107 Self::new_with_extra_args(
108 path_provider,
109 network,
110 testing_prng_seed,
111 id,
112 on_drop,
113 vec!["--wait-for-outgoing-messages".to_string()],
114 None,
115 )
116 }
117
118 pub fn new_with_extra_args(
120 path_provider: PathProvider,
121 network: Network,
122 testing_prng_seed: Option<u64>,
123 id: usize,
124 on_drop: OnClientDrop,
125 extra_args: Vec<String>,
126 binary_dir: Option<PathBuf>,
127 ) -> Self {
128 let storage = format!(
129 "rocksdb:{}/client_{}.db",
130 path_provider.path().display(),
131 id
132 );
133 let wallet = format!("wallet_{id}.json");
134 let keystore = format!("keystore_{id}.json");
135 let binary_path = binary_dir.map(|dir| dir.join("linera"));
136 Self {
137 binary_path: sync::Mutex::new(binary_path),
138 testing_prng_seed,
139 storage,
140 wallet,
141 keystore,
142 max_pending_message_bundles: 10_000,
143 network,
144 path_provider,
145 on_drop,
146 extra_args,
147 }
148 }
149
150 pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
152 let tmp = TempDir::new()?;
153 let mut command = self.command().await?;
154 command
155 .current_dir(tmp.path())
156 .arg("project")
157 .arg("new")
158 .arg(project_name)
159 .arg("--linera-root")
160 .arg(linera_root)
161 .spawn_and_wait_for_stdout()
162 .await?;
163 Ok(tmp)
164 }
165
166 pub async fn project_publish<T: Serialize>(
168 &self,
169 path: PathBuf,
170 required_application_ids: Vec<String>,
171 publisher: impl Into<Option<ChainId>>,
172 argument: &T,
173 ) -> Result<String> {
174 let json_parameters = serde_json::to_string(&())?;
175 let json_argument = serde_json::to_string(argument)?;
176 let mut command = self.command().await?;
177 command
178 .arg("project")
179 .arg("publish-and-create")
180 .arg(path)
181 .args(publisher.into().iter().map(ChainId::to_string))
182 .args(["--json-parameters", &json_parameters])
183 .args(["--json-argument", &json_argument]);
184 if !required_application_ids.is_empty() {
185 command.arg("--required-application-ids");
186 command.args(required_application_ids);
187 }
188 let stdout = command.spawn_and_wait_for_stdout().await?;
189 Ok(stdout.trim().to_string())
190 }
191
192 pub async fn project_test(&self, path: &Path) -> Result<()> {
194 self.command()
195 .await
196 .context("failed to create project test command")?
197 .current_dir(path)
198 .arg("project")
199 .arg("test")
200 .spawn_and_wait_for_stdout()
201 .await?;
202 Ok(())
203 }
204
205 async fn command_with_envs_and_arguments(
206 &self,
207 envs: &[(&str, &str)],
208 arguments: impl IntoIterator<Item = Cow<'_, str>>,
209 ) -> Result<Command> {
210 let mut command = self.command_binary().await?;
211 command.current_dir(self.path_provider.path());
212 for (key, value) in envs {
213 command.env(key, value);
214 }
215 for argument in arguments {
216 command.arg(&*argument);
217 }
218 Ok(command)
219 }
220
221 async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
222 self.command_with_envs_and_arguments(envs, self.command_arguments())
223 .await
224 }
225
226 async fn command_with_arguments(
227 &self,
228 arguments: impl IntoIterator<Item = Cow<'_, str>>,
229 ) -> Result<Command> {
230 self.command_with_envs_and_arguments(
231 &[(
232 "RUST_LOG",
233 &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
234 )],
235 arguments,
236 )
237 .await
238 }
239
240 async fn command(&self) -> Result<Command> {
241 self.command_with_envs(&[(
242 "RUST_LOG",
243 &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
244 )])
245 .await
246 }
247
248 fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
249 [
250 "--wallet".into(),
251 self.wallet.as_str().into(),
252 "--keystore".into(),
253 self.keystore.as_str().into(),
254 "--storage".into(),
255 self.storage.as_str().into(),
256 "--send-timeout-ms".into(),
257 "500000".into(),
258 "--recv-timeout-ms".into(),
259 "500000".into(),
260 ]
261 .into_iter()
262 .chain(self.extra_args.iter().map(|s| s.as_str().into()))
263 }
264
265 fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
267 self.required_command_arguments().chain([
268 "--max-pending-message-bundles".into(),
269 self.max_pending_message_bundles.to_string().into(),
270 ])
271 }
272
273 async fn command_binary(&self) -> Result<Command> {
277 match self.command_with_cached_binary_path() {
278 Some(command) => Ok(command),
279 None => {
280 let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
281 let command = Command::new(&resolved_path);
282
283 self.set_cached_binary_path(resolved_path);
284
285 Ok(command)
286 }
287 }
288 }
289
290 fn command_with_cached_binary_path(&self) -> Option<Command> {
292 let binary_path = self.binary_path.lock().unwrap();
293
294 binary_path.as_ref().map(Command::new)
295 }
296
297 fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
305 let mut binary_path = self.binary_path.lock().unwrap();
306
307 if binary_path.is_none() {
308 *binary_path = Some(new_binary_path);
309 } else {
310 assert_eq!(*binary_path, Some(new_binary_path));
311 }
312 }
313
314 pub async fn create_genesis_config(
316 &self,
317 num_other_initial_chains: u32,
318 initial_funding: Amount,
319 policy_config: ResourceControlPolicyConfig,
320 http_allow_list: Option<Vec<String>>,
321 ) -> Result<()> {
322 let mut command = self.command().await?;
323 command
324 .args([
325 "create-genesis-config",
326 &num_other_initial_chains.to_string(),
327 ])
328 .args(["--initial-funding", &initial_funding.to_string()])
329 .args(["--committee", "committee.json"])
330 .args(["--genesis", "genesis.json"])
331 .args([
332 "--policy-config",
333 &policy_config.to_string().to_kebab_case(),
334 ]);
335 if let Some(allow_list) = http_allow_list {
336 command
337 .arg("--http-request-allow-list")
338 .arg(allow_list.join(","));
339 }
340 if let Some(seed) = self.testing_prng_seed {
341 command.arg("--testing-prng-seed").arg(seed.to_string());
342 }
343 command.spawn_and_wait_for_stdout().await?;
344 Ok(())
345 }
346
347 pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
350 let mut command = self.command().await?;
351 command.args(["wallet", "init"]);
352 match faucet {
353 None => command.args(["--genesis", "genesis.json"]),
354 Some(faucet) => command.args(["--faucet", faucet.url()]),
355 };
356 if let Some(seed) = self.testing_prng_seed {
357 command.arg("--testing-prng-seed").arg(seed.to_string());
358 }
359 command.spawn_and_wait_for_stdout().await?;
360 Ok(())
361 }
362
363 pub async fn request_chain(
365 &self,
366 faucet: &Faucet,
367 set_default: bool,
368 ) -> Result<(ChainId, AccountOwner)> {
369 let mut command = self.command().await?;
370 command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
371 if set_default {
372 command.arg("--set-default");
373 }
374 let stdout = command.spawn_and_wait_for_stdout().await?;
375 let mut lines = stdout.split_whitespace();
376 let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
377 let owner = lines.next().context("missing chain owner")?.parse()?;
378 Ok((chain_id, owner))
379 }
380
381 #[expect(clippy::too_many_arguments)]
383 pub async fn publish_and_create<
384 A: ContractAbi,
385 Parameters: Serialize,
386 InstantiationArgument: Serialize,
387 >(
388 &self,
389 contract: PathBuf,
390 service: PathBuf,
391 vm_runtime: VmRuntime,
392 parameters: &Parameters,
393 argument: &InstantiationArgument,
394 required_application_ids: &[ApplicationId],
395 publisher: impl Into<Option<ChainId>>,
396 ) -> Result<ApplicationId<A>> {
397 let json_parameters = serde_json::to_string(parameters)?;
398 let json_argument = serde_json::to_string(argument)?;
399 let mut command = self.command().await?;
400 let vm_runtime = format!("{vm_runtime}");
401 command
402 .arg("publish-and-create")
403 .args([contract, service])
404 .args(["--vm-runtime", &vm_runtime.to_lowercase()])
405 .args(publisher.into().iter().map(ChainId::to_string))
406 .args(["--json-parameters", &json_parameters])
407 .args(["--json-argument", &json_argument]);
408 if !required_application_ids.is_empty() {
409 command.arg("--required-application-ids");
410 command.args(
411 required_application_ids
412 .iter()
413 .map(ApplicationId::to_string),
414 );
415 }
416 let stdout = command.spawn_and_wait_for_stdout().await?;
417 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
418 }
419
420 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
422 &self,
423 contract: PathBuf,
424 service: PathBuf,
425 vm_runtime: VmRuntime,
426 publisher: impl Into<Option<ChainId>>,
427 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
428 self.publish_module_with_formats(contract, service, None, vm_runtime, publisher)
429 .await
430 }
431
432 pub async fn publish_module_with_formats<Abi, Parameters, InstantiationArgument>(
434 &self,
435 contract: PathBuf,
436 service: PathBuf,
437 formats: Option<PathBuf>,
438 vm_runtime: VmRuntime,
439 publisher: impl Into<Option<ChainId>>,
440 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
441 let mut command = self.command().await?;
442 command
443 .arg("publish-module")
444 .args([contract, service])
445 .args(["--vm-runtime", &format!("{vm_runtime}").to_lowercase()]);
446 if let Some(formats_path) = formats {
447 command.arg("--formats").arg(formats_path);
448 }
449 let stdout = command
450 .args(publisher.into().iter().map(ChainId::to_string))
451 .spawn_and_wait_for_stdout()
452 .await?;
453 let module_id: ModuleId = stdout.trim().parse()?;
454 Ok(module_id.with_abi())
455 }
456
457 pub async fn create_application<
459 Abi: ContractAbi,
460 Parameters: Serialize,
461 InstantiationArgument: Serialize,
462 >(
463 &self,
464 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
465 parameters: &Parameters,
466 argument: &InstantiationArgument,
467 required_application_ids: &[ApplicationId],
468 creator: impl Into<Option<ChainId>>,
469 ) -> Result<ApplicationId<Abi>> {
470 let json_parameters = serde_json::to_string(parameters)?;
471 let json_argument = serde_json::to_string(argument)?;
472 let mut command = self.command().await?;
473 command
474 .arg("create-application")
475 .arg(module_id.forget_abi().to_string())
476 .args(["--json-parameters", &json_parameters])
477 .args(["--json-argument", &json_argument])
478 .args(creator.into().iter().map(ChainId::to_string));
479 if !required_application_ids.is_empty() {
480 command.arg("--required-application-ids");
481 command.args(
482 required_application_ids
483 .iter()
484 .map(ApplicationId::to_string),
485 );
486 }
487 let stdout = command.spawn_and_wait_for_stdout().await?;
488 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
489 }
490
491 pub async fn run_node_service(
493 &self,
494 port: impl Into<Option<u16>>,
495 process_inbox: ProcessInbox,
496 ) -> Result<NodeService> {
497 self.run_node_service_with_options(port, process_inbox, &[], &[], false)
498 .await
499 }
500
501 pub async fn run_node_service_with_options(
503 &self,
504 port: impl Into<Option<u16>>,
505 process_inbox: ProcessInbox,
506 operator_application_ids: &[ApplicationId],
507 operators: &[(String, PathBuf)],
508 read_only: bool,
509 ) -> Result<NodeService> {
510 self.run_node_service_with_all_options(
511 port,
512 process_inbox,
513 operator_application_ids,
514 operators,
515 read_only,
516 &[],
517 &[],
518 )
519 .await
520 }
521
522 #[expect(clippy::too_many_arguments)]
524 pub async fn run_node_service_with_all_options(
525 &self,
526 port: impl Into<Option<u16>>,
527 process_inbox: ProcessInbox,
528 operator_application_ids: &[ApplicationId],
529 operators: &[(String, PathBuf)],
530 read_only: bool,
531 allowed_subscriptions: &[String],
532 subscription_ttls: &[(String, u64)],
533 ) -> Result<NodeService> {
534 let port = port.into().unwrap_or(8080);
535 let mut command = self.command().await?;
536 command.arg("service");
537 if let ProcessInbox::Skip = process_inbox {
538 command.arg("--listener-skip-process-inbox");
539 }
540 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
541 command.args(var.split_whitespace());
542 }
543 for app_id in operator_application_ids {
544 command.args(["--operator-application-ids", &app_id.to_string()]);
545 }
546 for (name, path) in operators {
547 command.args(["--operators", &format!("{}={}", name, path.display())]);
548 }
549 if read_only {
550 command.arg("--read-only");
551 }
552 for query in allowed_subscriptions {
553 command.args(["--allow-subscription", query]);
554 }
555 for (name, secs) in subscription_ttls {
556 command.args(["--subscription-ttl-secs", &format!("{name}={secs}")]);
557 }
558 let child = command
559 .args(["--port".to_string(), port.to_string()])
560 .spawn_into()?;
561 let client = reqwest_client();
562 for i in 0..10 {
563 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
564 let request = client.get(format!("http://localhost:{port}/")).send().await;
565 if request.is_ok() {
566 tracing::info!("Node service has started");
567 return Ok(NodeService::new(port, child));
568 } else {
569 tracing::warn!("Waiting for node service to start");
570 }
571 }
572 bail!("Failed to start node service");
573 }
574
575 pub async fn run_node_service_with_controller(
577 &self,
578 port: impl Into<Option<u16>>,
579 process_inbox: ProcessInbox,
580 controller_id: &ApplicationId,
581 operators: &[(String, PathBuf)],
582 ) -> Result<NodeService> {
583 let port = port.into().unwrap_or(8080);
584 let mut command = self.command().await?;
585 command.arg("service");
586 if let ProcessInbox::Skip = process_inbox {
587 command.arg("--listener-skip-process-inbox");
588 }
589 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
590 command.args(var.split_whitespace());
591 }
592 command.args(["--controller-id", &controller_id.to_string()]);
593 for (name, path) in operators {
594 command.args(["--operators", &format!("{}={}", name, path.display())]);
595 }
596 let child = command
597 .args(["--port".to_string(), port.to_string()])
598 .spawn_into()?;
599 let client = reqwest_client();
600 for i in 0..10 {
601 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
602 let request = client.get(format!("http://localhost:{port}/")).send().await;
603 if request.is_ok() {
604 tracing::info!("Node service has started");
605 return Ok(NodeService::new(port, child));
606 } else {
607 tracing::warn!("Waiting for node service to start");
608 }
609 }
610 bail!("Failed to start node service");
611 }
612
613 pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
615 let mut command = self.command().await?;
616 command.arg("validator").arg("query").arg(address);
617 let stdout = command.spawn_and_wait_for_stdout().await?;
618
619 let hash = stdout
622 .lines()
623 .find_map(|line| {
624 line.strip_prefix("Genesis config hash: ")
625 .and_then(|hash_str| hash_str.trim().parse().ok())
626 })
627 .context("error while parsing the result of `linera validator query`")?;
628 Ok(hash)
629 }
630
631 pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
633 let mut command = self.command().await?;
634 command.arg("validator").arg("list");
635 if let Some(chain_id) = chain_id {
636 command.args(["--chain-id", &chain_id.to_string()]);
637 }
638 command.spawn_and_wait_for_stdout().await?;
639 Ok(())
640 }
641
642 pub async fn sync_validator(
644 &self,
645 chain_ids: impl IntoIterator<Item = &ChainId>,
646 validator_address: impl Into<String>,
647 ) -> Result<()> {
648 let mut command = self.command().await?;
649 command
650 .arg("validator")
651 .arg("sync")
652 .arg(validator_address.into());
653 let mut chain_ids = chain_ids.into_iter().peekable();
654 if chain_ids.peek().is_some() {
655 command
656 .arg("--chains")
657 .args(chain_ids.map(ChainId::to_string));
658 }
659 command.spawn_and_wait_for_stdout().await?;
660 Ok(())
661 }
662
663 pub async fn validator_benchmark(
665 &self,
666 address: impl Into<String>,
667 chains: impl IntoIterator<Item = &ChainId>,
668 extra_args: &[&str],
669 ) -> Result<String> {
670 let mut command = self.command().await?;
671 command
672 .arg("validator")
673 .arg("benchmark")
674 .arg(address.into());
675 for chain in chains {
676 command.args(["--chain", &chain.to_string()]);
677 }
678 command.args(extra_args);
679 command.spawn_and_wait_for_stdout().await
680 }
681
682 pub async fn run_faucet(
684 &self,
685 port: impl Into<Option<u16>>,
686 chain_id: Option<ChainId>,
687 amount: Amount,
688 ) -> Result<FaucetService> {
689 let port = port.into().unwrap_or(8080);
690 let temp_dir = tempfile::tempdir()
691 .context("Failed to create temporary directory for faucet storage")?;
692 let storage_path = temp_dir.path().join("faucet_storage.sqlite");
693 let mut command = self.command().await?;
694 let command = command
695 .arg("faucet")
696 .args(["--port".to_string(), port.to_string()])
697 .args(["--amount".to_string(), amount.to_string()])
698 .args([
699 "--storage-path".to_string(),
700 storage_path.to_string_lossy().to_string(),
701 ]);
702 if let Some(chain_id) = chain_id {
703 command.arg(chain_id.to_string());
704 }
705 let child = command.spawn_into()?;
706 let client = reqwest_client();
707 for i in 0..10 {
708 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
709 let request = client.get(format!("http://localhost:{port}/")).send().await;
710 if request.is_ok() {
711 tracing::info!("Faucet has started");
712 return Ok(FaucetService::new(port, child, temp_dir));
713 } else {
714 tracing::debug!("Waiting for faucet to start");
715 }
716 }
717 bail!("Failed to start faucet");
718 }
719
720 pub async fn local_balance(&self, account: Account) -> Result<Amount> {
722 let stdout = self
723 .command()
724 .await?
725 .arg("local-balance")
726 .arg(account.to_string())
727 .spawn_and_wait_for_stdout()
728 .await?;
729 let amount = stdout
730 .trim()
731 .parse()
732 .context("error while parsing the result of `linera local-balance`")?;
733 Ok(amount)
734 }
735
736 pub async fn query_balance(&self, account: Account) -> Result<Amount> {
738 let stdout = self
739 .command()
740 .await?
741 .arg("query-balance")
742 .arg(account.to_string())
743 .spawn_and_wait_for_stdout()
744 .await?;
745 let amount = stdout
746 .trim()
747 .parse()
748 .context("error while parsing the result of `linera query-balance`")?;
749 Ok(amount)
750 }
751
752 pub async fn query_application_json<T: DeserializeOwned>(
754 &self,
755 chain_id: ChainId,
756 application_id: ApplicationId,
757 query: impl AsRef<str>,
758 ) -> Result<T> {
759 let query = query.as_ref().trim();
760 let name = query
761 .split_once(|ch: char| !ch.is_alphanumeric())
762 .map_or(query, |(name, _)| name);
763 let stdout = self
764 .command()
765 .await?
766 .arg("query-application")
767 .arg("--chain-id")
768 .arg(chain_id.to_string())
769 .arg("--application-id")
770 .arg(application_id.to_string())
771 .arg(query)
772 .spawn_and_wait_for_stdout()
773 .await?;
774 let data: serde_json::Value =
775 serde_json::from_str(stdout.trim()).context("invalid JSON from query-application")?;
776 serde_json::from_value(data[name].clone())
777 .with_context(|| format!("{name} field missing in query-application response"))
778 }
779
780 pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
782 self.command()
783 .await?
784 .arg("sync")
785 .arg(chain_id.to_string())
786 .spawn_and_wait_for_stdout()
787 .await?;
788 Ok(())
789 }
790
791 pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
793 self.command()
794 .await?
795 .arg("process-inbox")
796 .arg(chain_id.to_string())
797 .spawn_and_wait_for_stdout()
798 .await?;
799 Ok(())
800 }
801
802 pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
804 self.command()
805 .await?
806 .arg("transfer")
807 .arg(amount.to_string())
808 .args(["--from", &from.to_string()])
809 .args(["--to", &to.to_string()])
810 .spawn_and_wait_for_stdout()
811 .await?;
812 Ok(())
813 }
814
815 pub async fn transfer_with_silent_logs(
817 &self,
818 amount: Amount,
819 from: ChainId,
820 to: ChainId,
821 ) -> Result<()> {
822 self.command()
823 .await?
824 .env("RUST_LOG", "off")
825 .arg("transfer")
826 .arg(amount.to_string())
827 .args(["--from", &from.to_string()])
828 .args(["--to", &to.to_string()])
829 .spawn_and_wait_for_stdout()
830 .await?;
831 Ok(())
832 }
833
834 pub async fn transfer_with_accounts(
836 &self,
837 amount: Amount,
838 from: Account,
839 to: Account,
840 ) -> Result<()> {
841 self.command()
842 .await?
843 .arg("transfer")
844 .arg(amount.to_string())
845 .args(["--from", &from.to_string()])
846 .args(["--to", &to.to_string()])
847 .spawn_and_wait_for_stdout()
848 .await?;
849 Ok(())
850 }
851
852 fn benchmark_command_internal(command: &mut Command, args: &BenchmarkCommand) -> Result<()> {
853 let mut formatted_args = to_args(&args)?;
854 let subcommand = formatted_args.remove(0);
855 formatted_args.remove(0);
858 let options = formatted_args
859 .chunks_exact(2)
860 .flat_map(|pair| {
861 let option = format!("--{}", pair[0]);
862 match pair[1].as_str() {
863 "true" => vec![option],
864 "false" => vec![],
865 _ => vec![option, pair[1].clone()],
866 }
867 })
868 .collect::<Vec<_>>();
869 command
870 .args([
871 "--max-pending-message-bundles",
872 &args.transactions_per_block().to_string(),
873 ])
874 .arg("benchmark")
875 .arg(subcommand)
876 .args(options);
877 Ok(())
878 }
879
880 async fn benchmark_command_with_envs(
881 &self,
882 args: BenchmarkCommand,
883 envs: &[(&str, &str)],
884 ) -> Result<Command> {
885 let mut command = self
886 .command_with_envs_and_arguments(envs, self.required_command_arguments())
887 .await?;
888 Self::benchmark_command_internal(&mut command, &args)?;
889 Ok(command)
890 }
891
892 async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
893 let mut command = self
894 .command_with_arguments(self.required_command_arguments())
895 .await?;
896 Self::benchmark_command_internal(&mut command, &args)?;
897 Ok(command)
898 }
899
900 pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
902 let mut command = self.benchmark_command(args).await?;
903 command.spawn_and_wait_for_stdout().await?;
904 Ok(())
905 }
906
907 pub async fn benchmark_detached(
910 &self,
911 args: BenchmarkCommand,
912 tx: oneshot::Sender<()>,
913 ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
914 let mut child = self
915 .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
916 .await?
917 .kill_on_drop(true)
918 .stdin(Stdio::piped())
919 .stdout(Stdio::piped())
920 .stderr(Stdio::piped())
921 .spawn()?;
922
923 let pid = child.id().expect("failed to get pid");
924 let stdout = child.stdout.take().expect("stdout not open");
925 let stdout_handle = tokio::spawn(async move {
926 let mut lines = BufReader::new(stdout).lines();
927 while let Ok(Some(line)) = lines.next_line().await {
928 println!("benchmark{{pid={pid}}} {line}");
929 }
930 });
931
932 let stderr = child.stderr.take().expect("stderr not open");
933 let stderr_handle = tokio::spawn(async move {
934 let mut lines = BufReader::new(stderr).lines();
935 let mut tx = Some(tx);
936 while let Ok(Some(line)) = lines.next_line().await {
937 if line.contains("Ready to start benchmark") {
938 tx.take()
939 .expect("Should only send signal once")
940 .send(())
941 .expect("failed to send ready signal to main thread");
942 } else {
943 println!("benchmark{{pid={pid}}} {line}");
944 }
945 }
946 });
947 Ok((child, stdout_handle, stderr_handle))
948 }
949
950 async fn open_chain_internal(
951 &self,
952 from: ChainId,
953 owner: Option<AccountOwner>,
954 initial_balance: Amount,
955 super_owner: bool,
956 ) -> Result<(ChainId, AccountOwner)> {
957 let mut command = self.command().await?;
958 command
959 .arg("open-chain")
960 .args(["--from", &from.to_string()])
961 .args(["--initial-balance", &initial_balance.to_string()]);
962
963 if let Some(owner) = owner {
964 command.args(["--owner", &owner.to_string()]);
965 }
966
967 if super_owner {
968 command.arg("--super-owner");
969 }
970
971 let stdout = command.spawn_and_wait_for_stdout().await?;
972 let mut split = stdout.split('\n');
973 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
974 let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
975 if let Some(owner) = owner {
976 assert_eq!(owner, new_owner);
977 }
978 Ok((chain_id, new_owner))
979 }
980
981 pub async fn open_chain_super_owner(
983 &self,
984 from: ChainId,
985 owner: Option<AccountOwner>,
986 initial_balance: Amount,
987 ) -> Result<(ChainId, AccountOwner)> {
988 self.open_chain_internal(from, owner, initial_balance, true)
989 .await
990 }
991
992 pub async fn open_chain(
994 &self,
995 from: ChainId,
996 owner: Option<AccountOwner>,
997 initial_balance: Amount,
998 ) -> Result<(ChainId, AccountOwner)> {
999 self.open_chain_internal(from, owner, initial_balance, false)
1000 .await
1001 }
1002
1003 pub async fn open_and_assign(
1005 &self,
1006 client: &ClientWrapper,
1007 initial_balance: Amount,
1008 ) -> Result<ChainId> {
1009 let our_chain = self
1010 .load_wallet()?
1011 .default_chain()
1012 .context("no default chain found")?;
1013 let owner = client.keygen().await?;
1014 let (new_chain, _) = self
1015 .open_chain(our_chain, Some(owner), initial_balance)
1016 .await?;
1017 client.assign(owner, new_chain).await?;
1018 Ok(new_chain)
1019 }
1020
1021 pub async fn open_multi_owner_chain(
1023 &self,
1024 from: ChainId,
1025 owners: BTreeMap<AccountOwner, u64>,
1026 multi_leader_rounds: u32,
1027 balance: Amount,
1028 base_timeout_ms: u64,
1029 ) -> Result<ChainId> {
1030 let mut command = self.command().await?;
1031 command
1032 .arg("open-multi-owner-chain")
1033 .args(["--from", &from.to_string()])
1034 .arg("--owners")
1035 .arg(serde_json::to_string(&owners)?)
1036 .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
1037 command
1038 .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
1039 .args(["--initial-balance", &balance.to_string()]);
1040
1041 let stdout = command.spawn_and_wait_for_stdout().await?;
1042 let mut split = stdout.split('\n');
1043 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
1044
1045 Ok(chain_id)
1046 }
1047
1048 pub async fn change_ownership(
1050 &self,
1051 chain_id: ChainId,
1052 super_owners: Vec<AccountOwner>,
1053 owners: Vec<AccountOwner>,
1054 ) -> Result<()> {
1055 let mut command = self.command().await?;
1056 command
1057 .arg("change-ownership")
1058 .args(["--chain-id", &chain_id.to_string()]);
1059 command
1060 .arg("--super-owners")
1061 .arg(serde_json::to_string(&super_owners)?);
1062 command.arg("--owners").arg(serde_json::to_string(
1063 &owners
1064 .into_iter()
1065 .zip(std::iter::repeat(100u64))
1066 .collect::<BTreeMap<_, _>>(),
1067 )?);
1068 command.spawn_and_wait_for_stdout().await?;
1069 Ok(())
1070 }
1071
1072 pub async fn change_application_permissions(
1074 &self,
1075 chain_id: ChainId,
1076 application_permissions: ApplicationPermissions,
1077 ) -> Result<()> {
1078 let mut command = self.command().await?;
1079 command
1080 .arg("change-application-permissions")
1081 .args(["--chain-id", &chain_id.to_string()]);
1082 command.arg("--manage-chain").arg(serde_json::to_string(
1083 &application_permissions.manage_chain,
1084 )?);
1085 command.spawn_and_wait_for_stdout().await?;
1087 Ok(())
1088 }
1089
1090 pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
1092 let mut command = self.command().await?;
1093 command
1094 .args(["wallet", "follow-chain"])
1095 .arg(chain_id.to_string());
1096 if sync {
1097 command.arg("--sync");
1098 }
1099 command.spawn_and_wait_for_stdout().await?;
1100 Ok(())
1101 }
1102
1103 pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
1105 let mut command = self.command().await?;
1106 command
1107 .args(["wallet", "forget-chain"])
1108 .arg(chain_id.to_string());
1109 command.spawn_and_wait_for_stdout().await?;
1110 Ok(())
1111 }
1112
1113 pub async fn set_default_chain(&self, chain_id: ChainId) -> Result<()> {
1115 let mut command = self.command().await?;
1116 command
1117 .args(["wallet", "set-default"])
1118 .arg(chain_id.to_string());
1119 command.spawn_and_wait_for_stdout().await?;
1120 Ok(())
1121 }
1122
1123 pub async fn retry_pending_block(
1125 &self,
1126 chain_id: Option<ChainId>,
1127 ) -> Result<Option<CryptoHash>> {
1128 let mut command = self.command().await?;
1129 command.arg("retry-pending-block");
1130 if let Some(chain_id) = chain_id {
1131 command.arg(chain_id.to_string());
1132 }
1133 let stdout = command.spawn_and_wait_for_stdout().await?;
1134 let stdout = stdout.trim();
1135 if stdout.is_empty() {
1136 Ok(None)
1137 } else {
1138 Ok(Some(CryptoHash::from_str(stdout)?))
1139 }
1140 }
1141
1142 pub async fn publish_data_blob(
1144 &self,
1145 path: &Path,
1146 chain_id: Option<ChainId>,
1147 ) -> Result<CryptoHash> {
1148 let mut command = self.command().await?;
1149 command.arg("publish-data-blob").arg(path);
1150 if let Some(chain_id) = chain_id {
1151 command.arg(chain_id.to_string());
1152 }
1153 let stdout = command.spawn_and_wait_for_stdout().await?;
1154 let stdout = stdout.trim();
1155 Ok(CryptoHash::from_str(stdout)?)
1156 }
1157
1158 pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
1160 let mut command = self.command().await?;
1161 command.arg("read-data-blob").arg(hash.to_string());
1162 if let Some(chain_id) = chain_id {
1163 command.arg(chain_id.to_string());
1164 }
1165 command.spawn_and_wait_for_stdout().await?;
1166 Ok(())
1167 }
1168
1169 pub fn load_wallet(&self) -> Result<Wallet> {
1171 Ok(Wallet::read(&self.wallet_path())?)
1172 }
1173
1174 pub fn load_keystore(&self) -> Result<InMemorySigner> {
1176 util::read_json(self.keystore_path())
1177 }
1178
1179 pub fn wallet_path(&self) -> PathBuf {
1181 self.path_provider.path().join(&self.wallet)
1182 }
1183
1184 pub fn keystore_path(&self) -> PathBuf {
1186 self.path_provider.path().join(&self.keystore)
1187 }
1188
1189 pub fn storage_path(&self) -> &str {
1191 &self.storage
1192 }
1193
1194 pub fn get_owner(&self) -> Option<AccountOwner> {
1196 let wallet = self.load_wallet().ok()?;
1197 wallet
1198 .get(wallet.default_chain()?)
1199 .expect("default chain must be in wallet")
1200 .owner
1201 }
1202
1203 pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1205 self.load_wallet()
1206 .ok()
1207 .is_some_and(|wallet| wallet.get(chain).is_some())
1208 }
1209
1210 pub async fn set_validator(
1212 &self,
1213 validator_key: &(String, String),
1214 port: usize,
1215 votes: usize,
1216 ) -> Result<()> {
1217 let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1218 self.command()
1219 .await?
1220 .arg("validator")
1221 .arg("add")
1222 .args(["--public-key", &validator_key.0])
1223 .args(["--account-key", &validator_key.1])
1224 .args(["--address", &address])
1225 .args(["--votes", &votes.to_string()])
1226 .spawn_and_wait_for_stdout()
1227 .await?;
1228 Ok(())
1229 }
1230
1231 pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1233 self.command()
1234 .await?
1235 .arg("validator")
1236 .arg("remove")
1237 .args(["--public-key", validator_key])
1238 .spawn_and_wait_for_stdout()
1239 .await?;
1240 Ok(())
1241 }
1242
1243 pub async fn change_validators(
1245 &self,
1246 add_validators: &[(String, String, usize, usize)], modify_validators: &[(String, String, usize, usize)], remove_validators: &[String],
1249 ) -> Result<()> {
1250 use std::str::FromStr;
1251
1252 use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1253
1254 let mut changes = std::collections::HashMap::new();
1257
1258 for (public_key_str, account_key_str, port, votes) in
1260 add_validators.iter().chain(modify_validators.iter())
1261 {
1262 let public_key = ValidatorPublicKey::from_str(public_key_str)
1263 .with_context(|| format!("Invalid validator public key: {public_key_str}"))?;
1264
1265 let account_key = AccountPublicKey::from_str(account_key_str)
1266 .with_context(|| format!("Invalid account public key: {account_key_str}"))?;
1267
1268 let address = format!("{}:127.0.0.1:{}", self.network.short(), port)
1269 .parse()
1270 .unwrap();
1271
1272 let change = crate::cli::validator::Change {
1274 account_key,
1275 address,
1276 votes: crate::cli::validator::Votes(
1277 std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1278 ),
1279 };
1280
1281 changes.insert(public_key, Some(change));
1282 }
1283
1284 for validator_key_str in remove_validators {
1286 let public_key = ValidatorPublicKey::from_str(validator_key_str)
1287 .with_context(|| format!("Invalid validator public key: {validator_key_str}"))?;
1288 changes.insert(public_key, None);
1289 }
1290
1291 let temp_file = tempfile::NamedTempFile::new()
1293 .context("Failed to create temporary file for validator changes")?;
1294 serde_json::to_writer(&temp_file, &changes)
1295 .context("Failed to write validator changes to file")?;
1296 let temp_path = temp_file.path();
1297
1298 self.command()
1299 .await?
1300 .arg("validator")
1301 .arg("update")
1302 .arg(temp_path)
1303 .arg("--yes") .spawn_and_wait_for_stdout()
1305 .await?;
1306
1307 Ok(())
1308 }
1309
1310 pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1312 self.command()
1313 .await?
1314 .arg("revoke-epochs")
1315 .arg(epoch.to_string())
1316 .spawn_and_wait_for_stdout()
1317 .await?;
1318 Ok(())
1319 }
1320
1321 pub async fn set_resource_control_policy(
1323 &self,
1324 overrides: ResourceControlPolicyOverrides,
1325 ) -> Result<()> {
1326 let mut command = self.command().await?;
1327 command.arg("resource-control-policy");
1328 let ResourceControlPolicyOverrides {
1329 wasm_fuel_unit,
1330 evm_fuel_unit,
1331 read_operation,
1332 write_operation,
1333 byte_runtime,
1334 byte_read,
1335 byte_written,
1336 blob_read,
1337 blob_published,
1338 blob_byte_read,
1339 blob_byte_published,
1340 operation,
1341 operation_byte,
1342 message,
1343 message_byte,
1344 service_as_oracle_query,
1345 http_request,
1346 maximum_wasm_fuel_per_block,
1347 maximum_evm_fuel_per_block,
1348 maximum_service_oracle_execution_ms,
1349 maximum_block_size,
1350 maximum_blob_size,
1351 maximum_published_blobs,
1352 maximum_bytecode_size,
1353 maximum_block_proposal_size,
1354 maximum_bytes_read_per_block,
1355 maximum_bytes_written_per_block,
1356 maximum_oracle_response_bytes,
1357 maximum_http_response_bytes,
1358 http_request_timeout_ms,
1359 http_request_allow_list,
1360 free_application_ids,
1361 flags,
1362 } = overrides;
1363 if let Some(value) = wasm_fuel_unit {
1364 command.args(["--wasm-fuel-unit", &value.to_string()]);
1365 }
1366 if let Some(value) = evm_fuel_unit {
1367 command.args(["--evm-fuel-unit", &value.to_string()]);
1368 }
1369 if let Some(value) = read_operation {
1370 command.args(["--read-operation", &value.to_string()]);
1371 }
1372 if let Some(value) = write_operation {
1373 command.args(["--write-operation", &value.to_string()]);
1374 }
1375 if let Some(value) = byte_runtime {
1376 command.args(["--byte-runtime", &value.to_string()]);
1377 }
1378 if let Some(value) = byte_read {
1379 command.args(["--byte-read", &value.to_string()]);
1380 }
1381 if let Some(value) = byte_written {
1382 command.args(["--byte-written", &value.to_string()]);
1383 }
1384 if let Some(value) = blob_read {
1385 command.args(["--blob-read", &value.to_string()]);
1386 }
1387 if let Some(value) = blob_published {
1388 command.args(["--blob-published", &value.to_string()]);
1389 }
1390 if let Some(value) = blob_byte_read {
1391 command.args(["--blob-byte-read", &value.to_string()]);
1392 }
1393 if let Some(value) = blob_byte_published {
1394 command.args(["--blob-byte-published", &value.to_string()]);
1395 }
1396 if let Some(value) = operation {
1397 command.args(["--operation", &value.to_string()]);
1398 }
1399 if let Some(value) = operation_byte {
1400 command.args(["--operation-byte", &value.to_string()]);
1401 }
1402 if let Some(value) = message {
1403 command.args(["--message", &value.to_string()]);
1404 }
1405 if let Some(value) = message_byte {
1406 command.args(["--message-byte", &value.to_string()]);
1407 }
1408 if let Some(value) = service_as_oracle_query {
1409 command.args(["--service-as-oracle-query", &value.to_string()]);
1410 }
1411 if let Some(value) = http_request {
1412 command.args(["--http-request", &value.to_string()]);
1413 }
1414 if let Some(value) = maximum_wasm_fuel_per_block {
1415 command.args(["--maximum-wasm-fuel-per-block", &value.to_string()]);
1416 }
1417 if let Some(value) = maximum_evm_fuel_per_block {
1418 command.args(["--maximum-evm-fuel-per-block", &value.to_string()]);
1419 }
1420 if let Some(value) = maximum_service_oracle_execution_ms {
1421 command.args(["--maximum-service-oracle-execution-ms", &value.to_string()]);
1422 }
1423 if let Some(value) = maximum_block_size {
1424 command.args(["--maximum-block-size", &value.to_string()]);
1425 }
1426 if let Some(value) = maximum_blob_size {
1427 command.args(["--maximum-blob-size", &value.to_string()]);
1428 }
1429 if let Some(value) = maximum_published_blobs {
1430 command.args(["--maximum-published-blobs", &value.to_string()]);
1431 }
1432 if let Some(value) = maximum_bytecode_size {
1433 command.args(["--maximum-bytecode-size", &value.to_string()]);
1434 }
1435 if let Some(value) = maximum_block_proposal_size {
1436 command.args(["--maximum-block-proposal-size", &value.to_string()]);
1437 }
1438 if let Some(value) = maximum_bytes_read_per_block {
1439 command.args(["--maximum-bytes-read-per-block", &value.to_string()]);
1440 }
1441 if let Some(value) = maximum_bytes_written_per_block {
1442 command.args(["--maximum-bytes-written-per-block", &value.to_string()]);
1443 }
1444 if let Some(value) = maximum_oracle_response_bytes {
1445 command.args(["--maximum-oracle-response-bytes", &value.to_string()]);
1446 }
1447 if let Some(value) = maximum_http_response_bytes {
1448 command.args(["--maximum-http-response-bytes", &value.to_string()]);
1449 }
1450 if let Some(value) = http_request_timeout_ms {
1451 command.args(["--http-request-timeout-ms", &value.to_string()]);
1452 }
1453 if let Some(values) = http_request_allow_list {
1454 command.args(["--http-request-allow-list", &values.join(",")]);
1455 }
1456 if let Some(values) = free_application_ids {
1457 command.args(["--free-application-ids", &values.join(",")]);
1458 }
1459 if let Some(values) = flags {
1460 command.args(["--flags", &values.join(",")]);
1461 }
1462 command.spawn_and_wait_for_stdout().await?;
1463 Ok(())
1464 }
1465
1466 pub async fn keygen(&self) -> Result<AccountOwner> {
1468 let stdout = self
1469 .command()
1470 .await?
1471 .arg("keygen")
1472 .spawn_and_wait_for_stdout()
1473 .await?;
1474 AccountOwner::from_str(stdout.as_str().trim())
1475 }
1476
1477 pub fn default_chain(&self) -> Option<ChainId> {
1479 self.load_wallet().ok()?.default_chain()
1480 }
1481
1482 pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1484 let _stdout = self
1485 .command()
1486 .await?
1487 .arg("assign")
1488 .args(["--owner", &owner.to_string()])
1489 .args(["--chain-id", &chain_id.to_string()])
1490 .spawn_and_wait_for_stdout()
1491 .await?;
1492 Ok(())
1493 }
1494
1495 pub async fn set_preferred_owner(
1497 &self,
1498 chain_id: ChainId,
1499 owner: Option<AccountOwner>,
1500 ) -> Result<()> {
1501 let mut owner_arg = vec!["--owner".to_string()];
1502 if let Some(owner) = owner {
1503 owner_arg.push(owner.to_string());
1504 };
1505 self.command()
1506 .await?
1507 .arg("set-preferred-owner")
1508 .args(["--chain-id", &chain_id.to_string()])
1509 .args(owner_arg)
1510 .spawn_and_wait_for_stdout()
1511 .await?;
1512 Ok(())
1513 }
1514
1515 pub async fn build_application(
1517 &self,
1518 path: &Path,
1519 name: &str,
1520 is_workspace: bool,
1521 ) -> Result<(PathBuf, PathBuf)> {
1522 Command::new("cargo")
1523 .current_dir(self.path_provider.path())
1524 .arg("build")
1525 .arg("--release")
1526 .args(["--target", "wasm32-unknown-unknown"])
1527 .arg("--manifest-path")
1528 .arg(path.join("Cargo.toml"))
1529 .spawn_and_wait_for_stdout()
1530 .await?;
1531
1532 let release_dir = match is_workspace {
1533 true => path.join("../target/wasm32-unknown-unknown/release"),
1534 false => path.join("target/wasm32-unknown-unknown/release"),
1535 };
1536
1537 let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1538 let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1539
1540 let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1541 let service_size = fs_err::tokio::metadata(&service).await?.len();
1542 tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1543
1544 Ok((contract, service))
1545 }
1546}
1547
1548impl Drop for ClientWrapper {
1549 fn drop(&mut self) {
1550 use std::process::Command as SyncCommand;
1551
1552 if self.on_drop != OnClientDrop::CloseChains {
1553 return;
1554 }
1555
1556 let Ok(binary_path) = self.binary_path.lock() else {
1557 tracing::error!(
1558 "Failed to close chains because a thread panicked with a lock to `binary_path`"
1559 );
1560 return;
1561 };
1562
1563 let Some(binary_path) = binary_path.as_ref() else {
1564 tracing::warn!(
1565 "Assuming no chains need to be closed, because the command binary was never \
1566 resolved and therefore presumably never called"
1567 );
1568 return;
1569 };
1570
1571 let working_directory = self.path_provider.path();
1572 let mut wallet_show_command = SyncCommand::new(binary_path);
1573
1574 for argument in self.command_arguments() {
1575 wallet_show_command.arg(&*argument);
1576 }
1577
1578 let Ok(wallet_show_output) = wallet_show_command
1579 .current_dir(working_directory)
1580 .args(["wallet", "show", "--short", "--owned"])
1581 .output()
1582 else {
1583 tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1584 return;
1585 };
1586
1587 if !wallet_show_output.status.success() {
1588 tracing::warn!("Failed to list chains in the wallet to close them");
1589 return;
1590 }
1591
1592 let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1593 tracing::warn!(
1594 "Failed to close chains because `linera wallet show --short` \
1595 returned a non-UTF-8 output"
1596 );
1597 return;
1598 };
1599
1600 let chain_ids = chain_list_string
1601 .split('\n')
1602 .map(|line| line.trim())
1603 .filter(|line| !line.is_empty());
1604
1605 for chain_id in chain_ids {
1606 let mut close_chain_command = SyncCommand::new(binary_path);
1607
1608 for argument in self.command_arguments() {
1609 close_chain_command.arg(&*argument);
1610 }
1611
1612 close_chain_command.current_dir(working_directory);
1613
1614 match close_chain_command.args(["close-chain", chain_id]).status() {
1615 Ok(status) if status.success() => (),
1616 Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1617 Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1618 }
1619 }
1620 }
1621}
1622
1623#[cfg(with_testing)]
1624impl ClientWrapper {
1625 pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1627 self.build_application(Self::example_path(name)?.as_path(), name, true)
1628 .await
1629 }
1630
1631 pub async fn build_test_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1633 self.build_application(Self::test_example_path(name)?.as_path(), name, true)
1634 .await
1635 }
1636
1637 pub fn example_path(name: &str) -> Result<PathBuf> {
1639 Ok(env::current_dir()?.join("../examples/").join(name))
1640 }
1641
1642 pub fn test_example_path(name: &str) -> Result<PathBuf> {
1644 Ok(env::current_dir()?
1645 .join("../linera-sdk/tests/fixtures/")
1646 .join(name))
1647 }
1648}
1649
1650fn truncate_query_output(input: &str) -> String {
1651 let max_len = 1000;
1652 if input.len() < max_len {
1653 input.to_string()
1654 } else {
1655 format!("{} ...", input.get(..max_len).unwrap())
1656 }
1657}
1658
1659fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1660 let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1661 let max_len = 200;
1662 if query.len() < max_len {
1663 query
1664 } else {
1665 format!("{} ...", query.get(..max_len).unwrap())
1666 }
1667}
1668
1669fn log_unexpected_exit(child: &mut Child, service_kind: &str, port: u16) {
1673 match child.try_wait() {
1674 Ok(Some(status)) => {
1675 #[cfg(unix)]
1676 {
1677 use std::os::unix::process::ExitStatusExt as _;
1678 if let Some(signal) = status.signal() {
1679 tracing::error!(
1680 port,
1681 signal,
1682 "The {service_kind} service was killed by signal {signal}",
1683 );
1684 return;
1685 }
1686 }
1687 if !status.success() {
1688 tracing::error!(
1689 port,
1690 %status,
1691 "The {service_kind} service exited unexpectedly with {status}",
1692 );
1693 }
1694 }
1695 Ok(None) => {} Err(error) => {
1697 tracing::warn!(
1698 port,
1699 %error,
1700 "Failed to check {service_kind} service status",
1701 );
1702 }
1703 }
1704}
1705
1706pub struct NodeService {
1708 port: u16,
1709 child: Child,
1710 terminated: bool,
1711}
1712
1713impl Drop for NodeService {
1714 fn drop(&mut self) {
1715 if !self.terminated {
1716 log_unexpected_exit(&mut self.child, "node", self.port);
1717 }
1718 }
1719}
1720
1721impl NodeService {
1722 fn new(port: u16, child: Child) -> Self {
1723 Self {
1724 port,
1725 child,
1726 terminated: false,
1727 }
1728 }
1729
1730 pub async fn terminate(mut self) -> Result<()> {
1732 self.terminated = true;
1733 self.child.kill().await.context("terminating node service")
1734 }
1735
1736 pub fn port(&self) -> u16 {
1738 self.port
1739 }
1740
1741 pub fn ensure_is_running(&mut self) -> Result<()> {
1743 self.child.ensure_is_running()
1744 }
1745
1746 pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1748 let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1749 let mut data = self.query_node(query).await?;
1750 Ok(serde_json::from_value(data["processInbox"].take())?)
1751 }
1752
1753 pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1755 let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1756 let mut data = self.query_node(query).await?;
1757 Ok(serde_json::from_value(data["sync"].take())?)
1758 }
1759
1760 pub async fn transfer(
1762 &self,
1763 chain_id: ChainId,
1764 owner: AccountOwner,
1765 recipient: Account,
1766 amount: Amount,
1767 ) -> Result<CryptoHash> {
1768 let json_owner = owner.to_value();
1769 let json_recipient = recipient.to_value();
1770 let query = format!(
1771 "mutation {{ transfer(\
1772 chainId: \"{chain_id}\", \
1773 owner: {json_owner}, \
1774 recipient: {json_recipient}, \
1775 amount: \"{amount}\") \
1776 }}"
1777 );
1778 let data = self.query_node(query).await?;
1779 serde_json::from_value(data["transfer"].clone())
1780 .context("missing transfer field in response")
1781 }
1782
1783 pub async fn balance(&self, account: &Account) -> Result<Amount> {
1785 let chain = account.chain_id;
1786 let owner = account.owner;
1787 if matches!(owner, AccountOwner::CHAIN) {
1788 let query = format!(
1789 "query {{ chain(chainId:\"{chain}\") {{
1790 executionState {{ system {{ balance }} }}
1791 }} }}"
1792 );
1793 let response = self.query_node(query).await?;
1794 let balance = &response["chain"]["executionState"]["system"]["balance"]
1795 .as_str()
1796 .unwrap();
1797 return Ok(Amount::from_str(balance)?);
1798 }
1799 let query = format!(
1800 "query {{ chain(chainId:\"{chain}\") {{
1801 executionState {{ system {{ balances {{
1802 entry(key:\"{owner}\") {{ value }}
1803 }} }} }}
1804 }} }}"
1805 );
1806 let response = self.query_node(query).await?;
1807 let balances = &response["chain"]["executionState"]["system"]["balances"];
1808 let balance = balances["entry"]["value"].as_str();
1809 match balance {
1810 None => Ok(Amount::ZERO),
1811 Some(amount) => Ok(Amount::from_str(amount)?),
1812 }
1813 }
1814
1815 pub fn make_application<A: ContractAbi>(
1817 &self,
1818 chain_id: &ChainId,
1819 application_id: &ApplicationId<A>,
1820 ) -> Result<ApplicationWrapper<A>> {
1821 let application_id = application_id.forget_abi().to_string();
1822 let link = format!(
1823 "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1824 self.port
1825 );
1826 Ok(ApplicationWrapper::from(link))
1827 }
1828
1829 pub async fn publish_data_blob(
1831 &self,
1832 chain_id: &ChainId,
1833 bytes: Vec<u8>,
1834 ) -> Result<CryptoHash> {
1835 let query = format!(
1836 "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1837 chain_id.to_value(),
1838 bytes.to_value(),
1839 );
1840 let data = self.query_node(query).await?;
1841 serde_json::from_value(data["publishDataBlob"].clone())
1842 .context("missing publishDataBlob field in response")
1843 }
1844
1845 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1847 &self,
1848 chain_id: &ChainId,
1849 contract: PathBuf,
1850 service: PathBuf,
1851 vm_runtime: VmRuntime,
1852 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1853 let contract_code = Bytecode::load_from_file(&contract).await?;
1854 let service_code = Bytecode::load_from_file(&service).await?;
1855 let query = format!(
1856 "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1857 chain_id.to_value(),
1858 contract_code.to_value(),
1859 service_code.to_value(),
1860 vm_runtime.to_value(),
1861 );
1862 let data = self.query_node(query).await?;
1863 let module_str = data["publishModule"]
1864 .as_str()
1865 .context("module ID not found")?;
1866 let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1867 Ok(module_id.with_abi())
1868 }
1869
1870 pub async fn query_committee_hash(&self, chain_id: &ChainId) -> Result<Option<CryptoHash>> {
1872 let query = format!(
1873 "query {{ chain(chainId:\"{chain_id}\") {{
1874 executionState {{ system {{ committeeHash }} }}
1875 }} }}"
1876 );
1877 let mut response = self.query_node(query).await?;
1878 let hash = response["chain"]["executionState"]["system"]["committeeHash"].take();
1879 Ok(serde_json::from_value(hash)?)
1880 }
1881
1882 pub async fn query_chain_epoch(&self, chain_id: &ChainId) -> Result<Epoch> {
1884 let query = format!(
1885 "query {{ chain(chainId:\"{chain_id}\") {{
1886 executionState {{ system {{ epoch }} }}
1887 }} }}"
1888 );
1889 let mut response = self.query_node(query).await?;
1890 let epoch = response["chain"]["executionState"]["system"]["epoch"].take();
1891 Ok(serde_json::from_value(epoch)?)
1892 }
1893
1894 pub async fn events_from_index(
1896 &self,
1897 chain_id: &ChainId,
1898 stream_id: &StreamId,
1899 start_index: u32,
1900 ) -> Result<Vec<IndexAndEvent>> {
1901 let query = format!(
1902 "query {{
1903 eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1904 {{ index event }}
1905 }}",
1906 stream_id.to_value()
1907 );
1908 let mut response = self.query_node(query).await?;
1909 let response = response["eventsFromIndex"].take();
1910 Ok(serde_json::from_value(response)?)
1911 }
1912
1913 pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1915 let n_try = 5;
1916 let query = query.as_ref();
1917 for i in 0..n_try {
1918 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1919 let url = format!("http://localhost:{}/", self.port);
1920 let client = reqwest_client();
1921 let result = client
1922 .post(url)
1923 .json(&json!({ "query": query }))
1924 .send()
1925 .await;
1926 if matches!(result, Err(ref error) if error.is_timeout()) {
1927 tracing::warn!(
1928 "Timeout when sending query {} to the node service",
1929 truncate_query_output(query)
1930 );
1931 continue;
1932 }
1933 let response = result.with_context(|| {
1934 format!(
1935 "query_node: failed to post query={}",
1936 truncate_query_output(query)
1937 )
1938 })?;
1939 ensure!(
1940 response.status().is_success(),
1941 "Query \"{}\" failed: {}",
1942 truncate_query_output(query),
1943 response
1944 .text()
1945 .await
1946 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1947 );
1948 let value: Value = response.json().await.context("invalid JSON")?;
1949 if let Some(errors) = value.get("errors") {
1950 tracing::warn!(
1951 "Query \"{}\" failed: {}",
1952 truncate_query_output(query),
1953 errors
1954 );
1955 } else {
1956 return Ok(value["data"].clone());
1957 }
1958 }
1959 bail!(
1960 "Query \"{}\" failed after {} retries.",
1961 truncate_query_output(query),
1962 n_try
1963 );
1964 }
1965
1966 pub async fn create_application<
1968 Abi: ContractAbi,
1969 Parameters: Serialize,
1970 InstantiationArgument: Serialize,
1971 >(
1972 &self,
1973 chain_id: &ChainId,
1974 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1975 parameters: &Parameters,
1976 argument: &InstantiationArgument,
1977 required_application_ids: &[ApplicationId],
1978 ) -> Result<ApplicationId<Abi>> {
1979 let module_id = module_id.forget_abi();
1980 let json_required_applications_ids = required_application_ids
1981 .iter()
1982 .map(ApplicationId::to_string)
1983 .collect::<Vec<_>>()
1984 .to_value();
1985 let new_parameters = serde_json::to_value(parameters)
1987 .context("could not create parameters JSON")?
1988 .to_value();
1989 let new_argument = serde_json::to_value(argument)
1990 .context("could not create argument JSON")?
1991 .to_value();
1992 let query = format!(
1993 "mutation {{ createApplication(\
1994 chainId: \"{chain_id}\",
1995 moduleId: \"{module_id}\", \
1996 parameters: {new_parameters}, \
1997 instantiationArgument: {new_argument}, \
1998 requiredApplicationIds: {json_required_applications_ids}) \
1999 }}"
2000 );
2001 let data = self.query_node(query).await?;
2002 let app_id_str = data["createApplication"]
2003 .as_str()
2004 .context("missing createApplication string in response")?
2005 .trim();
2006 Ok(app_id_str
2007 .parse::<ApplicationId>()
2008 .context("invalid application ID")?
2009 .with_abi())
2010 }
2011
2012 pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
2014 let query = format!(
2015 r#"query {{ block(chainId: "{chain}") {{
2016 hash
2017 block {{ header {{ height }} }}
2018 }} }}"#
2019 );
2020
2021 let mut response = self.query_node(&query).await?;
2022
2023 match (
2024 mem::take(&mut response["block"]["hash"]),
2025 mem::take(&mut response["block"]["block"]["header"]["height"]),
2026 ) {
2027 (Value::Null, Value::Null) => Ok(None),
2028 (Value::String(hash), Value::Number(height)) => Ok(Some((
2029 hash.parse()
2030 .context("Received an invalid hash {hash:?} for chain tip")?,
2031 BlockHeight(height.as_u64().unwrap()),
2032 ))),
2033 invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
2034 }
2035 }
2036
2037 pub async fn notifications(
2039 &self,
2040 chain_id: ChainId,
2041 ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
2042 let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
2043 let url = format!("ws://localhost:{}/ws", self.port);
2044 let mut request = url.into_client_request()?;
2045 request.headers_mut().insert(
2046 "Sec-WebSocket-Protocol",
2047 HeaderValue::from_str("graphql-transport-ws")?,
2048 );
2049 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
2050 let init_json = json!({
2051 "type": "connection_init",
2052 "payload": {}
2053 });
2054 websocket.send(init_json.to_string().into()).await?;
2055 let text = websocket
2056 .next()
2057 .await
2058 .context("Failed to establish connection")??
2059 .into_text()?;
2060 ensure!(
2061 text == "{\"type\":\"connection_ack\"}",
2062 "Unexpected response: {text}"
2063 );
2064 let query_json = json!({
2065 "id": "1",
2066 "type": "start",
2067 "payload": {
2068 "query": query,
2069 "variables": {},
2070 "operationName": null
2071 }
2072 });
2073 websocket.send(query_json.to_string().into()).await?;
2074 Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
2075 |message| async {
2076 let text = message.into_text()?;
2077 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
2078 if let Some(errors) = value["payload"].get("errors") {
2079 bail!("Notification subscription failed: {errors:?}");
2080 }
2081 serde_json::from_value(value["payload"]["data"]["notifications"].clone())
2082 .context("Failed to deserialize notification")
2083 },
2084 )))
2085 }
2086
2087 pub async fn query_result(
2089 &self,
2090 name: &str,
2091 chain_id: ChainId,
2092 application_id: &ApplicationId,
2093 ) -> Result<Pin<Box<impl Stream<Item = Result<Value>>>>> {
2094 let query = format!(
2095 r#"subscription {{ queryResult(name: "{name}", chainId: "{chain_id}", applicationId: "{application_id}") }}"#,
2096 );
2097 let url = format!("ws://localhost:{}/ws", self.port);
2098 let mut request = url.into_client_request()?;
2099 request.headers_mut().insert(
2100 "Sec-WebSocket-Protocol",
2101 HeaderValue::from_str("graphql-transport-ws")?,
2102 );
2103 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
2104 let init_json = json!({
2105 "type": "connection_init",
2106 "payload": {}
2107 });
2108 websocket.send(init_json.to_string().into()).await?;
2109 let text = websocket
2110 .next()
2111 .await
2112 .context("Failed to establish connection")??
2113 .into_text()?;
2114 ensure!(
2115 text == "{\"type\":\"connection_ack\"}",
2116 "Unexpected response: {text}"
2117 );
2118 let query_json = json!({
2119 "id": "1",
2120 "type": "start",
2121 "payload": {
2122 "query": query,
2123 "variables": {},
2124 "operationName": null
2125 }
2126 });
2127 websocket.send(query_json.to_string().into()).await?;
2128 Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
2129 |message| async {
2130 let text = message.into_text()?;
2131 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
2132 if let Some(errors) = value["payload"].get("errors") {
2133 bail!("Query result subscription failed: {errors:?}");
2134 }
2135 Ok(value["payload"]["data"]["queryResult"].clone())
2136 },
2137 )))
2138 }
2139}
2140
2141pub struct FaucetService {
2143 port: u16,
2144 child: Child,
2145 _temp_dir: tempfile::TempDir,
2146 terminated: bool,
2147}
2148
2149impl Drop for FaucetService {
2150 fn drop(&mut self) {
2151 if !self.terminated {
2152 log_unexpected_exit(&mut self.child, "faucet", self.port);
2153 }
2154 }
2155}
2156
2157impl FaucetService {
2158 fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
2159 Self {
2160 port,
2161 child,
2162 _temp_dir: temp_dir,
2163 terminated: false,
2164 }
2165 }
2166
2167 pub async fn terminate(mut self) -> Result<()> {
2169 self.terminated = true;
2170 self.child
2171 .kill()
2172 .await
2173 .context("terminating faucet service")
2174 }
2175
2176 pub fn ensure_is_running(&mut self) -> Result<()> {
2178 self.child.ensure_is_running()
2179 }
2180
2181 pub fn instance(&self) -> Faucet {
2183 Faucet::new(format!("http://localhost:{}/", self.port))
2184 }
2185}
2186
2187pub struct ApplicationWrapper<A> {
2189 uri: String,
2190 _phantom: PhantomData<A>,
2191}
2192
2193impl<A> ApplicationWrapper<A> {
2194 pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
2196 let query = query.as_ref();
2197 let value = self.run_json_query(json!({ "query": query })).await?;
2198 Ok(value["data"].clone())
2199 }
2200
2201 pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
2203 const MAX_RETRIES: usize = 5;
2204
2205 for i in 0.. {
2206 let client = reqwest_client();
2207 let result = client.post(&self.uri).json(&query).send().await;
2208 let response = match result {
2209 Ok(response) => response,
2210 Err(error) if i < MAX_RETRIES => {
2211 tracing::warn!(
2212 "Failed to post query \"{}\": {error}; retrying",
2213 truncate_query_output_serialize(&query),
2214 );
2215 continue;
2216 }
2217 Err(error) => {
2218 let query = truncate_query_output_serialize(&query);
2219 return Err(error)
2220 .with_context(|| format!("run_json_query: failed to post query={query}"));
2221 }
2222 };
2223 ensure!(
2224 response.status().is_success(),
2225 "Query \"{}\" failed: {}",
2226 truncate_query_output_serialize(&query),
2227 response
2228 .text()
2229 .await
2230 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
2231 );
2232 let value: Value = response.json().await.context("invalid JSON")?;
2233 if let Some(errors) = value.get("errors") {
2234 bail!(
2235 "Query \"{}\" failed: {}",
2236 truncate_query_output_serialize(&query),
2237 errors
2238 );
2239 }
2240 return Ok(value);
2241 }
2242 unreachable!()
2243 }
2244
2245 pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
2247 let query = query.as_ref();
2248 self.run_graphql_query(&format!("query {{ {query} }}"))
2249 .await
2250 }
2251
2252 pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
2254 let query = query.as_ref().trim();
2255 let name = query
2256 .split_once(|ch: char| !ch.is_alphanumeric())
2257 .map_or(query, |(name, _)| name);
2258 let data = self.query(query).await?;
2259 serde_json::from_value(data[name].clone())
2260 .with_context(|| format!("{name} field missing in response"))
2261 }
2262
2263 pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
2265 let mutation = mutation.as_ref();
2266 self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
2267 .await
2268 }
2269
2270 pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
2272 let mut out = String::from("mutation {\n");
2273 for (index, mutation) in mutations.iter().enumerate() {
2274 out = format!("{out} u{index}: {mutation}\n");
2275 }
2276 out.push_str("}\n");
2277 self.run_graphql_query(&out).await
2278 }
2279}
2280
2281impl<A> From<String> for ApplicationWrapper<A> {
2282 fn from(uri: String) -> ApplicationWrapper<A> {
2283 ApplicationWrapper {
2284 uri,
2285 _phantom: PhantomData,
2286 }
2287 }
2288}
2289
2290#[cfg(with_testing)]
2293fn notification_timeout() -> Duration {
2294 const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
2295 const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;
2296
2297 match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
2298 Ok(var) => Duration::from_millis(var.parse().unwrap_or_else(|error| {
2299 panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
2300 })),
2301 Err(env::VarError::NotPresent) => Duration::from_millis(NOTIFICATION_TIMEOUT_MS_DEFAULT),
2302 Err(env::VarError::NotUnicode(_)) => {
2303 panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
2304 }
2305 }
2306}
2307
2308#[cfg(with_testing)]
2310pub trait NotificationsExt {
2311 fn wait_for<T>(
2313 &mut self,
2314 f: impl FnMut(Notification) -> Option<T>,
2315 ) -> impl Future<Output = Result<T>>;
2316
2317 fn wait_for_events(
2320 &mut self,
2321 expected_height: impl Into<Option<BlockHeight>>,
2322 ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
2323 let expected_height = expected_height.into();
2324 self.wait_for(move |notification| {
2325 if let Reason::NewEvents {
2326 height,
2327 event_streams,
2328 ..
2329 } = notification.reason
2330 {
2331 if expected_height.is_none_or(|h| h == height) {
2332 return Some(event_streams);
2333 }
2334 }
2335 None
2336 })
2337 }
2338
2339 fn wait_for_block(
2342 &mut self,
2343 expected_height: impl Into<Option<BlockHeight>>,
2344 ) -> impl Future<Output = Result<CryptoHash>> {
2345 let expected_height = expected_height.into();
2346 self.wait_for(move |notification| {
2347 if let Reason::NewBlock { height, hash, .. } = notification.reason {
2348 if expected_height.is_none_or(|h| h == height) {
2349 return Some(hash);
2350 }
2351 }
2352 None
2353 })
2354 }
2355
2356 fn wait_for_bundle(
2359 &mut self,
2360 expected_origin: ChainId,
2361 expected_height: impl Into<Option<BlockHeight>>,
2362 ) -> impl Future<Output = Result<()>> {
2363 let expected_height = expected_height.into();
2364 self.wait_for(move |notification| {
2365 if let Reason::NewIncomingBundle { height, origin } = notification.reason {
2366 if expected_height.is_none_or(|h| h == height) && origin == expected_origin {
2367 return Some(());
2368 }
2369 }
2370 None
2371 })
2372 }
2373}
2374
2375#[cfg(with_testing)]
2376impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
2377 async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
2378 let mut timeout = Box::pin(linera_base::time::timer::sleep(notification_timeout())).fuse();
2379 loop {
2380 let notification = futures::select! {
2381 () = timeout => bail!("Timeout waiting for notification"),
2382 notification = self.next().fuse() => notification.context("Stream closed")??,
2383 };
2384 if let Some(t) = f(notification) {
2385 return Ok(t);
2386 }
2387 }
2388 }
2389}