1use std::{
5 borrow::Cow,
6 collections::BTreeMap,
7 env,
8 marker::PhantomData,
9 mem,
10 path::{Path, PathBuf},
11 pin::Pin,
12 process::Stdio,
13 str::FromStr,
14 sync,
15 time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24 abi::ContractAbi,
25 command::{resolve_binary, CommandExt},
26 crypto::{CryptoHash, InMemorySigner},
27 data_types::{Amount, BlockHeight, Bytecode, Epoch},
28 identifiers::{
29 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30 },
31 vm::VmRuntime,
32};
33use linera_client::{client_options::ResourceControlPolicyConfig, wallet::Wallet};
34use linera_core::worker::Notification;
35use linera_execution::committee::Committee;
36use linera_faucet_client::Faucet;
37use serde::{de::DeserializeOwned, ser::Serialize};
38use serde_command_opts::to_args;
39use serde_json::{json, Value};
40use tempfile::TempDir;
41use tokio::{
42 io::{AsyncBufReadExt, BufReader},
43 process::{Child, Command},
44 sync::oneshot,
45 task::JoinHandle,
46};
47#[cfg(with_testing)]
48use {
49 futures::FutureExt as _,
50 linera_core::worker::Reason,
51 std::{collections::BTreeSet, future::Future},
52};
53
54use crate::{
55 cli::command::BenchmarkCommand,
56 cli_wrappers::{
57 local_net::{PathProvider, ProcessInbox},
58 Network,
59 },
60 util::{self, ChildExt},
61};
62
63const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
66
67fn reqwest_client() -> reqwest::Client {
68 reqwest::ClientBuilder::new()
69 .timeout(Duration::from_secs(30))
70 .build()
71 .unwrap()
72}
73
74pub struct ClientWrapper {
76 binary_path: sync::Mutex<Option<PathBuf>>,
77 testing_prng_seed: Option<u64>,
78 storage: String,
79 wallet: String,
80 keystore: String,
81 max_pending_message_bundles: usize,
82 network: Network,
83 pub path_provider: PathProvider,
84 on_drop: OnClientDrop,
85 extra_args: Vec<String>,
86}
87
88#[derive(Clone, Copy, Debug, Eq, PartialEq)]
90pub enum OnClientDrop {
91 CloseChains,
93 LeakChains,
95}
96
97impl ClientWrapper {
98 pub fn new(
99 path_provider: PathProvider,
100 network: Network,
101 testing_prng_seed: Option<u64>,
102 id: usize,
103 on_drop: OnClientDrop,
104 ) -> Self {
105 Self::new_with_extra_args(
106 path_provider,
107 network,
108 testing_prng_seed,
109 id,
110 on_drop,
111 vec!["--wait-for-outgoing-messages".to_string()],
112 )
113 }
114
115 pub fn new_with_extra_args(
116 path_provider: PathProvider,
117 network: Network,
118 testing_prng_seed: Option<u64>,
119 id: usize,
120 on_drop: OnClientDrop,
121 extra_args: Vec<String>,
122 ) -> Self {
123 let storage = format!(
124 "rocksdb:{}/client_{}.db",
125 path_provider.path().display(),
126 id
127 );
128 let wallet = format!("wallet_{}.json", id);
129 let keystore = format!("keystore_{}.json", id);
130 Self {
131 binary_path: sync::Mutex::new(None),
132 testing_prng_seed,
133 storage,
134 wallet,
135 keystore,
136 max_pending_message_bundles: 10_000,
137 network,
138 path_provider,
139 on_drop,
140 extra_args,
141 }
142 }
143
144 pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
146 let tmp = TempDir::new()?;
147 let mut command = self.command().await?;
148 command
149 .current_dir(tmp.path())
150 .arg("project")
151 .arg("new")
152 .arg(project_name)
153 .arg("--linera-root")
154 .arg(linera_root)
155 .spawn_and_wait_for_stdout()
156 .await?;
157 Ok(tmp)
158 }
159
160 pub async fn project_publish<T: Serialize>(
162 &self,
163 path: PathBuf,
164 required_application_ids: Vec<String>,
165 publisher: impl Into<Option<ChainId>>,
166 argument: &T,
167 ) -> Result<String> {
168 let json_parameters = serde_json::to_string(&())?;
169 let json_argument = serde_json::to_string(argument)?;
170 let mut command = self.command().await?;
171 command
172 .arg("project")
173 .arg("publish-and-create")
174 .arg(path)
175 .args(publisher.into().iter().map(ChainId::to_string))
176 .args(["--json-parameters", &json_parameters])
177 .args(["--json-argument", &json_argument]);
178 if !required_application_ids.is_empty() {
179 command.arg("--required-application-ids");
180 command.args(required_application_ids);
181 }
182 let stdout = command.spawn_and_wait_for_stdout().await?;
183 Ok(stdout.trim().to_string())
184 }
185
186 pub async fn project_test(&self, path: &Path) -> Result<()> {
188 self.command()
189 .await
190 .context("failed to create project test command")?
191 .current_dir(path)
192 .arg("project")
193 .arg("test")
194 .spawn_and_wait_for_stdout()
195 .await?;
196 Ok(())
197 }
198
199 async fn command_with_envs_and_arguments(
200 &self,
201 envs: &[(&str, &str)],
202 arguments: impl IntoIterator<Item = Cow<'_, str>>,
203 ) -> Result<Command> {
204 let mut command = self.command_binary().await?;
205 command.current_dir(self.path_provider.path());
206 for (key, value) in envs {
207 command.env(key, value);
208 }
209 for argument in arguments {
210 command.arg(&*argument);
211 }
212 Ok(command)
213 }
214
215 async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
216 self.command_with_envs_and_arguments(envs, self.command_arguments())
217 .await
218 }
219
220 async fn command_with_arguments(
221 &self,
222 arguments: impl IntoIterator<Item = Cow<'_, str>>,
223 ) -> Result<Command> {
224 self.command_with_envs_and_arguments(
225 &[(
226 "RUST_LOG",
227 &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
228 )],
229 arguments,
230 )
231 .await
232 }
233
234 async fn command(&self) -> Result<Command> {
235 self.command_with_envs(&[(
236 "RUST_LOG",
237 &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
238 )])
239 .await
240 }
241
242 fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
243 [
244 "--wallet".into(),
245 self.wallet.as_str().into(),
246 "--keystore".into(),
247 self.keystore.as_str().into(),
248 "--storage".into(),
249 self.storage.as_str().into(),
250 "--send-timeout-ms".into(),
251 "500000".into(),
252 "--recv-timeout-ms".into(),
253 "500000".into(),
254 ]
255 .into_iter()
256 .chain(self.extra_args.iter().map(|s| s.as_str().into()))
257 }
258
259 fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
261 self.required_command_arguments().chain([
262 "--max-pending-message-bundles".into(),
263 self.max_pending_message_bundles.to_string().into(),
264 ])
265 }
266
267 async fn command_binary(&self) -> Result<Command> {
271 match self.command_with_cached_binary_path() {
272 Some(command) => Ok(command),
273 None => {
274 let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
275 let command = Command::new(&resolved_path);
276
277 self.set_cached_binary_path(resolved_path);
278
279 Ok(command)
280 }
281 }
282 }
283
284 fn command_with_cached_binary_path(&self) -> Option<Command> {
286 let binary_path = self.binary_path.lock().unwrap();
287
288 binary_path.as_ref().map(Command::new)
289 }
290
291 fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
299 let mut binary_path = self.binary_path.lock().unwrap();
300
301 if binary_path.is_none() {
302 *binary_path = Some(new_binary_path);
303 } else {
304 assert_eq!(*binary_path, Some(new_binary_path));
305 }
306 }
307
308 pub async fn create_genesis_config(
310 &self,
311 num_other_initial_chains: u32,
312 initial_funding: Amount,
313 policy_config: ResourceControlPolicyConfig,
314 http_allow_list: Option<Vec<String>>,
315 ) -> Result<()> {
316 let mut command = self.command().await?;
317 command
318 .args([
319 "create-genesis-config",
320 &num_other_initial_chains.to_string(),
321 ])
322 .args(["--initial-funding", &initial_funding.to_string()])
323 .args(["--committee", "committee.json"])
324 .args(["--genesis", "genesis.json"])
325 .args([
326 "--policy-config",
327 &policy_config.to_string().to_kebab_case(),
328 ]);
329 if let Some(allow_list) = http_allow_list {
330 command
331 .arg("--http-request-allow-list")
332 .arg(allow_list.join(","));
333 }
334 if let Some(seed) = self.testing_prng_seed {
335 command.arg("--testing-prng-seed").arg(seed.to_string());
336 }
337 command.spawn_and_wait_for_stdout().await?;
338 Ok(())
339 }
340
341 pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
344 let mut command = self.command().await?;
345 command.args(["wallet", "init"]);
346 match faucet {
347 None => command.args(["--genesis", "genesis.json"]),
348 Some(faucet) => command.args(["--faucet", faucet.url()]),
349 };
350 if let Some(seed) = self.testing_prng_seed {
351 command.arg("--testing-prng-seed").arg(seed.to_string());
352 }
353 command.spawn_and_wait_for_stdout().await?;
354 Ok(())
355 }
356
357 pub async fn request_chain(
359 &self,
360 faucet: &Faucet,
361 set_default: bool,
362 ) -> Result<(ChainId, AccountOwner)> {
363 let mut command = self.command().await?;
364 command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
365 if set_default {
366 command.arg("--set-default");
367 }
368 let stdout = command.spawn_and_wait_for_stdout().await?;
369 let mut lines = stdout.split_whitespace();
370 let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
371 let owner = lines.next().context("missing chain owner")?.parse()?;
372 Ok((chain_id, owner))
373 }
374
375 #[expect(clippy::too_many_arguments)]
377 pub async fn publish_and_create<
378 A: ContractAbi,
379 Parameters: Serialize,
380 InstantiationArgument: Serialize,
381 >(
382 &self,
383 contract: PathBuf,
384 service: PathBuf,
385 vm_runtime: VmRuntime,
386 parameters: &Parameters,
387 argument: &InstantiationArgument,
388 required_application_ids: &[ApplicationId],
389 publisher: impl Into<Option<ChainId>>,
390 ) -> Result<ApplicationId<A>> {
391 let json_parameters = serde_json::to_string(parameters)?;
392 let json_argument = serde_json::to_string(argument)?;
393 let mut command = self.command().await?;
394 let vm_runtime = format!("{}", vm_runtime);
395 command
396 .arg("publish-and-create")
397 .args([contract, service])
398 .args(["--vm-runtime", &vm_runtime.to_lowercase()])
399 .args(publisher.into().iter().map(ChainId::to_string))
400 .args(["--json-parameters", &json_parameters])
401 .args(["--json-argument", &json_argument]);
402 if !required_application_ids.is_empty() {
403 command.arg("--required-application-ids");
404 command.args(
405 required_application_ids
406 .iter()
407 .map(ApplicationId::to_string),
408 );
409 }
410 let stdout = command.spawn_and_wait_for_stdout().await?;
411 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
412 }
413
414 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
416 &self,
417 contract: PathBuf,
418 service: PathBuf,
419 vm_runtime: VmRuntime,
420 publisher: impl Into<Option<ChainId>>,
421 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
422 let stdout = self
423 .command()
424 .await?
425 .arg("publish-module")
426 .args([contract, service])
427 .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
428 .args(publisher.into().iter().map(ChainId::to_string))
429 .spawn_and_wait_for_stdout()
430 .await?;
431 let module_id: ModuleId = stdout.trim().parse()?;
432 Ok(module_id.with_abi())
433 }
434
435 pub async fn create_application<
437 Abi: ContractAbi,
438 Parameters: Serialize,
439 InstantiationArgument: Serialize,
440 >(
441 &self,
442 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
443 parameters: &Parameters,
444 argument: &InstantiationArgument,
445 required_application_ids: &[ApplicationId],
446 creator: impl Into<Option<ChainId>>,
447 ) -> Result<ApplicationId<Abi>> {
448 let json_parameters = serde_json::to_string(parameters)?;
449 let json_argument = serde_json::to_string(argument)?;
450 let mut command = self.command().await?;
451 command
452 .arg("create-application")
453 .arg(module_id.forget_abi().to_string())
454 .args(["--json-parameters", &json_parameters])
455 .args(["--json-argument", &json_argument])
456 .args(creator.into().iter().map(ChainId::to_string));
457 if !required_application_ids.is_empty() {
458 command.arg("--required-application-ids");
459 command.args(
460 required_application_ids
461 .iter()
462 .map(ApplicationId::to_string),
463 );
464 }
465 let stdout = command.spawn_and_wait_for_stdout().await?;
466 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
467 }
468
469 pub async fn run_node_service(
471 &self,
472 port: impl Into<Option<u16>>,
473 process_inbox: ProcessInbox,
474 ) -> Result<NodeService> {
475 let port = port.into().unwrap_or(8080);
476 let mut command = self.command().await?;
477 command.arg("service");
478 if let ProcessInbox::Skip = process_inbox {
479 command.arg("--listener-skip-process-inbox");
480 }
481 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
482 command.args(var.split_whitespace());
483 }
484 let child = command
485 .args(["--port".to_string(), port.to_string()])
486 .spawn_into()?;
487 let client = reqwest_client();
488 for i in 0..10 {
489 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
490 let request = client
491 .get(format!("http://localhost:{}/", port))
492 .send()
493 .await;
494 if request.is_ok() {
495 tracing::info!("Node service has started");
496 return Ok(NodeService::new(port, child));
497 } else {
498 tracing::warn!("Waiting for node service to start");
499 }
500 }
501 bail!("Failed to start node service");
502 }
503
504 pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
506 let mut command = self.command().await?;
507 command.arg("query-validator").arg(address);
508 let stdout = command.spawn_and_wait_for_stdout().await?;
509
510 let hash = stdout
513 .lines()
514 .find_map(|line| {
515 line.strip_prefix("Genesis config hash: ")
516 .and_then(|hash_str| hash_str.trim().parse().ok())
517 })
518 .context("error while parsing the result of `linera query-validator`")?;
519 Ok(hash)
520 }
521
522 pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
524 let mut command = self.command().await?;
525 command.arg("query-validators");
526 if let Some(chain_id) = chain_id {
527 command.arg(chain_id.to_string());
528 }
529 command.spawn_and_wait_for_stdout().await?;
530 Ok(())
531 }
532
533 pub async fn sync_validator(
535 &self,
536 chain_ids: impl IntoIterator<Item = &ChainId>,
537 validator_address: impl Into<String>,
538 ) -> Result<()> {
539 let mut command = self.command().await?;
540 command.arg("sync-validator").arg(validator_address.into());
541 let mut chain_ids = chain_ids.into_iter().peekable();
542 if chain_ids.peek().is_some() {
543 command
544 .arg("--chains")
545 .args(chain_ids.map(ChainId::to_string));
546 }
547 command.spawn_and_wait_for_stdout().await?;
548 Ok(())
549 }
550
551 pub async fn run_faucet(
553 &self,
554 port: impl Into<Option<u16>>,
555 chain_id: Option<ChainId>,
556 amount: Amount,
557 ) -> Result<FaucetService> {
558 let port = port.into().unwrap_or(8080);
559 let temp_dir = tempfile::tempdir()
560 .context("Failed to create temporary directory for faucet storage")?;
561 let storage_path = temp_dir.path().join("faucet_storage.sqlite");
562 let mut command = self.command().await?;
563 let command = command
564 .arg("faucet")
565 .args(["--port".to_string(), port.to_string()])
566 .args(["--amount".to_string(), amount.to_string()])
567 .args([
568 "--storage-path".to_string(),
569 storage_path.to_string_lossy().to_string(),
570 ]);
571 if let Some(chain_id) = chain_id {
572 command.arg(chain_id.to_string());
573 }
574 let child = command.spawn_into()?;
575 let client = reqwest_client();
576 for i in 0..10 {
577 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
578 let request = client
579 .get(format!("http://localhost:{}/", port))
580 .send()
581 .await;
582 if request.is_ok() {
583 tracing::info!("Faucet has started");
584 return Ok(FaucetService::new(port, child, temp_dir));
585 } else {
586 tracing::debug!("Waiting for faucet to start");
587 }
588 }
589 bail!("Failed to start faucet");
590 }
591
592 pub async fn local_balance(&self, account: Account) -> Result<Amount> {
594 let stdout = self
595 .command()
596 .await?
597 .arg("local-balance")
598 .arg(account.to_string())
599 .spawn_and_wait_for_stdout()
600 .await?;
601 let amount = stdout
602 .trim()
603 .parse()
604 .context("error while parsing the result of `linera local-balance`")?;
605 Ok(amount)
606 }
607
608 pub async fn query_balance(&self, account: Account) -> Result<Amount> {
610 let stdout = self
611 .command()
612 .await?
613 .arg("query-balance")
614 .arg(account.to_string())
615 .spawn_and_wait_for_stdout()
616 .await?;
617 let amount = stdout
618 .trim()
619 .parse()
620 .context("error while parsing the result of `linera query-balance`")?;
621 Ok(amount)
622 }
623
624 pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
626 self.command()
627 .await?
628 .arg("sync")
629 .arg(chain_id.to_string())
630 .spawn_and_wait_for_stdout()
631 .await?;
632 Ok(())
633 }
634
635 pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
637 self.command()
638 .await?
639 .arg("process-inbox")
640 .arg(chain_id.to_string())
641 .spawn_and_wait_for_stdout()
642 .await?;
643 Ok(())
644 }
645
646 pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
648 self.command()
649 .await?
650 .arg("transfer")
651 .arg(amount.to_string())
652 .args(["--from", &from.to_string()])
653 .args(["--to", &to.to_string()])
654 .spawn_and_wait_for_stdout()
655 .await?;
656 Ok(())
657 }
658
659 pub async fn transfer_with_silent_logs(
661 &self,
662 amount: Amount,
663 from: ChainId,
664 to: ChainId,
665 ) -> Result<()> {
666 self.command()
667 .await?
668 .env("RUST_LOG", "off")
669 .arg("transfer")
670 .arg(amount.to_string())
671 .args(["--from", &from.to_string()])
672 .args(["--to", &to.to_string()])
673 .spawn_and_wait_for_stdout()
674 .await?;
675 Ok(())
676 }
677
678 pub async fn transfer_with_accounts(
680 &self,
681 amount: Amount,
682 from: Account,
683 to: Account,
684 ) -> Result<()> {
685 self.command()
686 .await?
687 .arg("transfer")
688 .arg(amount.to_string())
689 .args(["--from", &from.to_string()])
690 .args(["--to", &to.to_string()])
691 .spawn_and_wait_for_stdout()
692 .await?;
693 Ok(())
694 }
695
696 fn benchmark_command_internal(command: &mut Command, args: BenchmarkCommand) -> Result<()> {
697 let mut formatted_args = to_args(&args)?;
698 let subcommand = formatted_args.remove(0);
699 formatted_args.remove(0);
702 let options = formatted_args
703 .chunks_exact(2)
704 .flat_map(|pair| {
705 let option = format!("--{}", pair[0]);
706 match pair[1].as_str() {
707 "true" => vec![option],
708 "false" => vec![],
709 _ => vec![option, pair[1].clone()],
710 }
711 })
712 .collect::<Vec<_>>();
713 command
714 .args([
715 "--max-pending-message-bundles",
716 &args.transactions_per_block().to_string(),
717 ])
718 .arg("benchmark")
719 .arg(subcommand)
720 .args(options);
721 Ok(())
722 }
723
724 async fn benchmark_command_with_envs(
725 &self,
726 args: BenchmarkCommand,
727 envs: &[(&str, &str)],
728 ) -> Result<Command> {
729 let mut command = self
730 .command_with_envs_and_arguments(envs, self.required_command_arguments())
731 .await?;
732 Self::benchmark_command_internal(&mut command, args)?;
733 Ok(command)
734 }
735
736 async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
737 let mut command = self
738 .command_with_arguments(self.required_command_arguments())
739 .await?;
740 Self::benchmark_command_internal(&mut command, args)?;
741 Ok(command)
742 }
743
744 pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
746 let mut command = self.benchmark_command(args).await?;
747 command.spawn_and_wait_for_stdout().await?;
748 Ok(())
749 }
750
751 pub async fn benchmark_detached(
754 &self,
755 args: BenchmarkCommand,
756 tx: oneshot::Sender<()>,
757 ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
758 let mut child = self
759 .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
760 .await?
761 .kill_on_drop(true)
762 .stdin(Stdio::piped())
763 .stdout(Stdio::piped())
764 .stderr(Stdio::piped())
765 .spawn()?;
766
767 let pid = child.id().expect("failed to get pid");
768 let stdout = child.stdout.take().expect("stdout not open");
769 let stdout_handle = tokio::spawn(async move {
770 let mut lines = BufReader::new(stdout).lines();
771 while let Ok(Some(line)) = lines.next_line().await {
772 println!("benchmark{{pid={pid}}} {line}");
773 }
774 });
775
776 let stderr = child.stderr.take().expect("stderr not open");
777 let stderr_handle = tokio::spawn(async move {
778 let mut lines = BufReader::new(stderr).lines();
779 let mut tx = Some(tx);
780 while let Ok(Some(line)) = lines.next_line().await {
781 if line.contains("Ready to start benchmark") {
782 tx.take()
783 .expect("Should only send signal once")
784 .send(())
785 .expect("failed to send ready signal to main thread");
786 } else {
787 println!("benchmark{{pid={pid}}} {line}");
788 }
789 }
790 });
791 Ok((child, stdout_handle, stderr_handle))
792 }
793
794 async fn open_chain_internal(
795 &self,
796 from: ChainId,
797 owner: Option<AccountOwner>,
798 initial_balance: Amount,
799 super_owner: bool,
800 ) -> Result<(ChainId, AccountOwner)> {
801 let mut command = self.command().await?;
802 command
803 .arg("open-chain")
804 .args(["--from", &from.to_string()])
805 .args(["--initial-balance", &initial_balance.to_string()]);
806
807 if let Some(owner) = owner {
808 command.args(["--owner", &owner.to_string()]);
809 }
810
811 if super_owner {
812 command.arg("--super-owner");
813 }
814
815 let stdout = command.spawn_and_wait_for_stdout().await?;
816 let mut split = stdout.split('\n');
817 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
818 let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
819 if let Some(owner) = owner {
820 assert_eq!(owner, new_owner);
821 }
822 Ok((chain_id, new_owner))
823 }
824
825 pub async fn open_chain_super_owner(
827 &self,
828 from: ChainId,
829 owner: Option<AccountOwner>,
830 initial_balance: Amount,
831 ) -> Result<(ChainId, AccountOwner)> {
832 self.open_chain_internal(from, owner, initial_balance, true)
833 .await
834 }
835
836 pub async fn open_chain(
838 &self,
839 from: ChainId,
840 owner: Option<AccountOwner>,
841 initial_balance: Amount,
842 ) -> Result<(ChainId, AccountOwner)> {
843 self.open_chain_internal(from, owner, initial_balance, false)
844 .await
845 }
846
847 pub async fn open_and_assign(
849 &self,
850 client: &ClientWrapper,
851 initial_balance: Amount,
852 ) -> Result<ChainId> {
853 let our_chain = self
854 .load_wallet()?
855 .default_chain()
856 .context("no default chain found")?;
857 let owner = client.keygen().await?;
858 let (new_chain, _) = self
859 .open_chain(our_chain, Some(owner), initial_balance)
860 .await?;
861 client.assign(owner, new_chain).await?;
862 Ok(new_chain)
863 }
864
865 pub async fn open_multi_owner_chain(
866 &self,
867 from: ChainId,
868 owners: Vec<AccountOwner>,
869 weights: Vec<u64>,
870 multi_leader_rounds: u32,
871 balance: Amount,
872 base_timeout_ms: u64,
873 ) -> Result<ChainId> {
874 let mut command = self.command().await?;
875 command
876 .arg("open-multi-owner-chain")
877 .args(["--from", &from.to_string()])
878 .arg("--owners")
879 .args(owners.iter().map(AccountOwner::to_string))
880 .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
881 if !weights.is_empty() {
882 command
883 .arg("--owner-weights")
884 .args(weights.iter().map(u64::to_string));
885 };
886 command
887 .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
888 .args(["--initial-balance", &balance.to_string()]);
889
890 let stdout = command.spawn_and_wait_for_stdout().await?;
891 let mut split = stdout.split('\n');
892 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
893
894 Ok(chain_id)
895 }
896
897 pub async fn change_ownership(
898 &self,
899 chain_id: ChainId,
900 super_owners: Vec<AccountOwner>,
901 owners: Vec<AccountOwner>,
902 ) -> Result<()> {
903 let mut command = self.command().await?;
904 command
905 .arg("change-ownership")
906 .args(["--chain-id", &chain_id.to_string()]);
907 if !super_owners.is_empty() {
908 command
909 .arg("--super-owners")
910 .args(super_owners.iter().map(AccountOwner::to_string));
911 }
912 if !owners.is_empty() {
913 command
914 .arg("--owners")
915 .args(owners.iter().map(AccountOwner::to_string));
916 }
917 command.spawn_and_wait_for_stdout().await?;
918 Ok(())
919 }
920
921 pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
923 let mut command = self.command().await?;
924 command
925 .args(["wallet", "follow-chain"])
926 .arg(chain_id.to_string());
927 if sync {
928 command.arg("--sync");
929 }
930 command.spawn_and_wait_for_stdout().await?;
931 Ok(())
932 }
933
934 pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
936 let mut command = self.command().await?;
937 command
938 .args(["wallet", "forget-chain"])
939 .arg(chain_id.to_string());
940 command.spawn_and_wait_for_stdout().await?;
941 Ok(())
942 }
943
944 pub async fn retry_pending_block(
945 &self,
946 chain_id: Option<ChainId>,
947 ) -> Result<Option<CryptoHash>> {
948 let mut command = self.command().await?;
949 command.arg("retry-pending-block");
950 if let Some(chain_id) = chain_id {
951 command.arg(chain_id.to_string());
952 }
953 let stdout = command.spawn_and_wait_for_stdout().await?;
954 let stdout = stdout.trim();
955 if stdout.is_empty() {
956 Ok(None)
957 } else {
958 Ok(Some(CryptoHash::from_str(stdout)?))
959 }
960 }
961
962 pub async fn publish_data_blob(
964 &self,
965 path: &Path,
966 chain_id: Option<ChainId>,
967 ) -> Result<CryptoHash> {
968 let mut command = self.command().await?;
969 command.arg("publish-data-blob").arg(path);
970 if let Some(chain_id) = chain_id {
971 command.arg(chain_id.to_string());
972 }
973 let stdout = command.spawn_and_wait_for_stdout().await?;
974 let stdout = stdout.trim();
975 Ok(CryptoHash::from_str(stdout)?)
976 }
977
978 pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
980 let mut command = self.command().await?;
981 command.arg("read-data-blob").arg(hash.to_string());
982 if let Some(chain_id) = chain_id {
983 command.arg(chain_id.to_string());
984 }
985 command.spawn_and_wait_for_stdout().await?;
986 Ok(())
987 }
988
989 pub fn load_wallet(&self) -> Result<Wallet> {
990 util::read_json(self.wallet_path())
991 }
992
993 pub fn load_keystore(&self) -> Result<InMemorySigner> {
994 util::read_json(self.keystore_path())
995 }
996
997 pub fn wallet_path(&self) -> PathBuf {
998 self.path_provider.path().join(&self.wallet)
999 }
1000
1001 pub fn keystore_path(&self) -> PathBuf {
1002 self.path_provider.path().join(&self.keystore)
1003 }
1004
1005 pub fn storage_path(&self) -> &str {
1006 &self.storage
1007 }
1008
1009 pub fn get_owner(&self) -> Option<AccountOwner> {
1010 let wallet = self.load_wallet().ok()?;
1011 let chain_id = wallet.default_chain()?;
1012 wallet.get(chain_id)?.owner
1013 }
1014
1015 pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1016 self.load_wallet()
1017 .ok()
1018 .is_some_and(|wallet| wallet.get(chain).is_some())
1019 }
1020
1021 pub async fn set_validator(
1022 &self,
1023 validator_key: &(String, String),
1024 port: usize,
1025 votes: usize,
1026 ) -> Result<()> {
1027 let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1028 self.command()
1029 .await?
1030 .arg("set-validator")
1031 .args(["--public-key", &validator_key.0])
1032 .args(["--account-key", &validator_key.1])
1033 .args(["--address", &address])
1034 .args(["--votes", &votes.to_string()])
1035 .spawn_and_wait_for_stdout()
1036 .await?;
1037 Ok(())
1038 }
1039
1040 pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1041 self.command()
1042 .await?
1043 .arg("remove-validator")
1044 .args(["--public-key", validator_key])
1045 .spawn_and_wait_for_stdout()
1046 .await?;
1047 Ok(())
1048 }
1049
1050 pub async fn change_validators(
1051 &self,
1052 add_validators: &[(String, String, usize, usize)], modify_validators: &[(String, String, usize, usize)], remove_validators: &[String],
1055 ) -> Result<()> {
1056 let mut command = self.command().await?;
1057 command.arg("change-validators");
1058
1059 for (public_key, account_key, port, votes) in add_validators {
1060 let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1061 let validator_spec = format!("{public_key},{account_key},{address},{votes}");
1062 command.args(["--add", &validator_spec]);
1063 }
1064
1065 for (public_key, account_key, port, votes) in modify_validators {
1066 let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1067 let validator_spec = format!("{public_key},{account_key},{address},{votes}");
1068 command.args(["--modify", &validator_spec]);
1069 }
1070
1071 for validator_key in remove_validators {
1072 command.args(["--remove", validator_key]);
1073 }
1074
1075 command.spawn_and_wait_for_stdout().await?;
1076 Ok(())
1077 }
1078
1079 pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1080 self.command()
1081 .await?
1082 .arg("revoke-epochs")
1083 .arg(epoch.to_string())
1084 .spawn_and_wait_for_stdout()
1085 .await?;
1086 Ok(())
1087 }
1088
1089 pub async fn keygen(&self) -> Result<AccountOwner> {
1091 let stdout = self
1092 .command()
1093 .await?
1094 .arg("keygen")
1095 .spawn_and_wait_for_stdout()
1096 .await?;
1097 AccountOwner::from_str(stdout.as_str().trim())
1098 }
1099
1100 pub fn default_chain(&self) -> Option<ChainId> {
1102 self.load_wallet().ok()?.default_chain()
1103 }
1104
1105 pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1107 let _stdout = self
1108 .command()
1109 .await?
1110 .arg("assign")
1111 .args(["--owner", &owner.to_string()])
1112 .args(["--chain-id", &chain_id.to_string()])
1113 .spawn_and_wait_for_stdout()
1114 .await?;
1115 Ok(())
1116 }
1117
1118 pub async fn set_preferred_owner(
1120 &self,
1121 chain_id: ChainId,
1122 owner: Option<AccountOwner>,
1123 ) -> Result<()> {
1124 let mut owner_arg = vec!["--owner".to_string()];
1125 if let Some(owner) = owner {
1126 owner_arg.push(owner.to_string());
1127 };
1128 self.command()
1129 .await?
1130 .arg("set-preferred-owner")
1131 .args(["--chain-id", &chain_id.to_string()])
1132 .args(owner_arg)
1133 .spawn_and_wait_for_stdout()
1134 .await?;
1135 Ok(())
1136 }
1137
1138 pub async fn build_application(
1139 &self,
1140 path: &Path,
1141 name: &str,
1142 is_workspace: bool,
1143 ) -> Result<(PathBuf, PathBuf)> {
1144 Command::new("cargo")
1145 .current_dir(self.path_provider.path())
1146 .arg("build")
1147 .arg("--release")
1148 .args(["--target", "wasm32-unknown-unknown"])
1149 .arg("--manifest-path")
1150 .arg(path.join("Cargo.toml"))
1151 .spawn_and_wait_for_stdout()
1152 .await?;
1153
1154 let release_dir = match is_workspace {
1155 true => path.join("../target/wasm32-unknown-unknown/release"),
1156 false => path.join("target/wasm32-unknown-unknown/release"),
1157 };
1158
1159 let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1160 let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1161
1162 let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1163 let service_size = fs_err::tokio::metadata(&service).await?.len();
1164 tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1165
1166 Ok((contract, service))
1167 }
1168}
1169
1170impl Drop for ClientWrapper {
1171 fn drop(&mut self) {
1172 use std::process::Command as SyncCommand;
1173
1174 if self.on_drop != OnClientDrop::CloseChains {
1175 return;
1176 }
1177
1178 let Ok(binary_path) = self.binary_path.lock() else {
1179 tracing::error!(
1180 "Failed to close chains because a thread panicked with a lock to `binary_path`"
1181 );
1182 return;
1183 };
1184
1185 let Some(binary_path) = binary_path.as_ref() else {
1186 tracing::warn!(
1187 "Assuming no chains need to be closed, because the command binary was never \
1188 resolved and therefore presumably never called"
1189 );
1190 return;
1191 };
1192
1193 let working_directory = self.path_provider.path();
1194 let mut wallet_show_command = SyncCommand::new(binary_path);
1195
1196 for argument in self.command_arguments() {
1197 wallet_show_command.arg(&*argument);
1198 }
1199
1200 let Ok(wallet_show_output) = wallet_show_command
1201 .current_dir(working_directory)
1202 .args(["wallet", "show", "--short", "--owned"])
1203 .output()
1204 else {
1205 tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1206 return;
1207 };
1208
1209 if !wallet_show_output.status.success() {
1210 tracing::warn!("Failed to list chains in the wallet to close them");
1211 return;
1212 }
1213
1214 let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1215 tracing::warn!(
1216 "Failed to close chains because `linera wallet show --short` \
1217 returned a non-UTF-8 output"
1218 );
1219 return;
1220 };
1221
1222 let chain_ids = chain_list_string
1223 .split('\n')
1224 .map(|line| line.trim())
1225 .filter(|line| !line.is_empty());
1226
1227 for chain_id in chain_ids {
1228 let mut close_chain_command = SyncCommand::new(binary_path);
1229
1230 for argument in self.command_arguments() {
1231 close_chain_command.arg(&*argument);
1232 }
1233
1234 close_chain_command.current_dir(working_directory);
1235
1236 match close_chain_command.args(["close-chain", chain_id]).status() {
1237 Ok(status) if status.success() => (),
1238 Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1239 Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1240 }
1241 }
1242 }
1243}
1244
1245#[cfg(with_testing)]
1246impl ClientWrapper {
1247 pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1248 self.build_application(Self::example_path(name)?.as_path(), name, true)
1249 .await
1250 }
1251
1252 pub fn example_path(name: &str) -> Result<PathBuf> {
1253 Ok(env::current_dir()?.join("../examples/").join(name))
1254 }
1255}
1256
1257fn truncate_query_output(input: &str) -> String {
1258 let max_len = 1000;
1259 if input.len() < max_len {
1260 input.to_string()
1261 } else {
1262 format!("{} ...", input.get(..max_len).unwrap())
1263 }
1264}
1265
1266fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1267 let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1268 let max_len = 200;
1269 if query.len() < max_len {
1270 query
1271 } else {
1272 format!("{} ...", query.get(..max_len).unwrap())
1273 }
1274}
1275
1276pub struct NodeService {
1278 port: u16,
1279 child: Child,
1280}
1281
1282impl NodeService {
1283 fn new(port: u16, child: Child) -> Self {
1284 Self { port, child }
1285 }
1286
1287 pub async fn terminate(mut self) -> Result<()> {
1288 self.child.kill().await.context("terminating node service")
1289 }
1290
1291 pub fn port(&self) -> u16 {
1292 self.port
1293 }
1294
1295 pub fn ensure_is_running(&mut self) -> Result<()> {
1296 self.child.ensure_is_running()
1297 }
1298
1299 pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1300 let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1301 let mut data = self.query_node(query).await?;
1302 Ok(serde_json::from_value(data["processInbox"].take())?)
1303 }
1304
1305 pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1306 let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1307 let mut data = self.query_node(query).await?;
1308 Ok(serde_json::from_value(data["sync"].take())?)
1309 }
1310
1311 pub async fn transfer(
1312 &self,
1313 chain_id: ChainId,
1314 owner: AccountOwner,
1315 recipient: Account,
1316 amount: Amount,
1317 ) -> Result<CryptoHash> {
1318 let json_owner = owner.to_value();
1319 let json_recipient = recipient.to_value();
1320 let query = format!(
1321 "mutation {{ transfer(\
1322 chainId: \"{chain_id}\", \
1323 owner: {json_owner}, \
1324 recipient: {json_recipient}, \
1325 amount: \"{amount}\") \
1326 }}"
1327 );
1328 let data = self.query_node(query).await?;
1329 serde_json::from_value(data["transfer"].clone())
1330 .context("missing transfer field in response")
1331 }
1332
1333 pub async fn balance(&self, account: &Account) -> Result<Amount> {
1334 let chain = account.chain_id;
1335 let owner = account.owner;
1336 if matches!(owner, AccountOwner::CHAIN) {
1337 let query = format!(
1338 "query {{ chain(chainId:\"{chain}\") {{
1339 executionState {{ system {{ balance }} }}
1340 }} }}"
1341 );
1342 let response = self.query_node(query).await?;
1343 let balance = &response["chain"]["executionState"]["system"]["balance"]
1344 .as_str()
1345 .unwrap();
1346 return Ok(Amount::from_str(balance)?);
1347 }
1348 let query = format!(
1349 "query {{ chain(chainId:\"{chain}\") {{
1350 executionState {{ system {{ balances {{
1351 entry(key:\"{owner}\") {{ value }}
1352 }} }} }}
1353 }} }}"
1354 );
1355 let response = self.query_node(query).await?;
1356 let balances = &response["chain"]["executionState"]["system"]["balances"];
1357 let balance = balances["entry"]["value"].as_str();
1358 match balance {
1359 None => Ok(Amount::ZERO),
1360 Some(amount) => Ok(Amount::from_str(amount)?),
1361 }
1362 }
1363
1364 pub fn make_application<A: ContractAbi>(
1365 &self,
1366 chain_id: &ChainId,
1367 application_id: &ApplicationId<A>,
1368 ) -> Result<ApplicationWrapper<A>> {
1369 let application_id = application_id.forget_abi().to_string();
1370 let link = format!(
1371 "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1372 self.port
1373 );
1374 Ok(ApplicationWrapper::from(link))
1375 }
1376
1377 pub async fn publish_data_blob(
1378 &self,
1379 chain_id: &ChainId,
1380 bytes: Vec<u8>,
1381 ) -> Result<CryptoHash> {
1382 let query = format!(
1383 "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1384 chain_id.to_value(),
1385 bytes.to_value(),
1386 );
1387 let data = self.query_node(query).await?;
1388 serde_json::from_value(data["publishDataBlob"].clone())
1389 .context("missing publishDataBlob field in response")
1390 }
1391
1392 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1393 &self,
1394 chain_id: &ChainId,
1395 contract: PathBuf,
1396 service: PathBuf,
1397 vm_runtime: VmRuntime,
1398 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1399 let contract_code = Bytecode::load_from_file(&contract)?;
1400 let service_code = Bytecode::load_from_file(&service)?;
1401 let query = format!(
1402 "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1403 chain_id.to_value(),
1404 contract_code.to_value(),
1405 service_code.to_value(),
1406 vm_runtime.to_value(),
1407 );
1408 let data = self.query_node(query).await?;
1409 let module_str = data["publishModule"]
1410 .as_str()
1411 .context("module ID not found")?;
1412 let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1413 Ok(module_id.with_abi())
1414 }
1415
1416 pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1417 let query = format!(
1418 "query {{ chain(chainId:\"{chain_id}\") {{
1419 executionState {{ system {{ committees }} }}
1420 }} }}"
1421 );
1422 let mut response = self.query_node(query).await?;
1423 let committees = response["chain"]["executionState"]["system"]["committees"].take();
1424 Ok(serde_json::from_value(committees)?)
1425 }
1426
1427 pub async fn events_from_index(
1428 &self,
1429 chain_id: &ChainId,
1430 stream_id: &StreamId,
1431 start_index: u32,
1432 ) -> Result<Vec<IndexAndEvent>> {
1433 let query = format!(
1434 "query {{
1435 eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1436 {{ index event }}
1437 }}",
1438 stream_id.to_value()
1439 );
1440 let mut response = self.query_node(query).await?;
1441 let response = response["eventsFromIndex"].take();
1442 Ok(serde_json::from_value(response)?)
1443 }
1444
1445 pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1446 let n_try = 5;
1447 let query = query.as_ref();
1448 for i in 0..n_try {
1449 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1450 let url = format!("http://localhost:{}/", self.port);
1451 let client = reqwest_client();
1452 let result = client
1453 .post(url)
1454 .json(&json!({ "query": query }))
1455 .send()
1456 .await;
1457 if matches!(result, Err(ref error) if error.is_timeout()) {
1458 tracing::warn!(
1459 "Timeout when sending query {} to the node service",
1460 truncate_query_output(query)
1461 );
1462 continue;
1463 }
1464 let response = result.with_context(|| {
1465 format!(
1466 "query_node: failed to post query={}",
1467 truncate_query_output(query)
1468 )
1469 })?;
1470 ensure!(
1471 response.status().is_success(),
1472 "Query \"{}\" failed: {}",
1473 truncate_query_output(query),
1474 response
1475 .text()
1476 .await
1477 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1478 );
1479 let value: Value = response.json().await.context("invalid JSON")?;
1480 if let Some(errors) = value.get("errors") {
1481 tracing::warn!(
1482 "Query \"{}\" failed: {}",
1483 truncate_query_output(query),
1484 errors
1485 );
1486 } else {
1487 return Ok(value["data"].clone());
1488 }
1489 }
1490 bail!(
1491 "Query \"{}\" failed after {} retries.",
1492 truncate_query_output(query),
1493 n_try
1494 );
1495 }
1496
1497 pub async fn create_application<
1498 Abi: ContractAbi,
1499 Parameters: Serialize,
1500 InstantiationArgument: Serialize,
1501 >(
1502 &self,
1503 chain_id: &ChainId,
1504 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1505 parameters: &Parameters,
1506 argument: &InstantiationArgument,
1507 required_application_ids: &[ApplicationId],
1508 ) -> Result<ApplicationId<Abi>> {
1509 let module_id = module_id.forget_abi();
1510 let json_required_applications_ids = required_application_ids
1511 .iter()
1512 .map(ApplicationId::to_string)
1513 .collect::<Vec<_>>()
1514 .to_value();
1515 let new_parameters = serde_json::to_value(parameters)
1517 .context("could not create parameters JSON")?
1518 .to_value();
1519 let new_argument = serde_json::to_value(argument)
1520 .context("could not create argument JSON")?
1521 .to_value();
1522 let query = format!(
1523 "mutation {{ createApplication(\
1524 chainId: \"{chain_id}\",
1525 moduleId: \"{module_id}\", \
1526 parameters: {new_parameters}, \
1527 instantiationArgument: {new_argument}, \
1528 requiredApplicationIds: {json_required_applications_ids}) \
1529 }}"
1530 );
1531 let data = self.query_node(query).await?;
1532 let app_id_str = data["createApplication"]
1533 .as_str()
1534 .context("missing createApplication string in response")?
1535 .trim();
1536 Ok(app_id_str
1537 .parse::<ApplicationId>()
1538 .context("invalid application ID")?
1539 .with_abi())
1540 }
1541
1542 pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1544 let query = format!(
1545 r#"query {{ block(chainId: "{chain}") {{
1546 hash
1547 block {{ header {{ height }} }}
1548 }} }}"#
1549 );
1550
1551 let mut response = self.query_node(&query).await?;
1552
1553 match (
1554 mem::take(&mut response["block"]["hash"]),
1555 mem::take(&mut response["block"]["block"]["header"]["height"]),
1556 ) {
1557 (Value::Null, Value::Null) => Ok(None),
1558 (Value::String(hash), Value::Number(height)) => Ok(Some((
1559 hash.parse()
1560 .context("Received an invalid hash {hash:?} for chain tip")?,
1561 BlockHeight(height.as_u64().unwrap()),
1562 ))),
1563 invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1564 }
1565 }
1566
1567 pub async fn notifications(
1569 &self,
1570 chain_id: ChainId,
1571 ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
1572 let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1573 let url = format!("ws://localhost:{}/ws", self.port);
1574 let mut request = url.into_client_request()?;
1575 request.headers_mut().insert(
1576 "Sec-WebSocket-Protocol",
1577 HeaderValue::from_str("graphql-transport-ws")?,
1578 );
1579 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1580 let init_json = json!({
1581 "type": "connection_init",
1582 "payload": {}
1583 });
1584 websocket.send(init_json.to_string().into()).await?;
1585 let text = websocket
1586 .next()
1587 .await
1588 .context("Failed to establish connection")??
1589 .into_text()?;
1590 ensure!(
1591 text == "{\"type\":\"connection_ack\"}",
1592 "Unexpected response: {text}"
1593 );
1594 let query_json = json!({
1595 "id": "1",
1596 "type": "start",
1597 "payload": {
1598 "query": query,
1599 "variables": {},
1600 "operationName": null
1601 }
1602 });
1603 websocket.send(query_json.to_string().into()).await?;
1604 Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1605 |message| async {
1606 let text = message.into_text()?;
1607 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1608 if let Some(errors) = value["payload"].get("errors") {
1609 bail!("Notification subscription failed: {errors:?}");
1610 }
1611 serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1612 .context("Failed to deserialize notification")
1613 },
1614 )))
1615 }
1616}
1617
1618pub struct FaucetService {
1620 port: u16,
1621 child: Child,
1622 _temp_dir: tempfile::TempDir,
1623}
1624
1625impl FaucetService {
1626 fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
1627 Self {
1628 port,
1629 child,
1630 _temp_dir: temp_dir,
1631 }
1632 }
1633
1634 pub async fn terminate(mut self) -> Result<()> {
1635 self.child
1636 .kill()
1637 .await
1638 .context("terminating faucet service")
1639 }
1640
1641 pub fn ensure_is_running(&mut self) -> Result<()> {
1642 self.child.ensure_is_running()
1643 }
1644
1645 pub fn instance(&self) -> Faucet {
1646 Faucet::new(format!("http://localhost:{}/", self.port))
1647 }
1648}
1649
1650pub struct ApplicationWrapper<A> {
1652 uri: String,
1653 _phantom: PhantomData<A>,
1654}
1655
1656impl<A> ApplicationWrapper<A> {
1657 pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1658 let query = query.as_ref();
1659 let value = self.run_json_query(json!({ "query": query })).await?;
1660 Ok(value["data"].clone())
1661 }
1662
1663 pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1664 const MAX_RETRIES: usize = 5;
1665
1666 for i in 0.. {
1667 let client = reqwest_client();
1668 let result = client.post(&self.uri).json(&query).send().await;
1669 let response = match result {
1670 Ok(response) => response,
1671 Err(error) if i < MAX_RETRIES => {
1672 tracing::warn!(
1673 "Failed to post query \"{}\": {error}; retrying",
1674 truncate_query_output_serialize(&query),
1675 );
1676 continue;
1677 }
1678 Err(error) => {
1679 let query = truncate_query_output_serialize(&query);
1680 return Err(error)
1681 .with_context(|| format!("run_json_query: failed to post query={query}"));
1682 }
1683 };
1684 ensure!(
1685 response.status().is_success(),
1686 "Query \"{}\" failed: {}",
1687 truncate_query_output_serialize(&query),
1688 response
1689 .text()
1690 .await
1691 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1692 );
1693 let value: Value = response.json().await.context("invalid JSON")?;
1694 if let Some(errors) = value.get("errors") {
1695 bail!(
1696 "Query \"{}\" failed: {}",
1697 truncate_query_output_serialize(&query),
1698 errors
1699 );
1700 }
1701 return Ok(value);
1702 }
1703 unreachable!()
1704 }
1705
1706 pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
1707 let query = query.as_ref();
1708 self.run_graphql_query(&format!("query {{ {query} }}"))
1709 .await
1710 }
1711
1712 pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
1713 let query = query.as_ref().trim();
1714 let name = query
1715 .split_once(|ch: char| !ch.is_alphanumeric())
1716 .map_or(query, |(name, _)| name);
1717 let data = self.query(query).await?;
1718 serde_json::from_value(data[name].clone())
1719 .with_context(|| format!("{name} field missing in response"))
1720 }
1721
1722 pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
1723 let mutation = mutation.as_ref();
1724 self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
1725 .await
1726 }
1727
1728 pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
1729 let mut out = String::from("mutation {\n");
1730 for (index, mutation) in mutations.iter().enumerate() {
1731 out = format!("{} u{}: {}\n", out, index, mutation);
1732 }
1733 out.push_str("}\n");
1734 self.run_graphql_query(&out).await
1735 }
1736}
1737
1738impl<A> From<String> for ApplicationWrapper<A> {
1739 fn from(uri: String) -> ApplicationWrapper<A> {
1740 ApplicationWrapper {
1741 uri,
1742 _phantom: PhantomData,
1743 }
1744 }
1745}
1746
1747#[cfg(with_testing)]
1750fn notification_timeout() -> Duration {
1751 const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
1752 const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;
1753
1754 match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
1755 Ok(var) => Duration::from_millis(var.parse().unwrap_or_else(|error| {
1756 panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
1757 })),
1758 Err(env::VarError::NotPresent) => Duration::from_millis(NOTIFICATION_TIMEOUT_MS_DEFAULT),
1759 Err(env::VarError::NotUnicode(_)) => {
1760 panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
1761 }
1762 }
1763}
1764
1765#[cfg(with_testing)]
1766pub trait NotificationsExt {
1767 fn wait_for<T>(
1769 &mut self,
1770 f: impl FnMut(Notification) -> Option<T>,
1771 ) -> impl Future<Output = Result<T>>;
1772
1773 fn wait_for_events(
1776 &mut self,
1777 expected_height: impl Into<Option<BlockHeight>>,
1778 ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
1779 let expected_height = expected_height.into();
1780 self.wait_for(move |notification| {
1781 if let Reason::NewEvents {
1782 height,
1783 event_streams,
1784 ..
1785 } = notification.reason
1786 {
1787 if expected_height.is_none_or(|h| h == height) {
1788 return Some(event_streams);
1789 }
1790 }
1791 None
1792 })
1793 }
1794
1795 fn wait_for_block(
1798 &mut self,
1799 expected_height: impl Into<Option<BlockHeight>>,
1800 ) -> impl Future<Output = Result<CryptoHash>> {
1801 let expected_height = expected_height.into();
1802 self.wait_for(move |notification| {
1803 if let Reason::NewBlock { height, hash, .. } = notification.reason {
1804 if expected_height.is_none_or(|h| h == height) {
1805 return Some(hash);
1806 }
1807 }
1808 None
1809 })
1810 }
1811
1812 fn wait_for_bundle(
1815 &mut self,
1816 expected_origin: ChainId,
1817 expected_height: impl Into<Option<BlockHeight>>,
1818 ) -> impl Future<Output = Result<()>> {
1819 let expected_height = expected_height.into();
1820 self.wait_for(move |notification| {
1821 if let Reason::NewIncomingBundle { height, origin } = notification.reason {
1822 if expected_height.is_none_or(|h| h == height) && origin == expected_origin {
1823 return Some(());
1824 }
1825 }
1826 None
1827 })
1828 }
1829}
1830
1831#[cfg(with_testing)]
1832impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
1833 async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
1834 let mut timeout = Box::pin(linera_base::time::timer::sleep(notification_timeout())).fuse();
1835 loop {
1836 let notification = futures::select! {
1837 () = timeout => bail!("Timeout waiting for notification"),
1838 notification = self.next().fuse() => notification.context("Stream closed")??,
1839 };
1840 if let Some(t) = f(notification) {
1841 return Ok(t);
1842 }
1843 }
1844 }
1845}