1use std::{
5 borrow::Cow,
6 collections::BTreeMap,
7 env,
8 marker::PhantomData,
9 mem,
10 path::{Path, PathBuf},
11 pin::Pin,
12 process::Stdio,
13 str::FromStr,
14 sync,
15 time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24 abi::ContractAbi,
25 command::{resolve_binary, CommandExt},
26 crypto::{CryptoHash, InMemorySigner},
27 data_types::{Amount, BlockHeight, Bytecode, Epoch},
28 identifiers::{
29 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30 },
31 vm::VmRuntime,
32};
33use linera_client::{client_options::ResourceControlPolicyConfig, wallet::Wallet};
34use linera_core::worker::Notification;
35use linera_execution::committee::Committee;
36use linera_faucet_client::Faucet;
37use serde::{de::DeserializeOwned, ser::Serialize};
38use serde_command_opts::to_args;
39use serde_json::{json, Value};
40use tempfile::TempDir;
41use tokio::{
42 io::{AsyncBufReadExt, BufReader},
43 process::{Child, Command},
44 sync::oneshot,
45 task::JoinHandle,
46};
47#[cfg(with_testing)]
48use {
49 futures::FutureExt as _,
50 linera_core::worker::Reason,
51 std::{collections::BTreeSet, future::Future},
52};
53
54use crate::{
55 cli::command::BenchmarkCommand,
56 cli_wrappers::{
57 local_net::{PathProvider, ProcessInbox},
58 Network,
59 },
60 util::{self, ChildExt},
61};
62
63const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
66
67fn reqwest_client() -> reqwest::Client {
68 reqwest::ClientBuilder::new()
69 .timeout(Duration::from_secs(30))
70 .build()
71 .unwrap()
72}
73
74pub struct ClientWrapper {
76 binary_path: sync::Mutex<Option<PathBuf>>,
77 testing_prng_seed: Option<u64>,
78 storage: String,
79 wallet: String,
80 keystore: String,
81 max_pending_message_bundles: usize,
82 network: Network,
83 pub path_provider: PathProvider,
84 on_drop: OnClientDrop,
85 extra_args: Vec<String>,
86}
87
88#[derive(Clone, Copy, Debug, Eq, PartialEq)]
90pub enum OnClientDrop {
91 CloseChains,
93 LeakChains,
95}
96
97impl ClientWrapper {
98 pub fn new(
99 path_provider: PathProvider,
100 network: Network,
101 testing_prng_seed: Option<u64>,
102 id: usize,
103 on_drop: OnClientDrop,
104 ) -> Self {
105 Self::new_with_extra_args(
106 path_provider,
107 network,
108 testing_prng_seed,
109 id,
110 on_drop,
111 vec!["--wait-for-outgoing-messages".to_string()],
112 )
113 }
114
115 pub fn new_with_extra_args(
116 path_provider: PathProvider,
117 network: Network,
118 testing_prng_seed: Option<u64>,
119 id: usize,
120 on_drop: OnClientDrop,
121 extra_args: Vec<String>,
122 ) -> Self {
123 let storage = format!(
124 "rocksdb:{}/client_{}.db",
125 path_provider.path().display(),
126 id
127 );
128 let wallet = format!("wallet_{}.json", id);
129 let keystore = format!("keystore_{}.json", id);
130 Self {
131 binary_path: sync::Mutex::new(None),
132 testing_prng_seed,
133 storage,
134 wallet,
135 keystore,
136 max_pending_message_bundles: 10_000,
137 network,
138 path_provider,
139 on_drop,
140 extra_args,
141 }
142 }
143
144 pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
146 let tmp = TempDir::new()?;
147 let mut command = self.command().await?;
148 command
149 .current_dir(tmp.path())
150 .arg("project")
151 .arg("new")
152 .arg(project_name)
153 .arg("--linera-root")
154 .arg(linera_root)
155 .spawn_and_wait_for_stdout()
156 .await?;
157 Ok(tmp)
158 }
159
160 pub async fn project_publish<T: Serialize>(
162 &self,
163 path: PathBuf,
164 required_application_ids: Vec<String>,
165 publisher: impl Into<Option<ChainId>>,
166 argument: &T,
167 ) -> Result<String> {
168 let json_parameters = serde_json::to_string(&())?;
169 let json_argument = serde_json::to_string(argument)?;
170 let mut command = self.command().await?;
171 command
172 .arg("project")
173 .arg("publish-and-create")
174 .arg(path)
175 .args(publisher.into().iter().map(ChainId::to_string))
176 .args(["--json-parameters", &json_parameters])
177 .args(["--json-argument", &json_argument]);
178 if !required_application_ids.is_empty() {
179 command.arg("--required-application-ids");
180 command.args(required_application_ids);
181 }
182 let stdout = command.spawn_and_wait_for_stdout().await?;
183 Ok(stdout.trim().to_string())
184 }
185
186 pub async fn project_test(&self, path: &Path) -> Result<()> {
188 self.command()
189 .await
190 .context("failed to create project test command")?
191 .current_dir(path)
192 .arg("project")
193 .arg("test")
194 .spawn_and_wait_for_stdout()
195 .await?;
196 Ok(())
197 }
198
199 async fn command_with_envs_and_arguments(
200 &self,
201 envs: &[(&str, &str)],
202 arguments: impl IntoIterator<Item = Cow<'_, str>>,
203 ) -> Result<Command> {
204 let mut command = self.command_binary().await?;
205 command.current_dir(self.path_provider.path());
206 for (key, value) in envs {
207 command.env(key, value);
208 }
209 for argument in arguments {
210 command.arg(&*argument);
211 }
212 Ok(command)
213 }
214
215 async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
216 self.command_with_envs_and_arguments(envs, self.command_arguments())
217 .await
218 }
219
220 async fn command_with_arguments(
221 &self,
222 arguments: impl IntoIterator<Item = Cow<'_, str>>,
223 ) -> Result<Command> {
224 self.command_with_envs_and_arguments(
225 &[(
226 "RUST_LOG",
227 &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
228 )],
229 arguments,
230 )
231 .await
232 }
233
234 async fn command(&self) -> Result<Command> {
235 self.command_with_envs(&[(
236 "RUST_LOG",
237 &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
238 )])
239 .await
240 }
241
242 fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
243 [
244 "--wallet".into(),
245 self.wallet.as_str().into(),
246 "--keystore".into(),
247 self.keystore.as_str().into(),
248 "--storage".into(),
249 self.storage.as_str().into(),
250 "--send-timeout-ms".into(),
251 "500000".into(),
252 "--recv-timeout-ms".into(),
253 "500000".into(),
254 ]
255 .into_iter()
256 .chain(self.extra_args.iter().map(|s| s.as_str().into()))
257 }
258
259 fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
261 self.required_command_arguments().chain([
262 "--max-pending-message-bundles".into(),
263 self.max_pending_message_bundles.to_string().into(),
264 ])
265 }
266
267 async fn command_binary(&self) -> Result<Command> {
271 match self.command_with_cached_binary_path() {
272 Some(command) => Ok(command),
273 None => {
274 let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
275 let command = Command::new(&resolved_path);
276
277 self.set_cached_binary_path(resolved_path);
278
279 Ok(command)
280 }
281 }
282 }
283
284 fn command_with_cached_binary_path(&self) -> Option<Command> {
286 let binary_path = self.binary_path.lock().unwrap();
287
288 binary_path.as_ref().map(Command::new)
289 }
290
291 fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
299 let mut binary_path = self.binary_path.lock().unwrap();
300
301 if binary_path.is_none() {
302 *binary_path = Some(new_binary_path);
303 } else {
304 assert_eq!(*binary_path, Some(new_binary_path));
305 }
306 }
307
308 pub async fn create_genesis_config(
310 &self,
311 num_other_initial_chains: u32,
312 initial_funding: Amount,
313 policy_config: ResourceControlPolicyConfig,
314 http_allow_list: Option<Vec<String>>,
315 ) -> Result<()> {
316 let mut command = self.command().await?;
317 command
318 .args([
319 "create-genesis-config",
320 &num_other_initial_chains.to_string(),
321 ])
322 .args(["--initial-funding", &initial_funding.to_string()])
323 .args(["--committee", "committee.json"])
324 .args(["--genesis", "genesis.json"])
325 .args([
326 "--policy-config",
327 &policy_config.to_string().to_kebab_case(),
328 ]);
329 if let Some(allow_list) = http_allow_list {
330 command
331 .arg("--http-request-allow-list")
332 .arg(allow_list.join(","));
333 }
334 if let Some(seed) = self.testing_prng_seed {
335 command.arg("--testing-prng-seed").arg(seed.to_string());
336 }
337 command.spawn_and_wait_for_stdout().await?;
338 Ok(())
339 }
340
341 pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
344 let mut command = self.command().await?;
345 command.args(["wallet", "init"]);
346 match faucet {
347 None => command.args(["--genesis", "genesis.json"]),
348 Some(faucet) => command.args(["--faucet", faucet.url()]),
349 };
350 if let Some(seed) = self.testing_prng_seed {
351 command.arg("--testing-prng-seed").arg(seed.to_string());
352 }
353 command.spawn_and_wait_for_stdout().await?;
354 Ok(())
355 }
356
357 pub async fn request_chain(
359 &self,
360 faucet: &Faucet,
361 set_default: bool,
362 ) -> Result<(ChainId, AccountOwner)> {
363 let mut command = self.command().await?;
364 command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
365 if set_default {
366 command.arg("--set-default");
367 }
368 let stdout = command.spawn_and_wait_for_stdout().await?;
369 let mut lines = stdout.split_whitespace();
370 let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
371 let owner = lines.next().context("missing chain owner")?.parse()?;
372 Ok((chain_id, owner))
373 }
374
375 #[expect(clippy::too_many_arguments)]
377 pub async fn publish_and_create<
378 A: ContractAbi,
379 Parameters: Serialize,
380 InstantiationArgument: Serialize,
381 >(
382 &self,
383 contract: PathBuf,
384 service: PathBuf,
385 vm_runtime: VmRuntime,
386 parameters: &Parameters,
387 argument: &InstantiationArgument,
388 required_application_ids: &[ApplicationId],
389 publisher: impl Into<Option<ChainId>>,
390 ) -> Result<ApplicationId<A>> {
391 let json_parameters = serde_json::to_string(parameters)?;
392 let json_argument = serde_json::to_string(argument)?;
393 let mut command = self.command().await?;
394 let vm_runtime = format!("{}", vm_runtime);
395 command
396 .arg("publish-and-create")
397 .args([contract, service])
398 .args(["--vm-runtime", &vm_runtime.to_lowercase()])
399 .args(publisher.into().iter().map(ChainId::to_string))
400 .args(["--json-parameters", &json_parameters])
401 .args(["--json-argument", &json_argument]);
402 if !required_application_ids.is_empty() {
403 command.arg("--required-application-ids");
404 command.args(
405 required_application_ids
406 .iter()
407 .map(ApplicationId::to_string),
408 );
409 }
410 let stdout = command.spawn_and_wait_for_stdout().await?;
411 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
412 }
413
414 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
416 &self,
417 contract: PathBuf,
418 service: PathBuf,
419 vm_runtime: VmRuntime,
420 publisher: impl Into<Option<ChainId>>,
421 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
422 let stdout = self
423 .command()
424 .await?
425 .arg("publish-module")
426 .args([contract, service])
427 .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
428 .args(publisher.into().iter().map(ChainId::to_string))
429 .spawn_and_wait_for_stdout()
430 .await?;
431 let module_id: ModuleId = stdout.trim().parse()?;
432 Ok(module_id.with_abi())
433 }
434
435 pub async fn create_application<
437 Abi: ContractAbi,
438 Parameters: Serialize,
439 InstantiationArgument: Serialize,
440 >(
441 &self,
442 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
443 parameters: &Parameters,
444 argument: &InstantiationArgument,
445 required_application_ids: &[ApplicationId],
446 creator: impl Into<Option<ChainId>>,
447 ) -> Result<ApplicationId<Abi>> {
448 let json_parameters = serde_json::to_string(parameters)?;
449 let json_argument = serde_json::to_string(argument)?;
450 let mut command = self.command().await?;
451 command
452 .arg("create-application")
453 .arg(module_id.forget_abi().to_string())
454 .args(["--json-parameters", &json_parameters])
455 .args(["--json-argument", &json_argument])
456 .args(creator.into().iter().map(ChainId::to_string));
457 if !required_application_ids.is_empty() {
458 command.arg("--required-application-ids");
459 command.args(
460 required_application_ids
461 .iter()
462 .map(ApplicationId::to_string),
463 );
464 }
465 let stdout = command.spawn_and_wait_for_stdout().await?;
466 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
467 }
468
469 pub async fn run_node_service(
471 &self,
472 port: impl Into<Option<u16>>,
473 process_inbox: ProcessInbox,
474 ) -> Result<NodeService> {
475 let port = port.into().unwrap_or(8080);
476 let mut command = self.command().await?;
477 command.arg("service");
478 if let ProcessInbox::Skip = process_inbox {
479 command.arg("--listener-skip-process-inbox");
480 }
481 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
482 command.args(var.split_whitespace());
483 }
484 let child = command
485 .args(["--port".to_string(), port.to_string()])
486 .spawn_into()?;
487 let client = reqwest_client();
488 for i in 0..10 {
489 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
490 let request = client
491 .get(format!("http://localhost:{}/", port))
492 .send()
493 .await;
494 if request.is_ok() {
495 tracing::info!("Node service has started");
496 return Ok(NodeService::new(port, child));
497 } else {
498 tracing::warn!("Waiting for node service to start");
499 }
500 }
501 bail!("Failed to start node service");
502 }
503
504 pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
506 let mut command = self.command().await?;
507 command.arg("validator").arg("query").arg(address);
508 let stdout = command.spawn_and_wait_for_stdout().await?;
509
510 let hash = stdout
513 .lines()
514 .find_map(|line| {
515 line.strip_prefix("Genesis config hash: ")
516 .and_then(|hash_str| hash_str.trim().parse().ok())
517 })
518 .context("error while parsing the result of `linera validator query`")?;
519 Ok(hash)
520 }
521
522 pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
524 let mut command = self.command().await?;
525 command.arg("validator").arg("list");
526 if let Some(chain_id) = chain_id {
527 command.args(["--chain-id", &chain_id.to_string()]);
528 }
529 command.spawn_and_wait_for_stdout().await?;
530 Ok(())
531 }
532
533 pub async fn sync_validator(
535 &self,
536 chain_ids: impl IntoIterator<Item = &ChainId>,
537 validator_address: impl Into<String>,
538 ) -> Result<()> {
539 let mut command = self.command().await?;
540 command
541 .arg("validator")
542 .arg("sync")
543 .arg(validator_address.into());
544 let mut chain_ids = chain_ids.into_iter().peekable();
545 if chain_ids.peek().is_some() {
546 command
547 .arg("--chains")
548 .args(chain_ids.map(ChainId::to_string));
549 }
550 command.spawn_and_wait_for_stdout().await?;
551 Ok(())
552 }
553
554 pub async fn run_faucet(
556 &self,
557 port: impl Into<Option<u16>>,
558 chain_id: Option<ChainId>,
559 amount: Amount,
560 ) -> Result<FaucetService> {
561 let port = port.into().unwrap_or(8080);
562 let temp_dir = tempfile::tempdir()
563 .context("Failed to create temporary directory for faucet storage")?;
564 let storage_path = temp_dir.path().join("faucet_storage.sqlite");
565 let mut command = self.command().await?;
566 let command = command
567 .arg("faucet")
568 .args(["--port".to_string(), port.to_string()])
569 .args(["--amount".to_string(), amount.to_string()])
570 .args([
571 "--storage-path".to_string(),
572 storage_path.to_string_lossy().to_string(),
573 ]);
574 if let Some(chain_id) = chain_id {
575 command.arg(chain_id.to_string());
576 }
577 let child = command.spawn_into()?;
578 let client = reqwest_client();
579 for i in 0..10 {
580 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
581 let request = client
582 .get(format!("http://localhost:{}/", port))
583 .send()
584 .await;
585 if request.is_ok() {
586 tracing::info!("Faucet has started");
587 return Ok(FaucetService::new(port, child, temp_dir));
588 } else {
589 tracing::debug!("Waiting for faucet to start");
590 }
591 }
592 bail!("Failed to start faucet");
593 }
594
595 pub async fn local_balance(&self, account: Account) -> Result<Amount> {
597 let stdout = self
598 .command()
599 .await?
600 .arg("local-balance")
601 .arg(account.to_string())
602 .spawn_and_wait_for_stdout()
603 .await?;
604 let amount = stdout
605 .trim()
606 .parse()
607 .context("error while parsing the result of `linera local-balance`")?;
608 Ok(amount)
609 }
610
611 pub async fn query_balance(&self, account: Account) -> Result<Amount> {
613 let stdout = self
614 .command()
615 .await?
616 .arg("query-balance")
617 .arg(account.to_string())
618 .spawn_and_wait_for_stdout()
619 .await?;
620 let amount = stdout
621 .trim()
622 .parse()
623 .context("error while parsing the result of `linera query-balance`")?;
624 Ok(amount)
625 }
626
627 pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
629 self.command()
630 .await?
631 .arg("sync")
632 .arg(chain_id.to_string())
633 .spawn_and_wait_for_stdout()
634 .await?;
635 Ok(())
636 }
637
638 pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
640 self.command()
641 .await?
642 .arg("process-inbox")
643 .arg(chain_id.to_string())
644 .spawn_and_wait_for_stdout()
645 .await?;
646 Ok(())
647 }
648
649 pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
651 self.command()
652 .await?
653 .arg("transfer")
654 .arg(amount.to_string())
655 .args(["--from", &from.to_string()])
656 .args(["--to", &to.to_string()])
657 .spawn_and_wait_for_stdout()
658 .await?;
659 Ok(())
660 }
661
662 pub async fn transfer_with_silent_logs(
664 &self,
665 amount: Amount,
666 from: ChainId,
667 to: ChainId,
668 ) -> Result<()> {
669 self.command()
670 .await?
671 .env("RUST_LOG", "off")
672 .arg("transfer")
673 .arg(amount.to_string())
674 .args(["--from", &from.to_string()])
675 .args(["--to", &to.to_string()])
676 .spawn_and_wait_for_stdout()
677 .await?;
678 Ok(())
679 }
680
681 pub async fn transfer_with_accounts(
683 &self,
684 amount: Amount,
685 from: Account,
686 to: Account,
687 ) -> Result<()> {
688 self.command()
689 .await?
690 .arg("transfer")
691 .arg(amount.to_string())
692 .args(["--from", &from.to_string()])
693 .args(["--to", &to.to_string()])
694 .spawn_and_wait_for_stdout()
695 .await?;
696 Ok(())
697 }
698
699 fn benchmark_command_internal(command: &mut Command, args: BenchmarkCommand) -> Result<()> {
700 let mut formatted_args = to_args(&args)?;
701 let subcommand = formatted_args.remove(0);
702 formatted_args.remove(0);
705 let options = formatted_args
706 .chunks_exact(2)
707 .flat_map(|pair| {
708 let option = format!("--{}", pair[0]);
709 match pair[1].as_str() {
710 "true" => vec![option],
711 "false" => vec![],
712 _ => vec![option, pair[1].clone()],
713 }
714 })
715 .collect::<Vec<_>>();
716 command
717 .args([
718 "--max-pending-message-bundles",
719 &args.transactions_per_block().to_string(),
720 ])
721 .arg("benchmark")
722 .arg(subcommand)
723 .args(options);
724 Ok(())
725 }
726
727 async fn benchmark_command_with_envs(
728 &self,
729 args: BenchmarkCommand,
730 envs: &[(&str, &str)],
731 ) -> Result<Command> {
732 let mut command = self
733 .command_with_envs_and_arguments(envs, self.required_command_arguments())
734 .await?;
735 Self::benchmark_command_internal(&mut command, args)?;
736 Ok(command)
737 }
738
739 async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
740 let mut command = self
741 .command_with_arguments(self.required_command_arguments())
742 .await?;
743 Self::benchmark_command_internal(&mut command, args)?;
744 Ok(command)
745 }
746
747 pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
749 let mut command = self.benchmark_command(args).await?;
750 command.spawn_and_wait_for_stdout().await?;
751 Ok(())
752 }
753
754 pub async fn benchmark_detached(
757 &self,
758 args: BenchmarkCommand,
759 tx: oneshot::Sender<()>,
760 ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
761 let mut child = self
762 .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
763 .await?
764 .kill_on_drop(true)
765 .stdin(Stdio::piped())
766 .stdout(Stdio::piped())
767 .stderr(Stdio::piped())
768 .spawn()?;
769
770 let pid = child.id().expect("failed to get pid");
771 let stdout = child.stdout.take().expect("stdout not open");
772 let stdout_handle = tokio::spawn(async move {
773 let mut lines = BufReader::new(stdout).lines();
774 while let Ok(Some(line)) = lines.next_line().await {
775 println!("benchmark{{pid={pid}}} {line}");
776 }
777 });
778
779 let stderr = child.stderr.take().expect("stderr not open");
780 let stderr_handle = tokio::spawn(async move {
781 let mut lines = BufReader::new(stderr).lines();
782 let mut tx = Some(tx);
783 while let Ok(Some(line)) = lines.next_line().await {
784 if line.contains("Ready to start benchmark") {
785 tx.take()
786 .expect("Should only send signal once")
787 .send(())
788 .expect("failed to send ready signal to main thread");
789 } else {
790 println!("benchmark{{pid={pid}}} {line}");
791 }
792 }
793 });
794 Ok((child, stdout_handle, stderr_handle))
795 }
796
797 async fn open_chain_internal(
798 &self,
799 from: ChainId,
800 owner: Option<AccountOwner>,
801 initial_balance: Amount,
802 super_owner: bool,
803 ) -> Result<(ChainId, AccountOwner)> {
804 let mut command = self.command().await?;
805 command
806 .arg("open-chain")
807 .args(["--from", &from.to_string()])
808 .args(["--initial-balance", &initial_balance.to_string()]);
809
810 if let Some(owner) = owner {
811 command.args(["--owner", &owner.to_string()]);
812 }
813
814 if super_owner {
815 command.arg("--super-owner");
816 }
817
818 let stdout = command.spawn_and_wait_for_stdout().await?;
819 let mut split = stdout.split('\n');
820 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
821 let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
822 if let Some(owner) = owner {
823 assert_eq!(owner, new_owner);
824 }
825 Ok((chain_id, new_owner))
826 }
827
828 pub async fn open_chain_super_owner(
830 &self,
831 from: ChainId,
832 owner: Option<AccountOwner>,
833 initial_balance: Amount,
834 ) -> Result<(ChainId, AccountOwner)> {
835 self.open_chain_internal(from, owner, initial_balance, true)
836 .await
837 }
838
839 pub async fn open_chain(
841 &self,
842 from: ChainId,
843 owner: Option<AccountOwner>,
844 initial_balance: Amount,
845 ) -> Result<(ChainId, AccountOwner)> {
846 self.open_chain_internal(from, owner, initial_balance, false)
847 .await
848 }
849
850 pub async fn open_and_assign(
852 &self,
853 client: &ClientWrapper,
854 initial_balance: Amount,
855 ) -> Result<ChainId> {
856 let our_chain = self
857 .load_wallet()?
858 .default_chain()
859 .context("no default chain found")?;
860 let owner = client.keygen().await?;
861 let (new_chain, _) = self
862 .open_chain(our_chain, Some(owner), initial_balance)
863 .await?;
864 client.assign(owner, new_chain).await?;
865 Ok(new_chain)
866 }
867
868 pub async fn open_multi_owner_chain(
869 &self,
870 from: ChainId,
871 owners: Vec<AccountOwner>,
872 weights: Vec<u64>,
873 multi_leader_rounds: u32,
874 balance: Amount,
875 base_timeout_ms: u64,
876 ) -> Result<ChainId> {
877 let mut command = self.command().await?;
878 command
879 .arg("open-multi-owner-chain")
880 .args(["--from", &from.to_string()])
881 .arg("--owners")
882 .args(owners.iter().map(AccountOwner::to_string))
883 .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
884 if !weights.is_empty() {
885 command
886 .arg("--owner-weights")
887 .args(weights.iter().map(u64::to_string));
888 };
889 command
890 .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
891 .args(["--initial-balance", &balance.to_string()]);
892
893 let stdout = command.spawn_and_wait_for_stdout().await?;
894 let mut split = stdout.split('\n');
895 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
896
897 Ok(chain_id)
898 }
899
900 pub async fn change_ownership(
901 &self,
902 chain_id: ChainId,
903 super_owners: Vec<AccountOwner>,
904 owners: Vec<AccountOwner>,
905 ) -> Result<()> {
906 let mut command = self.command().await?;
907 command
908 .arg("change-ownership")
909 .args(["--chain-id", &chain_id.to_string()]);
910 if !super_owners.is_empty() {
911 command
912 .arg("--super-owners")
913 .args(super_owners.iter().map(AccountOwner::to_string));
914 }
915 if !owners.is_empty() {
916 command
917 .arg("--owners")
918 .args(owners.iter().map(AccountOwner::to_string));
919 }
920 command.spawn_and_wait_for_stdout().await?;
921 Ok(())
922 }
923
924 pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
926 let mut command = self.command().await?;
927 command
928 .args(["wallet", "follow-chain"])
929 .arg(chain_id.to_string());
930 if sync {
931 command.arg("--sync");
932 }
933 command.spawn_and_wait_for_stdout().await?;
934 Ok(())
935 }
936
937 pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
939 let mut command = self.command().await?;
940 command
941 .args(["wallet", "forget-chain"])
942 .arg(chain_id.to_string());
943 command.spawn_and_wait_for_stdout().await?;
944 Ok(())
945 }
946
947 pub async fn retry_pending_block(
948 &self,
949 chain_id: Option<ChainId>,
950 ) -> Result<Option<CryptoHash>> {
951 let mut command = self.command().await?;
952 command.arg("retry-pending-block");
953 if let Some(chain_id) = chain_id {
954 command.arg(chain_id.to_string());
955 }
956 let stdout = command.spawn_and_wait_for_stdout().await?;
957 let stdout = stdout.trim();
958 if stdout.is_empty() {
959 Ok(None)
960 } else {
961 Ok(Some(CryptoHash::from_str(stdout)?))
962 }
963 }
964
965 pub async fn publish_data_blob(
967 &self,
968 path: &Path,
969 chain_id: Option<ChainId>,
970 ) -> Result<CryptoHash> {
971 let mut command = self.command().await?;
972 command.arg("publish-data-blob").arg(path);
973 if let Some(chain_id) = chain_id {
974 command.arg(chain_id.to_string());
975 }
976 let stdout = command.spawn_and_wait_for_stdout().await?;
977 let stdout = stdout.trim();
978 Ok(CryptoHash::from_str(stdout)?)
979 }
980
981 pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
983 let mut command = self.command().await?;
984 command.arg("read-data-blob").arg(hash.to_string());
985 if let Some(chain_id) = chain_id {
986 command.arg(chain_id.to_string());
987 }
988 command.spawn_and_wait_for_stdout().await?;
989 Ok(())
990 }
991
992 pub fn load_wallet(&self) -> Result<Wallet> {
993 util::read_json(self.wallet_path())
994 }
995
996 pub fn load_keystore(&self) -> Result<InMemorySigner> {
997 util::read_json(self.keystore_path())
998 }
999
1000 pub fn wallet_path(&self) -> PathBuf {
1001 self.path_provider.path().join(&self.wallet)
1002 }
1003
1004 pub fn keystore_path(&self) -> PathBuf {
1005 self.path_provider.path().join(&self.keystore)
1006 }
1007
1008 pub fn storage_path(&self) -> &str {
1009 &self.storage
1010 }
1011
1012 pub fn get_owner(&self) -> Option<AccountOwner> {
1013 let wallet = self.load_wallet().ok()?;
1014 let chain_id = wallet.default_chain()?;
1015 wallet.get(chain_id)?.owner
1016 }
1017
1018 pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1019 self.load_wallet()
1020 .ok()
1021 .is_some_and(|wallet| wallet.get(chain).is_some())
1022 }
1023
1024 pub async fn set_validator(
1025 &self,
1026 validator_key: &(String, String),
1027 port: usize,
1028 votes: usize,
1029 ) -> Result<()> {
1030 let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1031 self.command()
1032 .await?
1033 .arg("validator")
1034 .arg("add")
1035 .args(["--public-key", &validator_key.0])
1036 .args(["--account-key", &validator_key.1])
1037 .args(["--address", &address])
1038 .args(["--votes", &votes.to_string()])
1039 .spawn_and_wait_for_stdout()
1040 .await?;
1041 Ok(())
1042 }
1043
1044 pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1045 self.command()
1046 .await?
1047 .arg("validator")
1048 .arg("remove")
1049 .args(["--public-key", validator_key])
1050 .spawn_and_wait_for_stdout()
1051 .await?;
1052 Ok(())
1053 }
1054
1055 pub async fn change_validators(
1056 &self,
1057 add_validators: &[(String, String, usize, usize)], modify_validators: &[(String, String, usize, usize)], remove_validators: &[String],
1060 ) -> Result<()> {
1061 use std::str::FromStr;
1062
1063 use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1064
1065 let mut changes = std::collections::HashMap::new();
1068
1069 for (public_key_str, account_key_str, port, votes) in
1071 add_validators.iter().chain(modify_validators.iter())
1072 {
1073 let public_key = ValidatorPublicKey::from_str(public_key_str)
1074 .with_context(|| format!("Invalid validator public key: {}", public_key_str))?;
1075
1076 let account_key = AccountPublicKey::from_str(account_key_str)
1077 .with_context(|| format!("Invalid account public key: {}", account_key_str))?;
1078
1079 let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1080
1081 let change = crate::cli::validator::ValidatorChange {
1083 account_key,
1084 network_address: address,
1085 votes: std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1086 };
1087
1088 changes.insert(public_key, Some(change));
1089 }
1090
1091 for validator_key_str in remove_validators {
1093 let public_key = ValidatorPublicKey::from_str(validator_key_str)
1094 .with_context(|| format!("Invalid validator public key: {}", validator_key_str))?;
1095 changes.insert(public_key, None);
1096 }
1097
1098 let temp_file = tempfile::NamedTempFile::new()
1100 .context("Failed to create temporary file for validator changes")?;
1101 serde_json::to_writer(&temp_file, &changes)
1102 .context("Failed to write validator changes to file")?;
1103 let temp_path = temp_file.path();
1104
1105 self.command()
1106 .await?
1107 .arg("validator")
1108 .arg("update")
1109 .arg(temp_path)
1110 .arg("--yes") .spawn_and_wait_for_stdout()
1112 .await?;
1113
1114 Ok(())
1115 }
1116
1117 pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1118 self.command()
1119 .await?
1120 .arg("revoke-epochs")
1121 .arg(epoch.to_string())
1122 .spawn_and_wait_for_stdout()
1123 .await?;
1124 Ok(())
1125 }
1126
1127 pub async fn keygen(&self) -> Result<AccountOwner> {
1129 let stdout = self
1130 .command()
1131 .await?
1132 .arg("keygen")
1133 .spawn_and_wait_for_stdout()
1134 .await?;
1135 AccountOwner::from_str(stdout.as_str().trim())
1136 }
1137
1138 pub fn default_chain(&self) -> Option<ChainId> {
1140 self.load_wallet().ok()?.default_chain()
1141 }
1142
1143 pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1145 let _stdout = self
1146 .command()
1147 .await?
1148 .arg("assign")
1149 .args(["--owner", &owner.to_string()])
1150 .args(["--chain-id", &chain_id.to_string()])
1151 .spawn_and_wait_for_stdout()
1152 .await?;
1153 Ok(())
1154 }
1155
1156 pub async fn set_preferred_owner(
1158 &self,
1159 chain_id: ChainId,
1160 owner: Option<AccountOwner>,
1161 ) -> Result<()> {
1162 let mut owner_arg = vec!["--owner".to_string()];
1163 if let Some(owner) = owner {
1164 owner_arg.push(owner.to_string());
1165 };
1166 self.command()
1167 .await?
1168 .arg("set-preferred-owner")
1169 .args(["--chain-id", &chain_id.to_string()])
1170 .args(owner_arg)
1171 .spawn_and_wait_for_stdout()
1172 .await?;
1173 Ok(())
1174 }
1175
1176 pub async fn build_application(
1177 &self,
1178 path: &Path,
1179 name: &str,
1180 is_workspace: bool,
1181 ) -> Result<(PathBuf, PathBuf)> {
1182 Command::new("cargo")
1183 .current_dir(self.path_provider.path())
1184 .arg("build")
1185 .arg("--release")
1186 .args(["--target", "wasm32-unknown-unknown"])
1187 .arg("--manifest-path")
1188 .arg(path.join("Cargo.toml"))
1189 .spawn_and_wait_for_stdout()
1190 .await?;
1191
1192 let release_dir = match is_workspace {
1193 true => path.join("../target/wasm32-unknown-unknown/release"),
1194 false => path.join("target/wasm32-unknown-unknown/release"),
1195 };
1196
1197 let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1198 let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1199
1200 let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1201 let service_size = fs_err::tokio::metadata(&service).await?.len();
1202 tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1203
1204 Ok((contract, service))
1205 }
1206}
1207
1208impl Drop for ClientWrapper {
1209 fn drop(&mut self) {
1210 use std::process::Command as SyncCommand;
1211
1212 if self.on_drop != OnClientDrop::CloseChains {
1213 return;
1214 }
1215
1216 let Ok(binary_path) = self.binary_path.lock() else {
1217 tracing::error!(
1218 "Failed to close chains because a thread panicked with a lock to `binary_path`"
1219 );
1220 return;
1221 };
1222
1223 let Some(binary_path) = binary_path.as_ref() else {
1224 tracing::warn!(
1225 "Assuming no chains need to be closed, because the command binary was never \
1226 resolved and therefore presumably never called"
1227 );
1228 return;
1229 };
1230
1231 let working_directory = self.path_provider.path();
1232 let mut wallet_show_command = SyncCommand::new(binary_path);
1233
1234 for argument in self.command_arguments() {
1235 wallet_show_command.arg(&*argument);
1236 }
1237
1238 let Ok(wallet_show_output) = wallet_show_command
1239 .current_dir(working_directory)
1240 .args(["wallet", "show", "--short", "--owned"])
1241 .output()
1242 else {
1243 tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1244 return;
1245 };
1246
1247 if !wallet_show_output.status.success() {
1248 tracing::warn!("Failed to list chains in the wallet to close them");
1249 return;
1250 }
1251
1252 let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1253 tracing::warn!(
1254 "Failed to close chains because `linera wallet show --short` \
1255 returned a non-UTF-8 output"
1256 );
1257 return;
1258 };
1259
1260 let chain_ids = chain_list_string
1261 .split('\n')
1262 .map(|line| line.trim())
1263 .filter(|line| !line.is_empty());
1264
1265 for chain_id in chain_ids {
1266 let mut close_chain_command = SyncCommand::new(binary_path);
1267
1268 for argument in self.command_arguments() {
1269 close_chain_command.arg(&*argument);
1270 }
1271
1272 close_chain_command.current_dir(working_directory);
1273
1274 match close_chain_command.args(["close-chain", chain_id]).status() {
1275 Ok(status) if status.success() => (),
1276 Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1277 Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1278 }
1279 }
1280 }
1281}
1282
1283#[cfg(with_testing)]
1284impl ClientWrapper {
1285 pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1286 self.build_application(Self::example_path(name)?.as_path(), name, true)
1287 .await
1288 }
1289
1290 pub fn example_path(name: &str) -> Result<PathBuf> {
1291 Ok(env::current_dir()?.join("../examples/").join(name))
1292 }
1293}
1294
1295fn truncate_query_output(input: &str) -> String {
1296 let max_len = 1000;
1297 if input.len() < max_len {
1298 input.to_string()
1299 } else {
1300 format!("{} ...", input.get(..max_len).unwrap())
1301 }
1302}
1303
1304fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1305 let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1306 let max_len = 200;
1307 if query.len() < max_len {
1308 query
1309 } else {
1310 format!("{} ...", query.get(..max_len).unwrap())
1311 }
1312}
1313
1314pub struct NodeService {
1316 port: u16,
1317 child: Child,
1318}
1319
1320impl NodeService {
1321 fn new(port: u16, child: Child) -> Self {
1322 Self { port, child }
1323 }
1324
1325 pub async fn terminate(mut self) -> Result<()> {
1326 self.child.kill().await.context("terminating node service")
1327 }
1328
1329 pub fn port(&self) -> u16 {
1330 self.port
1331 }
1332
1333 pub fn ensure_is_running(&mut self) -> Result<()> {
1334 self.child.ensure_is_running()
1335 }
1336
1337 pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1338 let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1339 let mut data = self.query_node(query).await?;
1340 Ok(serde_json::from_value(data["processInbox"].take())?)
1341 }
1342
1343 pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1344 let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1345 let mut data = self.query_node(query).await?;
1346 Ok(serde_json::from_value(data["sync"].take())?)
1347 }
1348
1349 pub async fn transfer(
1350 &self,
1351 chain_id: ChainId,
1352 owner: AccountOwner,
1353 recipient: Account,
1354 amount: Amount,
1355 ) -> Result<CryptoHash> {
1356 let json_owner = owner.to_value();
1357 let json_recipient = recipient.to_value();
1358 let query = format!(
1359 "mutation {{ transfer(\
1360 chainId: \"{chain_id}\", \
1361 owner: {json_owner}, \
1362 recipient: {json_recipient}, \
1363 amount: \"{amount}\") \
1364 }}"
1365 );
1366 let data = self.query_node(query).await?;
1367 serde_json::from_value(data["transfer"].clone())
1368 .context("missing transfer field in response")
1369 }
1370
1371 pub async fn balance(&self, account: &Account) -> Result<Amount> {
1372 let chain = account.chain_id;
1373 let owner = account.owner;
1374 if matches!(owner, AccountOwner::CHAIN) {
1375 let query = format!(
1376 "query {{ chain(chainId:\"{chain}\") {{
1377 executionState {{ system {{ balance }} }}
1378 }} }}"
1379 );
1380 let response = self.query_node(query).await?;
1381 let balance = &response["chain"]["executionState"]["system"]["balance"]
1382 .as_str()
1383 .unwrap();
1384 return Ok(Amount::from_str(balance)?);
1385 }
1386 let query = format!(
1387 "query {{ chain(chainId:\"{chain}\") {{
1388 executionState {{ system {{ balances {{
1389 entry(key:\"{owner}\") {{ value }}
1390 }} }} }}
1391 }} }}"
1392 );
1393 let response = self.query_node(query).await?;
1394 let balances = &response["chain"]["executionState"]["system"]["balances"];
1395 let balance = balances["entry"]["value"].as_str();
1396 match balance {
1397 None => Ok(Amount::ZERO),
1398 Some(amount) => Ok(Amount::from_str(amount)?),
1399 }
1400 }
1401
1402 pub fn make_application<A: ContractAbi>(
1403 &self,
1404 chain_id: &ChainId,
1405 application_id: &ApplicationId<A>,
1406 ) -> Result<ApplicationWrapper<A>> {
1407 let application_id = application_id.forget_abi().to_string();
1408 let link = format!(
1409 "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1410 self.port
1411 );
1412 Ok(ApplicationWrapper::from(link))
1413 }
1414
1415 pub async fn publish_data_blob(
1416 &self,
1417 chain_id: &ChainId,
1418 bytes: Vec<u8>,
1419 ) -> Result<CryptoHash> {
1420 let query = format!(
1421 "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1422 chain_id.to_value(),
1423 bytes.to_value(),
1424 );
1425 let data = self.query_node(query).await?;
1426 serde_json::from_value(data["publishDataBlob"].clone())
1427 .context("missing publishDataBlob field in response")
1428 }
1429
1430 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1431 &self,
1432 chain_id: &ChainId,
1433 contract: PathBuf,
1434 service: PathBuf,
1435 vm_runtime: VmRuntime,
1436 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1437 let contract_code = Bytecode::load_from_file(&contract)?;
1438 let service_code = Bytecode::load_from_file(&service)?;
1439 let query = format!(
1440 "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1441 chain_id.to_value(),
1442 contract_code.to_value(),
1443 service_code.to_value(),
1444 vm_runtime.to_value(),
1445 );
1446 let data = self.query_node(query).await?;
1447 let module_str = data["publishModule"]
1448 .as_str()
1449 .context("module ID not found")?;
1450 let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1451 Ok(module_id.with_abi())
1452 }
1453
1454 pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1455 let query = format!(
1456 "query {{ chain(chainId:\"{chain_id}\") {{
1457 executionState {{ system {{ committees }} }}
1458 }} }}"
1459 );
1460 let mut response = self.query_node(query).await?;
1461 let committees = response["chain"]["executionState"]["system"]["committees"].take();
1462 Ok(serde_json::from_value(committees)?)
1463 }
1464
1465 pub async fn events_from_index(
1466 &self,
1467 chain_id: &ChainId,
1468 stream_id: &StreamId,
1469 start_index: u32,
1470 ) -> Result<Vec<IndexAndEvent>> {
1471 let query = format!(
1472 "query {{
1473 eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1474 {{ index event }}
1475 }}",
1476 stream_id.to_value()
1477 );
1478 let mut response = self.query_node(query).await?;
1479 let response = response["eventsFromIndex"].take();
1480 Ok(serde_json::from_value(response)?)
1481 }
1482
1483 pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1484 let n_try = 5;
1485 let query = query.as_ref();
1486 for i in 0..n_try {
1487 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1488 let url = format!("http://localhost:{}/", self.port);
1489 let client = reqwest_client();
1490 let result = client
1491 .post(url)
1492 .json(&json!({ "query": query }))
1493 .send()
1494 .await;
1495 if matches!(result, Err(ref error) if error.is_timeout()) {
1496 tracing::warn!(
1497 "Timeout when sending query {} to the node service",
1498 truncate_query_output(query)
1499 );
1500 continue;
1501 }
1502 let response = result.with_context(|| {
1503 format!(
1504 "query_node: failed to post query={}",
1505 truncate_query_output(query)
1506 )
1507 })?;
1508 ensure!(
1509 response.status().is_success(),
1510 "Query \"{}\" failed: {}",
1511 truncate_query_output(query),
1512 response
1513 .text()
1514 .await
1515 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1516 );
1517 let value: Value = response.json().await.context("invalid JSON")?;
1518 if let Some(errors) = value.get("errors") {
1519 tracing::warn!(
1520 "Query \"{}\" failed: {}",
1521 truncate_query_output(query),
1522 errors
1523 );
1524 } else {
1525 return Ok(value["data"].clone());
1526 }
1527 }
1528 bail!(
1529 "Query \"{}\" failed after {} retries.",
1530 truncate_query_output(query),
1531 n_try
1532 );
1533 }
1534
1535 pub async fn create_application<
1536 Abi: ContractAbi,
1537 Parameters: Serialize,
1538 InstantiationArgument: Serialize,
1539 >(
1540 &self,
1541 chain_id: &ChainId,
1542 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1543 parameters: &Parameters,
1544 argument: &InstantiationArgument,
1545 required_application_ids: &[ApplicationId],
1546 ) -> Result<ApplicationId<Abi>> {
1547 let module_id = module_id.forget_abi();
1548 let json_required_applications_ids = required_application_ids
1549 .iter()
1550 .map(ApplicationId::to_string)
1551 .collect::<Vec<_>>()
1552 .to_value();
1553 let new_parameters = serde_json::to_value(parameters)
1555 .context("could not create parameters JSON")?
1556 .to_value();
1557 let new_argument = serde_json::to_value(argument)
1558 .context("could not create argument JSON")?
1559 .to_value();
1560 let query = format!(
1561 "mutation {{ createApplication(\
1562 chainId: \"{chain_id}\",
1563 moduleId: \"{module_id}\", \
1564 parameters: {new_parameters}, \
1565 instantiationArgument: {new_argument}, \
1566 requiredApplicationIds: {json_required_applications_ids}) \
1567 }}"
1568 );
1569 let data = self.query_node(query).await?;
1570 let app_id_str = data["createApplication"]
1571 .as_str()
1572 .context("missing createApplication string in response")?
1573 .trim();
1574 Ok(app_id_str
1575 .parse::<ApplicationId>()
1576 .context("invalid application ID")?
1577 .with_abi())
1578 }
1579
1580 pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1582 let query = format!(
1583 r#"query {{ block(chainId: "{chain}") {{
1584 hash
1585 block {{ header {{ height }} }}
1586 }} }}"#
1587 );
1588
1589 let mut response = self.query_node(&query).await?;
1590
1591 match (
1592 mem::take(&mut response["block"]["hash"]),
1593 mem::take(&mut response["block"]["block"]["header"]["height"]),
1594 ) {
1595 (Value::Null, Value::Null) => Ok(None),
1596 (Value::String(hash), Value::Number(height)) => Ok(Some((
1597 hash.parse()
1598 .context("Received an invalid hash {hash:?} for chain tip")?,
1599 BlockHeight(height.as_u64().unwrap()),
1600 ))),
1601 invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1602 }
1603 }
1604
1605 pub async fn notifications(
1607 &self,
1608 chain_id: ChainId,
1609 ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
1610 let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1611 let url = format!("ws://localhost:{}/ws", self.port);
1612 let mut request = url.into_client_request()?;
1613 request.headers_mut().insert(
1614 "Sec-WebSocket-Protocol",
1615 HeaderValue::from_str("graphql-transport-ws")?,
1616 );
1617 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1618 let init_json = json!({
1619 "type": "connection_init",
1620 "payload": {}
1621 });
1622 websocket.send(init_json.to_string().into()).await?;
1623 let text = websocket
1624 .next()
1625 .await
1626 .context("Failed to establish connection")??
1627 .into_text()?;
1628 ensure!(
1629 text == "{\"type\":\"connection_ack\"}",
1630 "Unexpected response: {text}"
1631 );
1632 let query_json = json!({
1633 "id": "1",
1634 "type": "start",
1635 "payload": {
1636 "query": query,
1637 "variables": {},
1638 "operationName": null
1639 }
1640 });
1641 websocket.send(query_json.to_string().into()).await?;
1642 Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1643 |message| async {
1644 let text = message.into_text()?;
1645 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1646 if let Some(errors) = value["payload"].get("errors") {
1647 bail!("Notification subscription failed: {errors:?}");
1648 }
1649 serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1650 .context("Failed to deserialize notification")
1651 },
1652 )))
1653 }
1654}
1655
1656pub struct FaucetService {
1658 port: u16,
1659 child: Child,
1660 _temp_dir: tempfile::TempDir,
1661}
1662
1663impl FaucetService {
1664 fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
1665 Self {
1666 port,
1667 child,
1668 _temp_dir: temp_dir,
1669 }
1670 }
1671
1672 pub async fn terminate(mut self) -> Result<()> {
1673 self.child
1674 .kill()
1675 .await
1676 .context("terminating faucet service")
1677 }
1678
1679 pub fn ensure_is_running(&mut self) -> Result<()> {
1680 self.child.ensure_is_running()
1681 }
1682
1683 pub fn instance(&self) -> Faucet {
1684 Faucet::new(format!("http://localhost:{}/", self.port))
1685 }
1686}
1687
1688pub struct ApplicationWrapper<A> {
1690 uri: String,
1691 _phantom: PhantomData<A>,
1692}
1693
1694impl<A> ApplicationWrapper<A> {
1695 pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1696 let query = query.as_ref();
1697 let value = self.run_json_query(json!({ "query": query })).await?;
1698 Ok(value["data"].clone())
1699 }
1700
1701 pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1702 const MAX_RETRIES: usize = 5;
1703
1704 for i in 0.. {
1705 let client = reqwest_client();
1706 let result = client.post(&self.uri).json(&query).send().await;
1707 let response = match result {
1708 Ok(response) => response,
1709 Err(error) if i < MAX_RETRIES => {
1710 tracing::warn!(
1711 "Failed to post query \"{}\": {error}; retrying",
1712 truncate_query_output_serialize(&query),
1713 );
1714 continue;
1715 }
1716 Err(error) => {
1717 let query = truncate_query_output_serialize(&query);
1718 return Err(error)
1719 .with_context(|| format!("run_json_query: failed to post query={query}"));
1720 }
1721 };
1722 ensure!(
1723 response.status().is_success(),
1724 "Query \"{}\" failed: {}",
1725 truncate_query_output_serialize(&query),
1726 response
1727 .text()
1728 .await
1729 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1730 );
1731 let value: Value = response.json().await.context("invalid JSON")?;
1732 if let Some(errors) = value.get("errors") {
1733 bail!(
1734 "Query \"{}\" failed: {}",
1735 truncate_query_output_serialize(&query),
1736 errors
1737 );
1738 }
1739 return Ok(value);
1740 }
1741 unreachable!()
1742 }
1743
1744 pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
1745 let query = query.as_ref();
1746 self.run_graphql_query(&format!("query {{ {query} }}"))
1747 .await
1748 }
1749
1750 pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
1751 let query = query.as_ref().trim();
1752 let name = query
1753 .split_once(|ch: char| !ch.is_alphanumeric())
1754 .map_or(query, |(name, _)| name);
1755 let data = self.query(query).await?;
1756 serde_json::from_value(data[name].clone())
1757 .with_context(|| format!("{name} field missing in response"))
1758 }
1759
1760 pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
1761 let mutation = mutation.as_ref();
1762 self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
1763 .await
1764 }
1765
1766 pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
1767 let mut out = String::from("mutation {\n");
1768 for (index, mutation) in mutations.iter().enumerate() {
1769 out = format!("{} u{}: {}\n", out, index, mutation);
1770 }
1771 out.push_str("}\n");
1772 self.run_graphql_query(&out).await
1773 }
1774}
1775
1776impl<A> From<String> for ApplicationWrapper<A> {
1777 fn from(uri: String) -> ApplicationWrapper<A> {
1778 ApplicationWrapper {
1779 uri,
1780 _phantom: PhantomData,
1781 }
1782 }
1783}
1784
1785#[cfg(with_testing)]
1788fn notification_timeout() -> Duration {
1789 const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
1790 const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;
1791
1792 match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
1793 Ok(var) => Duration::from_millis(var.parse().unwrap_or_else(|error| {
1794 panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
1795 })),
1796 Err(env::VarError::NotPresent) => Duration::from_millis(NOTIFICATION_TIMEOUT_MS_DEFAULT),
1797 Err(env::VarError::NotUnicode(_)) => {
1798 panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
1799 }
1800 }
1801}
1802
1803#[cfg(with_testing)]
1804pub trait NotificationsExt {
1805 fn wait_for<T>(
1807 &mut self,
1808 f: impl FnMut(Notification) -> Option<T>,
1809 ) -> impl Future<Output = Result<T>>;
1810
1811 fn wait_for_events(
1814 &mut self,
1815 expected_height: impl Into<Option<BlockHeight>>,
1816 ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
1817 let expected_height = expected_height.into();
1818 self.wait_for(move |notification| {
1819 if let Reason::NewEvents {
1820 height,
1821 event_streams,
1822 ..
1823 } = notification.reason
1824 {
1825 if expected_height.is_none_or(|h| h == height) {
1826 return Some(event_streams);
1827 }
1828 }
1829 None
1830 })
1831 }
1832
1833 fn wait_for_block(
1836 &mut self,
1837 expected_height: impl Into<Option<BlockHeight>>,
1838 ) -> impl Future<Output = Result<CryptoHash>> {
1839 let expected_height = expected_height.into();
1840 self.wait_for(move |notification| {
1841 if let Reason::NewBlock { height, hash, .. } = notification.reason {
1842 if expected_height.is_none_or(|h| h == height) {
1843 return Some(hash);
1844 }
1845 }
1846 None
1847 })
1848 }
1849
1850 fn wait_for_bundle(
1853 &mut self,
1854 expected_origin: ChainId,
1855 expected_height: impl Into<Option<BlockHeight>>,
1856 ) -> impl Future<Output = Result<()>> {
1857 let expected_height = expected_height.into();
1858 self.wait_for(move |notification| {
1859 if let Reason::NewIncomingBundle { height, origin } = notification.reason {
1860 if expected_height.is_none_or(|h| h == height) && origin == expected_origin {
1861 return Some(());
1862 }
1863 }
1864 None
1865 })
1866 }
1867}
1868
1869#[cfg(with_testing)]
1870impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
1871 async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
1872 let mut timeout = Box::pin(linera_base::time::timer::sleep(notification_timeout())).fuse();
1873 loop {
1874 let notification = futures::select! {
1875 () = timeout => bail!("Timeout waiting for notification"),
1876 notification = self.next().fuse() => notification.context("Stream closed")??,
1877 };
1878 if let Some(t) = f(notification) {
1879 return Ok(t);
1880 }
1881 }
1882 }
1883}