1use std::{
5 borrow::Cow,
6 collections::BTreeMap,
7 env,
8 marker::PhantomData,
9 mem,
10 path::{Path, PathBuf},
11 pin::Pin,
12 process::Stdio,
13 str::FromStr,
14 sync,
15 time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24 abi::ContractAbi,
25 command::{resolve_binary, CommandExt},
26 crypto::{CryptoHash, InMemorySigner},
27 data_types::{Amount, ApplicationPermissions, BlockHeight, Bytecode, Epoch},
28 identifiers::{
29 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30 },
31 vm::VmRuntime,
32};
33use linera_client::client_options::ResourceControlPolicyConfig;
34use linera_core::worker::Notification;
35use linera_faucet_client::Faucet;
36use serde::{de::DeserializeOwned, ser::Serialize};
37use serde_command_opts::to_args;
38use serde_json::{json, Value};
39use tempfile::TempDir;
40use tokio::{
41 io::{AsyncBufReadExt, BufReader},
42 process::{Child, Command},
43 sync::oneshot,
44 task::JoinHandle,
45};
46#[cfg(with_testing)]
47use {
48 futures::FutureExt as _,
49 linera_core::worker::Reason,
50 std::{collections::BTreeSet, future::Future},
51};
52
53use crate::{
54 cli::command::{BenchmarkCommand, ResourceControlPolicyOverrides},
55 cli_wrappers::{
56 local_net::{PathProvider, ProcessInbox},
57 Network,
58 },
59 util::{self, ChildExt},
60 Wallet,
61};
62
63const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
66
67fn reqwest_client() -> reqwest::Client {
68 reqwest::ClientBuilder::new()
69 .timeout(Duration::from_secs(60))
70 .build()
71 .unwrap()
72}
73
74pub struct ClientWrapper {
76 binary_path: sync::Mutex<Option<PathBuf>>,
77 testing_prng_seed: Option<u64>,
78 storage: String,
79 wallet: String,
80 keystore: String,
81 max_pending_message_bundles: usize,
82 network: Network,
83 pub path_provider: PathProvider,
84 on_drop: OnClientDrop,
85 extra_args: Vec<String>,
86}
87
88#[derive(Clone, Copy, Debug, Eq, PartialEq)]
90pub enum OnClientDrop {
91 CloseChains,
93 LeakChains,
95}
96
97impl ClientWrapper {
98 pub fn new(
99 path_provider: PathProvider,
100 network: Network,
101 testing_prng_seed: Option<u64>,
102 id: usize,
103 on_drop: OnClientDrop,
104 ) -> Self {
105 Self::new_with_extra_args(
106 path_provider,
107 network,
108 testing_prng_seed,
109 id,
110 on_drop,
111 vec!["--wait-for-outgoing-messages".to_string()],
112 None,
113 )
114 }
115
116 pub fn new_with_extra_args(
117 path_provider: PathProvider,
118 network: Network,
119 testing_prng_seed: Option<u64>,
120 id: usize,
121 on_drop: OnClientDrop,
122 extra_args: Vec<String>,
123 binary_dir: Option<PathBuf>,
124 ) -> Self {
125 let storage = format!(
126 "rocksdb:{}/client_{}.db",
127 path_provider.path().display(),
128 id
129 );
130 let wallet = format!("wallet_{id}.json");
131 let keystore = format!("keystore_{id}.json");
132 let binary_path = binary_dir.map(|dir| dir.join("linera"));
133 Self {
134 binary_path: sync::Mutex::new(binary_path),
135 testing_prng_seed,
136 storage,
137 wallet,
138 keystore,
139 max_pending_message_bundles: 10_000,
140 network,
141 path_provider,
142 on_drop,
143 extra_args,
144 }
145 }
146
147 pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
149 let tmp = TempDir::new()?;
150 let mut command = self.command().await?;
151 command
152 .current_dir(tmp.path())
153 .arg("project")
154 .arg("new")
155 .arg(project_name)
156 .arg("--linera-root")
157 .arg(linera_root)
158 .spawn_and_wait_for_stdout()
159 .await?;
160 Ok(tmp)
161 }
162
163 pub async fn project_publish<T: Serialize>(
165 &self,
166 path: PathBuf,
167 required_application_ids: Vec<String>,
168 publisher: impl Into<Option<ChainId>>,
169 argument: &T,
170 ) -> Result<String> {
171 let json_parameters = serde_json::to_string(&())?;
172 let json_argument = serde_json::to_string(argument)?;
173 let mut command = self.command().await?;
174 command
175 .arg("project")
176 .arg("publish-and-create")
177 .arg(path)
178 .args(publisher.into().iter().map(ChainId::to_string))
179 .args(["--json-parameters", &json_parameters])
180 .args(["--json-argument", &json_argument]);
181 if !required_application_ids.is_empty() {
182 command.arg("--required-application-ids");
183 command.args(required_application_ids);
184 }
185 let stdout = command.spawn_and_wait_for_stdout().await?;
186 Ok(stdout.trim().to_string())
187 }
188
189 pub async fn project_test(&self, path: &Path) -> Result<()> {
191 self.command()
192 .await
193 .context("failed to create project test command")?
194 .current_dir(path)
195 .arg("project")
196 .arg("test")
197 .spawn_and_wait_for_stdout()
198 .await?;
199 Ok(())
200 }
201
202 async fn command_with_envs_and_arguments(
203 &self,
204 envs: &[(&str, &str)],
205 arguments: impl IntoIterator<Item = Cow<'_, str>>,
206 ) -> Result<Command> {
207 let mut command = self.command_binary().await?;
208 command.current_dir(self.path_provider.path());
209 for (key, value) in envs {
210 command.env(key, value);
211 }
212 for argument in arguments {
213 command.arg(&*argument);
214 }
215 Ok(command)
216 }
217
218 async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
219 self.command_with_envs_and_arguments(envs, self.command_arguments())
220 .await
221 }
222
223 async fn command_with_arguments(
224 &self,
225 arguments: impl IntoIterator<Item = Cow<'_, str>>,
226 ) -> Result<Command> {
227 self.command_with_envs_and_arguments(
228 &[(
229 "RUST_LOG",
230 &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
231 )],
232 arguments,
233 )
234 .await
235 }
236
237 async fn command(&self) -> Result<Command> {
238 self.command_with_envs(&[(
239 "RUST_LOG",
240 &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
241 )])
242 .await
243 }
244
245 fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
246 [
247 "--wallet".into(),
248 self.wallet.as_str().into(),
249 "--keystore".into(),
250 self.keystore.as_str().into(),
251 "--storage".into(),
252 self.storage.as_str().into(),
253 "--send-timeout-ms".into(),
254 "500000".into(),
255 "--recv-timeout-ms".into(),
256 "500000".into(),
257 ]
258 .into_iter()
259 .chain(self.extra_args.iter().map(|s| s.as_str().into()))
260 }
261
262 fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
264 self.required_command_arguments().chain([
265 "--max-pending-message-bundles".into(),
266 self.max_pending_message_bundles.to_string().into(),
267 ])
268 }
269
270 async fn command_binary(&self) -> Result<Command> {
274 match self.command_with_cached_binary_path() {
275 Some(command) => Ok(command),
276 None => {
277 let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
278 let command = Command::new(&resolved_path);
279
280 self.set_cached_binary_path(resolved_path);
281
282 Ok(command)
283 }
284 }
285 }
286
287 fn command_with_cached_binary_path(&self) -> Option<Command> {
289 let binary_path = self.binary_path.lock().unwrap();
290
291 binary_path.as_ref().map(Command::new)
292 }
293
294 fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
302 let mut binary_path = self.binary_path.lock().unwrap();
303
304 if binary_path.is_none() {
305 *binary_path = Some(new_binary_path);
306 } else {
307 assert_eq!(*binary_path, Some(new_binary_path));
308 }
309 }
310
311 pub async fn create_genesis_config(
313 &self,
314 num_other_initial_chains: u32,
315 initial_funding: Amount,
316 policy_config: ResourceControlPolicyConfig,
317 http_allow_list: Option<Vec<String>>,
318 ) -> Result<()> {
319 let mut command = self.command().await?;
320 command
321 .args([
322 "create-genesis-config",
323 &num_other_initial_chains.to_string(),
324 ])
325 .args(["--initial-funding", &initial_funding.to_string()])
326 .args(["--committee", "committee.json"])
327 .args(["--genesis", "genesis.json"])
328 .args([
329 "--policy-config",
330 &policy_config.to_string().to_kebab_case(),
331 ]);
332 if let Some(allow_list) = http_allow_list {
333 command
334 .arg("--http-request-allow-list")
335 .arg(allow_list.join(","));
336 }
337 if let Some(seed) = self.testing_prng_seed {
338 command.arg("--testing-prng-seed").arg(seed.to_string());
339 }
340 command.spawn_and_wait_for_stdout().await?;
341 Ok(())
342 }
343
344 pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
347 let mut command = self.command().await?;
348 command.args(["wallet", "init"]);
349 match faucet {
350 None => command.args(["--genesis", "genesis.json"]),
351 Some(faucet) => command.args(["--faucet", faucet.url()]),
352 };
353 if let Some(seed) = self.testing_prng_seed {
354 command.arg("--testing-prng-seed").arg(seed.to_string());
355 }
356 command.spawn_and_wait_for_stdout().await?;
357 Ok(())
358 }
359
360 pub async fn request_chain(
362 &self,
363 faucet: &Faucet,
364 set_default: bool,
365 ) -> Result<(ChainId, AccountOwner)> {
366 let mut command = self.command().await?;
367 command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
368 if set_default {
369 command.arg("--set-default");
370 }
371 let stdout = command.spawn_and_wait_for_stdout().await?;
372 let mut lines = stdout.split_whitespace();
373 let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
374 let owner = lines.next().context("missing chain owner")?.parse()?;
375 Ok((chain_id, owner))
376 }
377
378 #[expect(clippy::too_many_arguments)]
380 pub async fn publish_and_create<
381 A: ContractAbi,
382 Parameters: Serialize,
383 InstantiationArgument: Serialize,
384 >(
385 &self,
386 contract: PathBuf,
387 service: PathBuf,
388 vm_runtime: VmRuntime,
389 parameters: &Parameters,
390 argument: &InstantiationArgument,
391 required_application_ids: &[ApplicationId],
392 publisher: impl Into<Option<ChainId>>,
393 ) -> Result<ApplicationId<A>> {
394 let json_parameters = serde_json::to_string(parameters)?;
395 let json_argument = serde_json::to_string(argument)?;
396 let mut command = self.command().await?;
397 let vm_runtime = format!("{vm_runtime}");
398 command
399 .arg("publish-and-create")
400 .args([contract, service])
401 .args(["--vm-runtime", &vm_runtime.to_lowercase()])
402 .args(publisher.into().iter().map(ChainId::to_string))
403 .args(["--json-parameters", &json_parameters])
404 .args(["--json-argument", &json_argument]);
405 if !required_application_ids.is_empty() {
406 command.arg("--required-application-ids");
407 command.args(
408 required_application_ids
409 .iter()
410 .map(ApplicationId::to_string),
411 );
412 }
413 let stdout = command.spawn_and_wait_for_stdout().await?;
414 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
415 }
416
417 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
419 &self,
420 contract: PathBuf,
421 service: PathBuf,
422 vm_runtime: VmRuntime,
423 publisher: impl Into<Option<ChainId>>,
424 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
425 self.publish_module_with_formats(contract, service, None, vm_runtime, publisher)
426 .await
427 }
428
429 pub async fn publish_module_with_formats<Abi, Parameters, InstantiationArgument>(
431 &self,
432 contract: PathBuf,
433 service: PathBuf,
434 formats: Option<PathBuf>,
435 vm_runtime: VmRuntime,
436 publisher: impl Into<Option<ChainId>>,
437 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
438 let mut command = self.command().await?;
439 command
440 .arg("publish-module")
441 .args([contract, service])
442 .args(["--vm-runtime", &format!("{vm_runtime}").to_lowercase()]);
443 if let Some(formats_path) = formats {
444 command.arg("--formats").arg(formats_path);
445 }
446 let stdout = command
447 .args(publisher.into().iter().map(ChainId::to_string))
448 .spawn_and_wait_for_stdout()
449 .await?;
450 let module_id: ModuleId = stdout.trim().parse()?;
451 Ok(module_id.with_abi())
452 }
453
454 pub async fn create_application<
456 Abi: ContractAbi,
457 Parameters: Serialize,
458 InstantiationArgument: Serialize,
459 >(
460 &self,
461 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
462 parameters: &Parameters,
463 argument: &InstantiationArgument,
464 required_application_ids: &[ApplicationId],
465 creator: impl Into<Option<ChainId>>,
466 ) -> Result<ApplicationId<Abi>> {
467 let json_parameters = serde_json::to_string(parameters)?;
468 let json_argument = serde_json::to_string(argument)?;
469 let mut command = self.command().await?;
470 command
471 .arg("create-application")
472 .arg(module_id.forget_abi().to_string())
473 .args(["--json-parameters", &json_parameters])
474 .args(["--json-argument", &json_argument])
475 .args(creator.into().iter().map(ChainId::to_string));
476 if !required_application_ids.is_empty() {
477 command.arg("--required-application-ids");
478 command.args(
479 required_application_ids
480 .iter()
481 .map(ApplicationId::to_string),
482 );
483 }
484 let stdout = command.spawn_and_wait_for_stdout().await?;
485 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
486 }
487
488 pub async fn run_node_service(
490 &self,
491 port: impl Into<Option<u16>>,
492 process_inbox: ProcessInbox,
493 ) -> Result<NodeService> {
494 self.run_node_service_with_options(port, process_inbox, &[], &[], false)
495 .await
496 }
497
498 pub async fn run_node_service_with_options(
500 &self,
501 port: impl Into<Option<u16>>,
502 process_inbox: ProcessInbox,
503 operator_application_ids: &[ApplicationId],
504 operators: &[(String, PathBuf)],
505 read_only: bool,
506 ) -> Result<NodeService> {
507 self.run_node_service_with_all_options(
508 port,
509 process_inbox,
510 operator_application_ids,
511 operators,
512 read_only,
513 &[],
514 &[],
515 )
516 .await
517 }
518
519 #[expect(clippy::too_many_arguments)]
521 pub async fn run_node_service_with_all_options(
522 &self,
523 port: impl Into<Option<u16>>,
524 process_inbox: ProcessInbox,
525 operator_application_ids: &[ApplicationId],
526 operators: &[(String, PathBuf)],
527 read_only: bool,
528 allowed_subscriptions: &[String],
529 subscription_ttls: &[(String, u64)],
530 ) -> Result<NodeService> {
531 let port = port.into().unwrap_or(8080);
532 let mut command = self.command().await?;
533 command.arg("service");
534 if let ProcessInbox::Skip = process_inbox {
535 command.arg("--listener-skip-process-inbox");
536 }
537 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
538 command.args(var.split_whitespace());
539 }
540 for app_id in operator_application_ids {
541 command.args(["--operator-application-ids", &app_id.to_string()]);
542 }
543 for (name, path) in operators {
544 command.args(["--operators", &format!("{}={}", name, path.display())]);
545 }
546 if read_only {
547 command.arg("--read-only");
548 }
549 for query in allowed_subscriptions {
550 command.args(["--allow-subscription", query]);
551 }
552 for (name, secs) in subscription_ttls {
553 command.args(["--subscription-ttl-secs", &format!("{name}={secs}")]);
554 }
555 let child = command
556 .args(["--port".to_string(), port.to_string()])
557 .spawn_into()?;
558 let client = reqwest_client();
559 for i in 0..10 {
560 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
561 let request = client.get(format!("http://localhost:{port}/")).send().await;
562 if request.is_ok() {
563 tracing::info!("Node service has started");
564 return Ok(NodeService::new(port, child));
565 } else {
566 tracing::warn!("Waiting for node service to start");
567 }
568 }
569 bail!("Failed to start node service");
570 }
571
572 pub async fn run_node_service_with_controller(
574 &self,
575 port: impl Into<Option<u16>>,
576 process_inbox: ProcessInbox,
577 controller_id: &ApplicationId,
578 operators: &[(String, PathBuf)],
579 ) -> Result<NodeService> {
580 let port = port.into().unwrap_or(8080);
581 let mut command = self.command().await?;
582 command.arg("service");
583 if let ProcessInbox::Skip = process_inbox {
584 command.arg("--listener-skip-process-inbox");
585 }
586 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
587 command.args(var.split_whitespace());
588 }
589 command.args(["--controller-id", &controller_id.to_string()]);
590 for (name, path) in operators {
591 command.args(["--operators", &format!("{}={}", name, path.display())]);
592 }
593 let child = command
594 .args(["--port".to_string(), port.to_string()])
595 .spawn_into()?;
596 let client = reqwest_client();
597 for i in 0..10 {
598 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
599 let request = client.get(format!("http://localhost:{port}/")).send().await;
600 if request.is_ok() {
601 tracing::info!("Node service has started");
602 return Ok(NodeService::new(port, child));
603 } else {
604 tracing::warn!("Waiting for node service to start");
605 }
606 }
607 bail!("Failed to start node service");
608 }
609
610 pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
612 let mut command = self.command().await?;
613 command.arg("validator").arg("query").arg(address);
614 let stdout = command.spawn_and_wait_for_stdout().await?;
615
616 let hash = stdout
619 .lines()
620 .find_map(|line| {
621 line.strip_prefix("Genesis config hash: ")
622 .and_then(|hash_str| hash_str.trim().parse().ok())
623 })
624 .context("error while parsing the result of `linera validator query`")?;
625 Ok(hash)
626 }
627
628 pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
630 let mut command = self.command().await?;
631 command.arg("validator").arg("list");
632 if let Some(chain_id) = chain_id {
633 command.args(["--chain-id", &chain_id.to_string()]);
634 }
635 command.spawn_and_wait_for_stdout().await?;
636 Ok(())
637 }
638
639 pub async fn sync_validator(
641 &self,
642 chain_ids: impl IntoIterator<Item = &ChainId>,
643 validator_address: impl Into<String>,
644 ) -> Result<()> {
645 let mut command = self.command().await?;
646 command
647 .arg("validator")
648 .arg("sync")
649 .arg(validator_address.into());
650 let mut chain_ids = chain_ids.into_iter().peekable();
651 if chain_ids.peek().is_some() {
652 command
653 .arg("--chains")
654 .args(chain_ids.map(ChainId::to_string));
655 }
656 command.spawn_and_wait_for_stdout().await?;
657 Ok(())
658 }
659
660 pub async fn run_faucet(
662 &self,
663 port: impl Into<Option<u16>>,
664 chain_id: Option<ChainId>,
665 amount: Amount,
666 ) -> Result<FaucetService> {
667 let port = port.into().unwrap_or(8080);
668 let temp_dir = tempfile::tempdir()
669 .context("Failed to create temporary directory for faucet storage")?;
670 let storage_path = temp_dir.path().join("faucet_storage.sqlite");
671 let mut command = self.command().await?;
672 let command = command
673 .arg("faucet")
674 .args(["--port".to_string(), port.to_string()])
675 .args(["--amount".to_string(), amount.to_string()])
676 .args([
677 "--storage-path".to_string(),
678 storage_path.to_string_lossy().to_string(),
679 ]);
680 if let Some(chain_id) = chain_id {
681 command.arg(chain_id.to_string());
682 }
683 let child = command.spawn_into()?;
684 let client = reqwest_client();
685 for i in 0..10 {
686 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
687 let request = client.get(format!("http://localhost:{port}/")).send().await;
688 if request.is_ok() {
689 tracing::info!("Faucet has started");
690 return Ok(FaucetService::new(port, child, temp_dir));
691 } else {
692 tracing::debug!("Waiting for faucet to start");
693 }
694 }
695 bail!("Failed to start faucet");
696 }
697
698 pub async fn local_balance(&self, account: Account) -> Result<Amount> {
700 let stdout = self
701 .command()
702 .await?
703 .arg("local-balance")
704 .arg(account.to_string())
705 .spawn_and_wait_for_stdout()
706 .await?;
707 let amount = stdout
708 .trim()
709 .parse()
710 .context("error while parsing the result of `linera local-balance`")?;
711 Ok(amount)
712 }
713
714 pub async fn query_balance(&self, account: Account) -> Result<Amount> {
716 let stdout = self
717 .command()
718 .await?
719 .arg("query-balance")
720 .arg(account.to_string())
721 .spawn_and_wait_for_stdout()
722 .await?;
723 let amount = stdout
724 .trim()
725 .parse()
726 .context("error while parsing the result of `linera query-balance`")?;
727 Ok(amount)
728 }
729
730 pub async fn query_application_json<T: DeserializeOwned>(
732 &self,
733 chain_id: ChainId,
734 application_id: ApplicationId,
735 query: impl AsRef<str>,
736 ) -> Result<T> {
737 let query = query.as_ref().trim();
738 let name = query
739 .split_once(|ch: char| !ch.is_alphanumeric())
740 .map_or(query, |(name, _)| name);
741 let stdout = self
742 .command()
743 .await?
744 .arg("query-application")
745 .arg("--chain-id")
746 .arg(chain_id.to_string())
747 .arg("--application-id")
748 .arg(application_id.to_string())
749 .arg(query)
750 .spawn_and_wait_for_stdout()
751 .await?;
752 let data: serde_json::Value =
753 serde_json::from_str(stdout.trim()).context("invalid JSON from query-application")?;
754 serde_json::from_value(data[name].clone())
755 .with_context(|| format!("{name} field missing in query-application response"))
756 }
757
758 pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
760 self.command()
761 .await?
762 .arg("sync")
763 .arg(chain_id.to_string())
764 .spawn_and_wait_for_stdout()
765 .await?;
766 Ok(())
767 }
768
769 pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
771 self.command()
772 .await?
773 .arg("process-inbox")
774 .arg(chain_id.to_string())
775 .spawn_and_wait_for_stdout()
776 .await?;
777 Ok(())
778 }
779
780 pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
782 self.command()
783 .await?
784 .arg("transfer")
785 .arg(amount.to_string())
786 .args(["--from", &from.to_string()])
787 .args(["--to", &to.to_string()])
788 .spawn_and_wait_for_stdout()
789 .await?;
790 Ok(())
791 }
792
793 pub async fn transfer_with_silent_logs(
795 &self,
796 amount: Amount,
797 from: ChainId,
798 to: ChainId,
799 ) -> Result<()> {
800 self.command()
801 .await?
802 .env("RUST_LOG", "off")
803 .arg("transfer")
804 .arg(amount.to_string())
805 .args(["--from", &from.to_string()])
806 .args(["--to", &to.to_string()])
807 .spawn_and_wait_for_stdout()
808 .await?;
809 Ok(())
810 }
811
812 pub async fn transfer_with_accounts(
814 &self,
815 amount: Amount,
816 from: Account,
817 to: Account,
818 ) -> Result<()> {
819 self.command()
820 .await?
821 .arg("transfer")
822 .arg(amount.to_string())
823 .args(["--from", &from.to_string()])
824 .args(["--to", &to.to_string()])
825 .spawn_and_wait_for_stdout()
826 .await?;
827 Ok(())
828 }
829
830 fn benchmark_command_internal(command: &mut Command, args: &BenchmarkCommand) -> Result<()> {
831 let mut formatted_args = to_args(&args)?;
832 let subcommand = formatted_args.remove(0);
833 formatted_args.remove(0);
836 let options = formatted_args
837 .chunks_exact(2)
838 .flat_map(|pair| {
839 let option = format!("--{}", pair[0]);
840 match pair[1].as_str() {
841 "true" => vec![option],
842 "false" => vec![],
843 _ => vec![option, pair[1].clone()],
844 }
845 })
846 .collect::<Vec<_>>();
847 command
848 .args([
849 "--max-pending-message-bundles",
850 &args.transactions_per_block().to_string(),
851 ])
852 .arg("benchmark")
853 .arg(subcommand)
854 .args(options);
855 Ok(())
856 }
857
858 async fn benchmark_command_with_envs(
859 &self,
860 args: BenchmarkCommand,
861 envs: &[(&str, &str)],
862 ) -> Result<Command> {
863 let mut command = self
864 .command_with_envs_and_arguments(envs, self.required_command_arguments())
865 .await?;
866 Self::benchmark_command_internal(&mut command, &args)?;
867 Ok(command)
868 }
869
870 async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
871 let mut command = self
872 .command_with_arguments(self.required_command_arguments())
873 .await?;
874 Self::benchmark_command_internal(&mut command, &args)?;
875 Ok(command)
876 }
877
878 pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
880 let mut command = self.benchmark_command(args).await?;
881 command.spawn_and_wait_for_stdout().await?;
882 Ok(())
883 }
884
885 pub async fn benchmark_detached(
888 &self,
889 args: BenchmarkCommand,
890 tx: oneshot::Sender<()>,
891 ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
892 let mut child = self
893 .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
894 .await?
895 .kill_on_drop(true)
896 .stdin(Stdio::piped())
897 .stdout(Stdio::piped())
898 .stderr(Stdio::piped())
899 .spawn()?;
900
901 let pid = child.id().expect("failed to get pid");
902 let stdout = child.stdout.take().expect("stdout not open");
903 let stdout_handle = tokio::spawn(async move {
904 let mut lines = BufReader::new(stdout).lines();
905 while let Ok(Some(line)) = lines.next_line().await {
906 println!("benchmark{{pid={pid}}} {line}");
907 }
908 });
909
910 let stderr = child.stderr.take().expect("stderr not open");
911 let stderr_handle = tokio::spawn(async move {
912 let mut lines = BufReader::new(stderr).lines();
913 let mut tx = Some(tx);
914 while let Ok(Some(line)) = lines.next_line().await {
915 if line.contains("Ready to start benchmark") {
916 tx.take()
917 .expect("Should only send signal once")
918 .send(())
919 .expect("failed to send ready signal to main thread");
920 } else {
921 println!("benchmark{{pid={pid}}} {line}");
922 }
923 }
924 });
925 Ok((child, stdout_handle, stderr_handle))
926 }
927
928 async fn open_chain_internal(
929 &self,
930 from: ChainId,
931 owner: Option<AccountOwner>,
932 initial_balance: Amount,
933 super_owner: bool,
934 ) -> Result<(ChainId, AccountOwner)> {
935 let mut command = self.command().await?;
936 command
937 .arg("open-chain")
938 .args(["--from", &from.to_string()])
939 .args(["--initial-balance", &initial_balance.to_string()]);
940
941 if let Some(owner) = owner {
942 command.args(["--owner", &owner.to_string()]);
943 }
944
945 if super_owner {
946 command.arg("--super-owner");
947 }
948
949 let stdout = command.spawn_and_wait_for_stdout().await?;
950 let mut split = stdout.split('\n');
951 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
952 let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
953 if let Some(owner) = owner {
954 assert_eq!(owner, new_owner);
955 }
956 Ok((chain_id, new_owner))
957 }
958
959 pub async fn open_chain_super_owner(
961 &self,
962 from: ChainId,
963 owner: Option<AccountOwner>,
964 initial_balance: Amount,
965 ) -> Result<(ChainId, AccountOwner)> {
966 self.open_chain_internal(from, owner, initial_balance, true)
967 .await
968 }
969
970 pub async fn open_chain(
972 &self,
973 from: ChainId,
974 owner: Option<AccountOwner>,
975 initial_balance: Amount,
976 ) -> Result<(ChainId, AccountOwner)> {
977 self.open_chain_internal(from, owner, initial_balance, false)
978 .await
979 }
980
981 pub async fn open_and_assign(
983 &self,
984 client: &ClientWrapper,
985 initial_balance: Amount,
986 ) -> Result<ChainId> {
987 let our_chain = self
988 .load_wallet()?
989 .default_chain()
990 .context("no default chain found")?;
991 let owner = client.keygen().await?;
992 let (new_chain, _) = self
993 .open_chain(our_chain, Some(owner), initial_balance)
994 .await?;
995 client.assign(owner, new_chain).await?;
996 Ok(new_chain)
997 }
998
999 pub async fn open_multi_owner_chain(
1000 &self,
1001 from: ChainId,
1002 owners: BTreeMap<AccountOwner, u64>,
1003 multi_leader_rounds: u32,
1004 balance: Amount,
1005 base_timeout_ms: u64,
1006 ) -> Result<ChainId> {
1007 let mut command = self.command().await?;
1008 command
1009 .arg("open-multi-owner-chain")
1010 .args(["--from", &from.to_string()])
1011 .arg("--owners")
1012 .arg(serde_json::to_string(&owners)?)
1013 .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
1014 command
1015 .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
1016 .args(["--initial-balance", &balance.to_string()]);
1017
1018 let stdout = command.spawn_and_wait_for_stdout().await?;
1019 let mut split = stdout.split('\n');
1020 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
1021
1022 Ok(chain_id)
1023 }
1024
1025 pub async fn change_ownership(
1026 &self,
1027 chain_id: ChainId,
1028 super_owners: Vec<AccountOwner>,
1029 owners: Vec<AccountOwner>,
1030 ) -> Result<()> {
1031 let mut command = self.command().await?;
1032 command
1033 .arg("change-ownership")
1034 .args(["--chain-id", &chain_id.to_string()]);
1035 command
1036 .arg("--super-owners")
1037 .arg(serde_json::to_string(&super_owners)?);
1038 command.arg("--owners").arg(serde_json::to_string(
1039 &owners
1040 .into_iter()
1041 .zip(std::iter::repeat(100u64))
1042 .collect::<BTreeMap<_, _>>(),
1043 )?);
1044 command.spawn_and_wait_for_stdout().await?;
1045 Ok(())
1046 }
1047
1048 pub async fn change_application_permissions(
1049 &self,
1050 chain_id: ChainId,
1051 application_permissions: ApplicationPermissions,
1052 ) -> Result<()> {
1053 let mut command = self.command().await?;
1054 command
1055 .arg("change-application-permissions")
1056 .args(["--chain-id", &chain_id.to_string()]);
1057 command.arg("--manage-chain").arg(serde_json::to_string(
1058 &application_permissions.manage_chain,
1059 )?);
1060 command.spawn_and_wait_for_stdout().await?;
1062 Ok(())
1063 }
1064
1065 pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
1067 let mut command = self.command().await?;
1068 command
1069 .args(["wallet", "follow-chain"])
1070 .arg(chain_id.to_string());
1071 if sync {
1072 command.arg("--sync");
1073 }
1074 command.spawn_and_wait_for_stdout().await?;
1075 Ok(())
1076 }
1077
1078 pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
1080 let mut command = self.command().await?;
1081 command
1082 .args(["wallet", "forget-chain"])
1083 .arg(chain_id.to_string());
1084 command.spawn_and_wait_for_stdout().await?;
1085 Ok(())
1086 }
1087
1088 pub async fn set_default_chain(&self, chain_id: ChainId) -> Result<()> {
1090 let mut command = self.command().await?;
1091 command
1092 .args(["wallet", "set-default"])
1093 .arg(chain_id.to_string());
1094 command.spawn_and_wait_for_stdout().await?;
1095 Ok(())
1096 }
1097
1098 pub async fn retry_pending_block(
1099 &self,
1100 chain_id: Option<ChainId>,
1101 ) -> Result<Option<CryptoHash>> {
1102 let mut command = self.command().await?;
1103 command.arg("retry-pending-block");
1104 if let Some(chain_id) = chain_id {
1105 command.arg(chain_id.to_string());
1106 }
1107 let stdout = command.spawn_and_wait_for_stdout().await?;
1108 let stdout = stdout.trim();
1109 if stdout.is_empty() {
1110 Ok(None)
1111 } else {
1112 Ok(Some(CryptoHash::from_str(stdout)?))
1113 }
1114 }
1115
1116 pub async fn publish_data_blob(
1118 &self,
1119 path: &Path,
1120 chain_id: Option<ChainId>,
1121 ) -> Result<CryptoHash> {
1122 let mut command = self.command().await?;
1123 command.arg("publish-data-blob").arg(path);
1124 if let Some(chain_id) = chain_id {
1125 command.arg(chain_id.to_string());
1126 }
1127 let stdout = command.spawn_and_wait_for_stdout().await?;
1128 let stdout = stdout.trim();
1129 Ok(CryptoHash::from_str(stdout)?)
1130 }
1131
1132 pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
1134 let mut command = self.command().await?;
1135 command.arg("read-data-blob").arg(hash.to_string());
1136 if let Some(chain_id) = chain_id {
1137 command.arg(chain_id.to_string());
1138 }
1139 command.spawn_and_wait_for_stdout().await?;
1140 Ok(())
1141 }
1142
1143 pub fn load_wallet(&self) -> Result<Wallet> {
1144 Ok(Wallet::read(&self.wallet_path())?)
1145 }
1146
1147 pub fn load_keystore(&self) -> Result<InMemorySigner> {
1148 util::read_json(self.keystore_path())
1149 }
1150
1151 pub fn wallet_path(&self) -> PathBuf {
1152 self.path_provider.path().join(&self.wallet)
1153 }
1154
1155 pub fn keystore_path(&self) -> PathBuf {
1156 self.path_provider.path().join(&self.keystore)
1157 }
1158
1159 pub fn storage_path(&self) -> &str {
1160 &self.storage
1161 }
1162
1163 pub fn get_owner(&self) -> Option<AccountOwner> {
1164 let wallet = self.load_wallet().ok()?;
1165 wallet
1166 .get(wallet.default_chain()?)
1167 .expect("default chain must be in wallet")
1168 .owner
1169 }
1170
1171 pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1172 self.load_wallet()
1173 .ok()
1174 .is_some_and(|wallet| wallet.get(chain).is_some())
1175 }
1176
1177 pub async fn set_validator(
1178 &self,
1179 validator_key: &(String, String),
1180 port: usize,
1181 votes: usize,
1182 ) -> Result<()> {
1183 let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1184 self.command()
1185 .await?
1186 .arg("validator")
1187 .arg("add")
1188 .args(["--public-key", &validator_key.0])
1189 .args(["--account-key", &validator_key.1])
1190 .args(["--address", &address])
1191 .args(["--votes", &votes.to_string()])
1192 .spawn_and_wait_for_stdout()
1193 .await?;
1194 Ok(())
1195 }
1196
1197 pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1198 self.command()
1199 .await?
1200 .arg("validator")
1201 .arg("remove")
1202 .args(["--public-key", validator_key])
1203 .spawn_and_wait_for_stdout()
1204 .await?;
1205 Ok(())
1206 }
1207
1208 pub async fn change_validators(
1209 &self,
1210 add_validators: &[(String, String, usize, usize)], modify_validators: &[(String, String, usize, usize)], remove_validators: &[String],
1213 ) -> Result<()> {
1214 use std::str::FromStr;
1215
1216 use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1217
1218 let mut changes = std::collections::HashMap::new();
1221
1222 for (public_key_str, account_key_str, port, votes) in
1224 add_validators.iter().chain(modify_validators.iter())
1225 {
1226 let public_key = ValidatorPublicKey::from_str(public_key_str)
1227 .with_context(|| format!("Invalid validator public key: {public_key_str}"))?;
1228
1229 let account_key = AccountPublicKey::from_str(account_key_str)
1230 .with_context(|| format!("Invalid account public key: {account_key_str}"))?;
1231
1232 let address = format!("{}:127.0.0.1:{}", self.network.short(), port)
1233 .parse()
1234 .unwrap();
1235
1236 let change = crate::cli::validator::Change {
1238 account_key,
1239 address,
1240 votes: crate::cli::validator::Votes(
1241 std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1242 ),
1243 };
1244
1245 changes.insert(public_key, Some(change));
1246 }
1247
1248 for validator_key_str in remove_validators {
1250 let public_key = ValidatorPublicKey::from_str(validator_key_str)
1251 .with_context(|| format!("Invalid validator public key: {validator_key_str}"))?;
1252 changes.insert(public_key, None);
1253 }
1254
1255 let temp_file = tempfile::NamedTempFile::new()
1257 .context("Failed to create temporary file for validator changes")?;
1258 serde_json::to_writer(&temp_file, &changes)
1259 .context("Failed to write validator changes to file")?;
1260 let temp_path = temp_file.path();
1261
1262 self.command()
1263 .await?
1264 .arg("validator")
1265 .arg("update")
1266 .arg(temp_path)
1267 .arg("--yes") .spawn_and_wait_for_stdout()
1269 .await?;
1270
1271 Ok(())
1272 }
1273
1274 pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1275 self.command()
1276 .await?
1277 .arg("revoke-epochs")
1278 .arg(epoch.to_string())
1279 .spawn_and_wait_for_stdout()
1280 .await?;
1281 Ok(())
1282 }
1283
1284 pub async fn set_resource_control_policy(
1285 &self,
1286 overrides: ResourceControlPolicyOverrides,
1287 ) -> Result<()> {
1288 let mut command = self.command().await?;
1289 command.arg("resource-control-policy");
1290 let ResourceControlPolicyOverrides {
1291 wasm_fuel_unit,
1292 evm_fuel_unit,
1293 read_operation,
1294 write_operation,
1295 byte_runtime,
1296 byte_read,
1297 byte_written,
1298 blob_read,
1299 blob_published,
1300 blob_byte_read,
1301 blob_byte_published,
1302 operation,
1303 operation_byte,
1304 message,
1305 message_byte,
1306 service_as_oracle_query,
1307 http_request,
1308 maximum_wasm_fuel_per_block,
1309 maximum_evm_fuel_per_block,
1310 maximum_service_oracle_execution_ms,
1311 maximum_block_size,
1312 maximum_blob_size,
1313 maximum_published_blobs,
1314 maximum_bytecode_size,
1315 maximum_block_proposal_size,
1316 maximum_bytes_read_per_block,
1317 maximum_bytes_written_per_block,
1318 maximum_oracle_response_bytes,
1319 maximum_http_response_bytes,
1320 http_request_timeout_ms,
1321 http_request_allow_list,
1322 free_application_ids,
1323 flags,
1324 } = overrides;
1325 if let Some(value) = wasm_fuel_unit {
1326 command.args(["--wasm-fuel-unit", &value.to_string()]);
1327 }
1328 if let Some(value) = evm_fuel_unit {
1329 command.args(["--evm-fuel-unit", &value.to_string()]);
1330 }
1331 if let Some(value) = read_operation {
1332 command.args(["--read-operation", &value.to_string()]);
1333 }
1334 if let Some(value) = write_operation {
1335 command.args(["--write-operation", &value.to_string()]);
1336 }
1337 if let Some(value) = byte_runtime {
1338 command.args(["--byte-runtime", &value.to_string()]);
1339 }
1340 if let Some(value) = byte_read {
1341 command.args(["--byte-read", &value.to_string()]);
1342 }
1343 if let Some(value) = byte_written {
1344 command.args(["--byte-written", &value.to_string()]);
1345 }
1346 if let Some(value) = blob_read {
1347 command.args(["--blob-read", &value.to_string()]);
1348 }
1349 if let Some(value) = blob_published {
1350 command.args(["--blob-published", &value.to_string()]);
1351 }
1352 if let Some(value) = blob_byte_read {
1353 command.args(["--blob-byte-read", &value.to_string()]);
1354 }
1355 if let Some(value) = blob_byte_published {
1356 command.args(["--blob-byte-published", &value.to_string()]);
1357 }
1358 if let Some(value) = operation {
1359 command.args(["--operation", &value.to_string()]);
1360 }
1361 if let Some(value) = operation_byte {
1362 command.args(["--operation-byte", &value.to_string()]);
1363 }
1364 if let Some(value) = message {
1365 command.args(["--message", &value.to_string()]);
1366 }
1367 if let Some(value) = message_byte {
1368 command.args(["--message-byte", &value.to_string()]);
1369 }
1370 if let Some(value) = service_as_oracle_query {
1371 command.args(["--service-as-oracle-query", &value.to_string()]);
1372 }
1373 if let Some(value) = http_request {
1374 command.args(["--http-request", &value.to_string()]);
1375 }
1376 if let Some(value) = maximum_wasm_fuel_per_block {
1377 command.args(["--maximum-wasm-fuel-per-block", &value.to_string()]);
1378 }
1379 if let Some(value) = maximum_evm_fuel_per_block {
1380 command.args(["--maximum-evm-fuel-per-block", &value.to_string()]);
1381 }
1382 if let Some(value) = maximum_service_oracle_execution_ms {
1383 command.args(["--maximum-service-oracle-execution-ms", &value.to_string()]);
1384 }
1385 if let Some(value) = maximum_block_size {
1386 command.args(["--maximum-block-size", &value.to_string()]);
1387 }
1388 if let Some(value) = maximum_blob_size {
1389 command.args(["--maximum-blob-size", &value.to_string()]);
1390 }
1391 if let Some(value) = maximum_published_blobs {
1392 command.args(["--maximum-published-blobs", &value.to_string()]);
1393 }
1394 if let Some(value) = maximum_bytecode_size {
1395 command.args(["--maximum-bytecode-size", &value.to_string()]);
1396 }
1397 if let Some(value) = maximum_block_proposal_size {
1398 command.args(["--maximum-block-proposal-size", &value.to_string()]);
1399 }
1400 if let Some(value) = maximum_bytes_read_per_block {
1401 command.args(["--maximum-bytes-read-per-block", &value.to_string()]);
1402 }
1403 if let Some(value) = maximum_bytes_written_per_block {
1404 command.args(["--maximum-bytes-written-per-block", &value.to_string()]);
1405 }
1406 if let Some(value) = maximum_oracle_response_bytes {
1407 command.args(["--maximum-oracle-response-bytes", &value.to_string()]);
1408 }
1409 if let Some(value) = maximum_http_response_bytes {
1410 command.args(["--maximum-http-response-bytes", &value.to_string()]);
1411 }
1412 if let Some(value) = http_request_timeout_ms {
1413 command.args(["--http-request-timeout-ms", &value.to_string()]);
1414 }
1415 if let Some(values) = http_request_allow_list {
1416 command.args(["--http-request-allow-list", &values.join(",")]);
1417 }
1418 if let Some(values) = free_application_ids {
1419 command.args(["--free-application-ids", &values.join(",")]);
1420 }
1421 if let Some(values) = flags {
1422 command.args(["--flags", &values.join(",")]);
1423 }
1424 command.spawn_and_wait_for_stdout().await?;
1425 Ok(())
1426 }
1427
1428 pub async fn keygen(&self) -> Result<AccountOwner> {
1430 let stdout = self
1431 .command()
1432 .await?
1433 .arg("keygen")
1434 .spawn_and_wait_for_stdout()
1435 .await?;
1436 AccountOwner::from_str(stdout.as_str().trim())
1437 }
1438
1439 pub fn default_chain(&self) -> Option<ChainId> {
1441 self.load_wallet().ok()?.default_chain()
1442 }
1443
1444 pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1446 let _stdout = self
1447 .command()
1448 .await?
1449 .arg("assign")
1450 .args(["--owner", &owner.to_string()])
1451 .args(["--chain-id", &chain_id.to_string()])
1452 .spawn_and_wait_for_stdout()
1453 .await?;
1454 Ok(())
1455 }
1456
1457 pub async fn set_preferred_owner(
1459 &self,
1460 chain_id: ChainId,
1461 owner: Option<AccountOwner>,
1462 ) -> Result<()> {
1463 let mut owner_arg = vec!["--owner".to_string()];
1464 if let Some(owner) = owner {
1465 owner_arg.push(owner.to_string());
1466 };
1467 self.command()
1468 .await?
1469 .arg("set-preferred-owner")
1470 .args(["--chain-id", &chain_id.to_string()])
1471 .args(owner_arg)
1472 .spawn_and_wait_for_stdout()
1473 .await?;
1474 Ok(())
1475 }
1476
1477 pub async fn build_application(
1478 &self,
1479 path: &Path,
1480 name: &str,
1481 is_workspace: bool,
1482 ) -> Result<(PathBuf, PathBuf)> {
1483 Command::new("cargo")
1484 .current_dir(self.path_provider.path())
1485 .arg("build")
1486 .arg("--release")
1487 .args(["--target", "wasm32-unknown-unknown"])
1488 .arg("--manifest-path")
1489 .arg(path.join("Cargo.toml"))
1490 .spawn_and_wait_for_stdout()
1491 .await?;
1492
1493 let release_dir = match is_workspace {
1494 true => path.join("../target/wasm32-unknown-unknown/release"),
1495 false => path.join("target/wasm32-unknown-unknown/release"),
1496 };
1497
1498 let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1499 let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1500
1501 let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1502 let service_size = fs_err::tokio::metadata(&service).await?.len();
1503 tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1504
1505 Ok((contract, service))
1506 }
1507}
1508
1509impl Drop for ClientWrapper {
1510 fn drop(&mut self) {
1511 use std::process::Command as SyncCommand;
1512
1513 if self.on_drop != OnClientDrop::CloseChains {
1514 return;
1515 }
1516
1517 let Ok(binary_path) = self.binary_path.lock() else {
1518 tracing::error!(
1519 "Failed to close chains because a thread panicked with a lock to `binary_path`"
1520 );
1521 return;
1522 };
1523
1524 let Some(binary_path) = binary_path.as_ref() else {
1525 tracing::warn!(
1526 "Assuming no chains need to be closed, because the command binary was never \
1527 resolved and therefore presumably never called"
1528 );
1529 return;
1530 };
1531
1532 let working_directory = self.path_provider.path();
1533 let mut wallet_show_command = SyncCommand::new(binary_path);
1534
1535 for argument in self.command_arguments() {
1536 wallet_show_command.arg(&*argument);
1537 }
1538
1539 let Ok(wallet_show_output) = wallet_show_command
1540 .current_dir(working_directory)
1541 .args(["wallet", "show", "--short", "--owned"])
1542 .output()
1543 else {
1544 tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1545 return;
1546 };
1547
1548 if !wallet_show_output.status.success() {
1549 tracing::warn!("Failed to list chains in the wallet to close them");
1550 return;
1551 }
1552
1553 let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1554 tracing::warn!(
1555 "Failed to close chains because `linera wallet show --short` \
1556 returned a non-UTF-8 output"
1557 );
1558 return;
1559 };
1560
1561 let chain_ids = chain_list_string
1562 .split('\n')
1563 .map(|line| line.trim())
1564 .filter(|line| !line.is_empty());
1565
1566 for chain_id in chain_ids {
1567 let mut close_chain_command = SyncCommand::new(binary_path);
1568
1569 for argument in self.command_arguments() {
1570 close_chain_command.arg(&*argument);
1571 }
1572
1573 close_chain_command.current_dir(working_directory);
1574
1575 match close_chain_command.args(["close-chain", chain_id]).status() {
1576 Ok(status) if status.success() => (),
1577 Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1578 Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1579 }
1580 }
1581 }
1582}
1583
1584#[cfg(with_testing)]
1585impl ClientWrapper {
1586 pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1587 self.build_application(Self::example_path(name)?.as_path(), name, true)
1588 .await
1589 }
1590
1591 pub async fn build_test_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1592 self.build_application(Self::test_example_path(name)?.as_path(), name, true)
1593 .await
1594 }
1595
1596 pub fn example_path(name: &str) -> Result<PathBuf> {
1597 Ok(env::current_dir()?.join("../examples/").join(name))
1598 }
1599
1600 pub fn test_example_path(name: &str) -> Result<PathBuf> {
1601 Ok(env::current_dir()?
1602 .join("../linera-sdk/tests/fixtures/")
1603 .join(name))
1604 }
1605}
1606
1607fn truncate_query_output(input: &str) -> String {
1608 let max_len = 1000;
1609 if input.len() < max_len {
1610 input.to_string()
1611 } else {
1612 format!("{} ...", input.get(..max_len).unwrap())
1613 }
1614}
1615
1616fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1617 let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1618 let max_len = 200;
1619 if query.len() < max_len {
1620 query
1621 } else {
1622 format!("{} ...", query.get(..max_len).unwrap())
1623 }
1624}
1625
1626fn log_unexpected_exit(child: &mut Child, service_kind: &str, port: u16) {
1630 match child.try_wait() {
1631 Ok(Some(status)) => {
1632 #[cfg(unix)]
1633 {
1634 use std::os::unix::process::ExitStatusExt as _;
1635 if let Some(signal) = status.signal() {
1636 tracing::error!(
1637 port,
1638 signal,
1639 "The {service_kind} service was killed by signal {signal}",
1640 );
1641 return;
1642 }
1643 }
1644 if !status.success() {
1645 tracing::error!(
1646 port,
1647 %status,
1648 "The {service_kind} service exited unexpectedly with {status}",
1649 );
1650 }
1651 }
1652 Ok(None) => {} Err(error) => {
1654 tracing::warn!(
1655 port,
1656 %error,
1657 "Failed to check {service_kind} service status",
1658 );
1659 }
1660 }
1661}
1662
1663pub struct NodeService {
1664 port: u16,
1665 child: Child,
1666 terminated: bool,
1667}
1668
1669impl Drop for NodeService {
1670 fn drop(&mut self) {
1671 if !self.terminated {
1672 log_unexpected_exit(&mut self.child, "node", self.port);
1673 }
1674 }
1675}
1676
1677impl NodeService {
1678 fn new(port: u16, child: Child) -> Self {
1679 Self {
1680 port,
1681 child,
1682 terminated: false,
1683 }
1684 }
1685
1686 pub async fn terminate(mut self) -> Result<()> {
1687 self.terminated = true;
1688 self.child.kill().await.context("terminating node service")
1689 }
1690
1691 pub fn port(&self) -> u16 {
1692 self.port
1693 }
1694
1695 pub fn ensure_is_running(&mut self) -> Result<()> {
1696 self.child.ensure_is_running()
1697 }
1698
1699 pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1700 let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1701 let mut data = self.query_node(query).await?;
1702 Ok(serde_json::from_value(data["processInbox"].take())?)
1703 }
1704
1705 pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1706 let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1707 let mut data = self.query_node(query).await?;
1708 Ok(serde_json::from_value(data["sync"].take())?)
1709 }
1710
1711 pub async fn transfer(
1712 &self,
1713 chain_id: ChainId,
1714 owner: AccountOwner,
1715 recipient: Account,
1716 amount: Amount,
1717 ) -> Result<CryptoHash> {
1718 let json_owner = owner.to_value();
1719 let json_recipient = recipient.to_value();
1720 let query = format!(
1721 "mutation {{ transfer(\
1722 chainId: \"{chain_id}\", \
1723 owner: {json_owner}, \
1724 recipient: {json_recipient}, \
1725 amount: \"{amount}\") \
1726 }}"
1727 );
1728 let data = self.query_node(query).await?;
1729 serde_json::from_value(data["transfer"].clone())
1730 .context("missing transfer field in response")
1731 }
1732
1733 pub async fn balance(&self, account: &Account) -> Result<Amount> {
1734 let chain = account.chain_id;
1735 let owner = account.owner;
1736 if matches!(owner, AccountOwner::CHAIN) {
1737 let query = format!(
1738 "query {{ chain(chainId:\"{chain}\") {{
1739 executionState {{ system {{ balance }} }}
1740 }} }}"
1741 );
1742 let response = self.query_node(query).await?;
1743 let balance = &response["chain"]["executionState"]["system"]["balance"]
1744 .as_str()
1745 .unwrap();
1746 return Ok(Amount::from_str(balance)?);
1747 }
1748 let query = format!(
1749 "query {{ chain(chainId:\"{chain}\") {{
1750 executionState {{ system {{ balances {{
1751 entry(key:\"{owner}\") {{ value }}
1752 }} }} }}
1753 }} }}"
1754 );
1755 let response = self.query_node(query).await?;
1756 let balances = &response["chain"]["executionState"]["system"]["balances"];
1757 let balance = balances["entry"]["value"].as_str();
1758 match balance {
1759 None => Ok(Amount::ZERO),
1760 Some(amount) => Ok(Amount::from_str(amount)?),
1761 }
1762 }
1763
1764 pub fn make_application<A: ContractAbi>(
1765 &self,
1766 chain_id: &ChainId,
1767 application_id: &ApplicationId<A>,
1768 ) -> Result<ApplicationWrapper<A>> {
1769 let application_id = application_id.forget_abi().to_string();
1770 let link = format!(
1771 "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1772 self.port
1773 );
1774 Ok(ApplicationWrapper::from(link))
1775 }
1776
1777 pub async fn publish_data_blob(
1778 &self,
1779 chain_id: &ChainId,
1780 bytes: Vec<u8>,
1781 ) -> Result<CryptoHash> {
1782 let query = format!(
1783 "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1784 chain_id.to_value(),
1785 bytes.to_value(),
1786 );
1787 let data = self.query_node(query).await?;
1788 serde_json::from_value(data["publishDataBlob"].clone())
1789 .context("missing publishDataBlob field in response")
1790 }
1791
1792 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1793 &self,
1794 chain_id: &ChainId,
1795 contract: PathBuf,
1796 service: PathBuf,
1797 vm_runtime: VmRuntime,
1798 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1799 let contract_code = Bytecode::load_from_file(&contract).await?;
1800 let service_code = Bytecode::load_from_file(&service).await?;
1801 let query = format!(
1802 "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1803 chain_id.to_value(),
1804 contract_code.to_value(),
1805 service_code.to_value(),
1806 vm_runtime.to_value(),
1807 );
1808 let data = self.query_node(query).await?;
1809 let module_str = data["publishModule"]
1810 .as_str()
1811 .context("module ID not found")?;
1812 let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1813 Ok(module_id.with_abi())
1814 }
1815
1816 pub async fn query_committee_hash(&self, chain_id: &ChainId) -> Result<Option<CryptoHash>> {
1817 let query = format!(
1818 "query {{ chain(chainId:\"{chain_id}\") {{
1819 executionState {{ system {{ committeeHash }} }}
1820 }} }}"
1821 );
1822 let mut response = self.query_node(query).await?;
1823 let hash = response["chain"]["executionState"]["system"]["committeeHash"].take();
1824 Ok(serde_json::from_value(hash)?)
1825 }
1826
1827 pub async fn query_chain_epoch(&self, chain_id: &ChainId) -> Result<Epoch> {
1828 let query = format!(
1829 "query {{ chain(chainId:\"{chain_id}\") {{
1830 executionState {{ system {{ epoch }} }}
1831 }} }}"
1832 );
1833 let mut response = self.query_node(query).await?;
1834 let epoch = response["chain"]["executionState"]["system"]["epoch"].take();
1835 Ok(serde_json::from_value(epoch)?)
1836 }
1837
1838 pub async fn events_from_index(
1839 &self,
1840 chain_id: &ChainId,
1841 stream_id: &StreamId,
1842 start_index: u32,
1843 ) -> Result<Vec<IndexAndEvent>> {
1844 let query = format!(
1845 "query {{
1846 eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1847 {{ index event }}
1848 }}",
1849 stream_id.to_value()
1850 );
1851 let mut response = self.query_node(query).await?;
1852 let response = response["eventsFromIndex"].take();
1853 Ok(serde_json::from_value(response)?)
1854 }
1855
1856 pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1857 let n_try = 5;
1858 let query = query.as_ref();
1859 for i in 0..n_try {
1860 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1861 let url = format!("http://localhost:{}/", self.port);
1862 let client = reqwest_client();
1863 let result = client
1864 .post(url)
1865 .json(&json!({ "query": query }))
1866 .send()
1867 .await;
1868 if matches!(result, Err(ref error) if error.is_timeout()) {
1869 tracing::warn!(
1870 "Timeout when sending query {} to the node service",
1871 truncate_query_output(query)
1872 );
1873 continue;
1874 }
1875 let response = result.with_context(|| {
1876 format!(
1877 "query_node: failed to post query={}",
1878 truncate_query_output(query)
1879 )
1880 })?;
1881 ensure!(
1882 response.status().is_success(),
1883 "Query \"{}\" failed: {}",
1884 truncate_query_output(query),
1885 response
1886 .text()
1887 .await
1888 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1889 );
1890 let value: Value = response.json().await.context("invalid JSON")?;
1891 if let Some(errors) = value.get("errors") {
1892 tracing::warn!(
1893 "Query \"{}\" failed: {}",
1894 truncate_query_output(query),
1895 errors
1896 );
1897 } else {
1898 return Ok(value["data"].clone());
1899 }
1900 }
1901 bail!(
1902 "Query \"{}\" failed after {} retries.",
1903 truncate_query_output(query),
1904 n_try
1905 );
1906 }
1907
1908 pub async fn create_application<
1909 Abi: ContractAbi,
1910 Parameters: Serialize,
1911 InstantiationArgument: Serialize,
1912 >(
1913 &self,
1914 chain_id: &ChainId,
1915 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1916 parameters: &Parameters,
1917 argument: &InstantiationArgument,
1918 required_application_ids: &[ApplicationId],
1919 ) -> Result<ApplicationId<Abi>> {
1920 let module_id = module_id.forget_abi();
1921 let json_required_applications_ids = required_application_ids
1922 .iter()
1923 .map(ApplicationId::to_string)
1924 .collect::<Vec<_>>()
1925 .to_value();
1926 let new_parameters = serde_json::to_value(parameters)
1928 .context("could not create parameters JSON")?
1929 .to_value();
1930 let new_argument = serde_json::to_value(argument)
1931 .context("could not create argument JSON")?
1932 .to_value();
1933 let query = format!(
1934 "mutation {{ createApplication(\
1935 chainId: \"{chain_id}\",
1936 moduleId: \"{module_id}\", \
1937 parameters: {new_parameters}, \
1938 instantiationArgument: {new_argument}, \
1939 requiredApplicationIds: {json_required_applications_ids}) \
1940 }}"
1941 );
1942 let data = self.query_node(query).await?;
1943 let app_id_str = data["createApplication"]
1944 .as_str()
1945 .context("missing createApplication string in response")?
1946 .trim();
1947 Ok(app_id_str
1948 .parse::<ApplicationId>()
1949 .context("invalid application ID")?
1950 .with_abi())
1951 }
1952
1953 pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1955 let query = format!(
1956 r#"query {{ block(chainId: "{chain}") {{
1957 hash
1958 block {{ header {{ height }} }}
1959 }} }}"#
1960 );
1961
1962 let mut response = self.query_node(&query).await?;
1963
1964 match (
1965 mem::take(&mut response["block"]["hash"]),
1966 mem::take(&mut response["block"]["block"]["header"]["height"]),
1967 ) {
1968 (Value::Null, Value::Null) => Ok(None),
1969 (Value::String(hash), Value::Number(height)) => Ok(Some((
1970 hash.parse()
1971 .context("Received an invalid hash {hash:?} for chain tip")?,
1972 BlockHeight(height.as_u64().unwrap()),
1973 ))),
1974 invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1975 }
1976 }
1977
1978 pub async fn notifications(
1980 &self,
1981 chain_id: ChainId,
1982 ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
1983 let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1984 let url = format!("ws://localhost:{}/ws", self.port);
1985 let mut request = url.into_client_request()?;
1986 request.headers_mut().insert(
1987 "Sec-WebSocket-Protocol",
1988 HeaderValue::from_str("graphql-transport-ws")?,
1989 );
1990 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1991 let init_json = json!({
1992 "type": "connection_init",
1993 "payload": {}
1994 });
1995 websocket.send(init_json.to_string().into()).await?;
1996 let text = websocket
1997 .next()
1998 .await
1999 .context("Failed to establish connection")??
2000 .into_text()?;
2001 ensure!(
2002 text == "{\"type\":\"connection_ack\"}",
2003 "Unexpected response: {text}"
2004 );
2005 let query_json = json!({
2006 "id": "1",
2007 "type": "start",
2008 "payload": {
2009 "query": query,
2010 "variables": {},
2011 "operationName": null
2012 }
2013 });
2014 websocket.send(query_json.to_string().into()).await?;
2015 Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
2016 |message| async {
2017 let text = message.into_text()?;
2018 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
2019 if let Some(errors) = value["payload"].get("errors") {
2020 bail!("Notification subscription failed: {errors:?}");
2021 }
2022 serde_json::from_value(value["payload"]["data"]["notifications"].clone())
2023 .context("Failed to deserialize notification")
2024 },
2025 )))
2026 }
2027
2028 pub async fn query_result(
2030 &self,
2031 name: &str,
2032 chain_id: ChainId,
2033 application_id: &ApplicationId,
2034 ) -> Result<Pin<Box<impl Stream<Item = Result<Value>>>>> {
2035 let query = format!(
2036 r#"subscription {{ queryResult(name: "{name}", chainId: "{chain_id}", applicationId: "{application_id}") }}"#,
2037 );
2038 let url = format!("ws://localhost:{}/ws", self.port);
2039 let mut request = url.into_client_request()?;
2040 request.headers_mut().insert(
2041 "Sec-WebSocket-Protocol",
2042 HeaderValue::from_str("graphql-transport-ws")?,
2043 );
2044 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
2045 let init_json = json!({
2046 "type": "connection_init",
2047 "payload": {}
2048 });
2049 websocket.send(init_json.to_string().into()).await?;
2050 let text = websocket
2051 .next()
2052 .await
2053 .context("Failed to establish connection")??
2054 .into_text()?;
2055 ensure!(
2056 text == "{\"type\":\"connection_ack\"}",
2057 "Unexpected response: {text}"
2058 );
2059 let query_json = json!({
2060 "id": "1",
2061 "type": "start",
2062 "payload": {
2063 "query": query,
2064 "variables": {},
2065 "operationName": null
2066 }
2067 });
2068 websocket.send(query_json.to_string().into()).await?;
2069 Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
2070 |message| async {
2071 let text = message.into_text()?;
2072 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
2073 if let Some(errors) = value["payload"].get("errors") {
2074 bail!("Query result subscription failed: {errors:?}");
2075 }
2076 Ok(value["payload"]["data"]["queryResult"].clone())
2077 },
2078 )))
2079 }
2080}
2081
2082pub struct FaucetService {
2084 port: u16,
2085 child: Child,
2086 _temp_dir: tempfile::TempDir,
2087 terminated: bool,
2088}
2089
2090impl Drop for FaucetService {
2091 fn drop(&mut self) {
2092 if !self.terminated {
2093 log_unexpected_exit(&mut self.child, "faucet", self.port);
2094 }
2095 }
2096}
2097
2098impl FaucetService {
2099 fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
2100 Self {
2101 port,
2102 child,
2103 _temp_dir: temp_dir,
2104 terminated: false,
2105 }
2106 }
2107
2108 pub async fn terminate(mut self) -> Result<()> {
2109 self.terminated = true;
2110 self.child
2111 .kill()
2112 .await
2113 .context("terminating faucet service")
2114 }
2115
2116 pub fn ensure_is_running(&mut self) -> Result<()> {
2117 self.child.ensure_is_running()
2118 }
2119
2120 pub fn instance(&self) -> Faucet {
2121 Faucet::new(format!("http://localhost:{}/", self.port))
2122 }
2123}
2124
2125pub struct ApplicationWrapper<A> {
2127 uri: String,
2128 _phantom: PhantomData<A>,
2129}
2130
2131impl<A> ApplicationWrapper<A> {
2132 pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
2133 let query = query.as_ref();
2134 let value = self.run_json_query(json!({ "query": query })).await?;
2135 Ok(value["data"].clone())
2136 }
2137
2138 pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
2139 const MAX_RETRIES: usize = 5;
2140
2141 for i in 0.. {
2142 let client = reqwest_client();
2143 let result = client.post(&self.uri).json(&query).send().await;
2144 let response = match result {
2145 Ok(response) => response,
2146 Err(error) if i < MAX_RETRIES => {
2147 tracing::warn!(
2148 "Failed to post query \"{}\": {error}; retrying",
2149 truncate_query_output_serialize(&query),
2150 );
2151 continue;
2152 }
2153 Err(error) => {
2154 let query = truncate_query_output_serialize(&query);
2155 return Err(error)
2156 .with_context(|| format!("run_json_query: failed to post query={query}"));
2157 }
2158 };
2159 ensure!(
2160 response.status().is_success(),
2161 "Query \"{}\" failed: {}",
2162 truncate_query_output_serialize(&query),
2163 response
2164 .text()
2165 .await
2166 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
2167 );
2168 let value: Value = response.json().await.context("invalid JSON")?;
2169 if let Some(errors) = value.get("errors") {
2170 bail!(
2171 "Query \"{}\" failed: {}",
2172 truncate_query_output_serialize(&query),
2173 errors
2174 );
2175 }
2176 return Ok(value);
2177 }
2178 unreachable!()
2179 }
2180
2181 pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
2182 let query = query.as_ref();
2183 self.run_graphql_query(&format!("query {{ {query} }}"))
2184 .await
2185 }
2186
2187 pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
2188 let query = query.as_ref().trim();
2189 let name = query
2190 .split_once(|ch: char| !ch.is_alphanumeric())
2191 .map_or(query, |(name, _)| name);
2192 let data = self.query(query).await?;
2193 serde_json::from_value(data[name].clone())
2194 .with_context(|| format!("{name} field missing in response"))
2195 }
2196
2197 pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
2198 let mutation = mutation.as_ref();
2199 self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
2200 .await
2201 }
2202
2203 pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
2204 let mut out = String::from("mutation {\n");
2205 for (index, mutation) in mutations.iter().enumerate() {
2206 out = format!("{out} u{index}: {mutation}\n");
2207 }
2208 out.push_str("}\n");
2209 self.run_graphql_query(&out).await
2210 }
2211}
2212
2213impl<A> From<String> for ApplicationWrapper<A> {
2214 fn from(uri: String) -> ApplicationWrapper<A> {
2215 ApplicationWrapper {
2216 uri,
2217 _phantom: PhantomData,
2218 }
2219 }
2220}
2221
2222#[cfg(with_testing)]
2225fn notification_timeout() -> Duration {
2226 const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
2227 const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;
2228
2229 match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
2230 Ok(var) => Duration::from_millis(var.parse().unwrap_or_else(|error| {
2231 panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
2232 })),
2233 Err(env::VarError::NotPresent) => Duration::from_millis(NOTIFICATION_TIMEOUT_MS_DEFAULT),
2234 Err(env::VarError::NotUnicode(_)) => {
2235 panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
2236 }
2237 }
2238}
2239
2240#[cfg(with_testing)]
2241pub trait NotificationsExt {
2242 fn wait_for<T>(
2244 &mut self,
2245 f: impl FnMut(Notification) -> Option<T>,
2246 ) -> impl Future<Output = Result<T>>;
2247
2248 fn wait_for_events(
2251 &mut self,
2252 expected_height: impl Into<Option<BlockHeight>>,
2253 ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
2254 let expected_height = expected_height.into();
2255 self.wait_for(move |notification| {
2256 if let Reason::NewEvents {
2257 height,
2258 event_streams,
2259 ..
2260 } = notification.reason
2261 {
2262 if expected_height.is_none_or(|h| h == height) {
2263 return Some(event_streams);
2264 }
2265 }
2266 None
2267 })
2268 }
2269
2270 fn wait_for_block(
2273 &mut self,
2274 expected_height: impl Into<Option<BlockHeight>>,
2275 ) -> impl Future<Output = Result<CryptoHash>> {
2276 let expected_height = expected_height.into();
2277 self.wait_for(move |notification| {
2278 if let Reason::NewBlock { height, hash, .. } = notification.reason {
2279 if expected_height.is_none_or(|h| h == height) {
2280 return Some(hash);
2281 }
2282 }
2283 None
2284 })
2285 }
2286
2287 fn wait_for_bundle(
2290 &mut self,
2291 expected_origin: ChainId,
2292 expected_height: impl Into<Option<BlockHeight>>,
2293 ) -> impl Future<Output = Result<()>> {
2294 let expected_height = expected_height.into();
2295 self.wait_for(move |notification| {
2296 if let Reason::NewIncomingBundle { height, origin } = notification.reason {
2297 if expected_height.is_none_or(|h| h == height) && origin == expected_origin {
2298 return Some(());
2299 }
2300 }
2301 None
2302 })
2303 }
2304}
2305
2306#[cfg(with_testing)]
2307impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
2308 async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
2309 let mut timeout = Box::pin(linera_base::time::timer::sleep(notification_timeout())).fuse();
2310 loop {
2311 let notification = futures::select! {
2312 () = timeout => bail!("Timeout waiting for notification"),
2313 notification = self.next().fuse() => notification.context("Stream closed")??,
2314 };
2315 if let Some(t) = f(notification) {
2316 return Ok(t);
2317 }
2318 }
2319 }
2320}