1use std::{
5 borrow::Cow,
6 collections::BTreeMap,
7 env,
8 marker::PhantomData,
9 mem,
10 path::{Path, PathBuf},
11 pin::Pin,
12 process::Stdio,
13 str::FromStr,
14 sync,
15 time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24 abi::ContractAbi,
25 command::{resolve_binary, CommandExt},
26 crypto::{CryptoHash, InMemorySigner},
27 data_types::{Amount, BlockHeight, Bytecode, Epoch},
28 identifiers::{
29 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30 },
31 vm::VmRuntime,
32};
33use linera_client::client_options::ResourceControlPolicyConfig;
34use linera_core::worker::Notification;
35use linera_execution::committee::Committee;
36use linera_faucet_client::Faucet;
37use serde::{de::DeserializeOwned, ser::Serialize};
38use serde_command_opts::to_args;
39use serde_json::{json, Value};
40use tempfile::TempDir;
41use tokio::{
42 io::{AsyncBufReadExt, BufReader},
43 process::{Child, Command},
44 sync::oneshot,
45 task::JoinHandle,
46};
47#[cfg(with_testing)]
48use {
49 futures::FutureExt as _,
50 linera_core::worker::Reason,
51 std::{collections::BTreeSet, future::Future},
52};
53
54use crate::{
55 cli::command::BenchmarkCommand,
56 cli_wrappers::{
57 local_net::{PathProvider, ProcessInbox},
58 Network,
59 },
60 util::{self, ChildExt},
61 wallet::Wallet,
62};
63
64const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
67
68fn reqwest_client() -> reqwest::Client {
69 reqwest::ClientBuilder::new()
70 .timeout(Duration::from_secs(30))
71 .build()
72 .unwrap()
73}
74
75pub struct ClientWrapper {
77 binary_path: sync::Mutex<Option<PathBuf>>,
78 testing_prng_seed: Option<u64>,
79 storage: String,
80 wallet: String,
81 keystore: String,
82 max_pending_message_bundles: usize,
83 network: Network,
84 pub path_provider: PathProvider,
85 on_drop: OnClientDrop,
86 extra_args: Vec<String>,
87}
88
89#[derive(Clone, Copy, Debug, Eq, PartialEq)]
91pub enum OnClientDrop {
92 CloseChains,
94 LeakChains,
96}
97
98impl ClientWrapper {
99 pub fn new(
100 path_provider: PathProvider,
101 network: Network,
102 testing_prng_seed: Option<u64>,
103 id: usize,
104 on_drop: OnClientDrop,
105 ) -> Self {
106 Self::new_with_extra_args(
107 path_provider,
108 network,
109 testing_prng_seed,
110 id,
111 on_drop,
112 vec!["--wait-for-outgoing-messages".to_string()],
113 )
114 }
115
116 pub fn new_with_extra_args(
117 path_provider: PathProvider,
118 network: Network,
119 testing_prng_seed: Option<u64>,
120 id: usize,
121 on_drop: OnClientDrop,
122 extra_args: Vec<String>,
123 ) -> Self {
124 let storage = format!(
125 "rocksdb:{}/client_{}.db",
126 path_provider.path().display(),
127 id
128 );
129 let wallet = format!("wallet_{}.json", id);
130 let keystore = format!("keystore_{}.json", id);
131 Self {
132 binary_path: sync::Mutex::new(None),
133 testing_prng_seed,
134 storage,
135 wallet,
136 keystore,
137 max_pending_message_bundles: 10_000,
138 network,
139 path_provider,
140 on_drop,
141 extra_args,
142 }
143 }
144
145 pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
147 let tmp = TempDir::new()?;
148 let mut command = self.command().await?;
149 command
150 .current_dir(tmp.path())
151 .arg("project")
152 .arg("new")
153 .arg(project_name)
154 .arg("--linera-root")
155 .arg(linera_root)
156 .spawn_and_wait_for_stdout()
157 .await?;
158 Ok(tmp)
159 }
160
161 pub async fn project_publish<T: Serialize>(
163 &self,
164 path: PathBuf,
165 required_application_ids: Vec<String>,
166 publisher: impl Into<Option<ChainId>>,
167 argument: &T,
168 ) -> Result<String> {
169 let json_parameters = serde_json::to_string(&())?;
170 let json_argument = serde_json::to_string(argument)?;
171 let mut command = self.command().await?;
172 command
173 .arg("project")
174 .arg("publish-and-create")
175 .arg(path)
176 .args(publisher.into().iter().map(ChainId::to_string))
177 .args(["--json-parameters", &json_parameters])
178 .args(["--json-argument", &json_argument]);
179 if !required_application_ids.is_empty() {
180 command.arg("--required-application-ids");
181 command.args(required_application_ids);
182 }
183 let stdout = command.spawn_and_wait_for_stdout().await?;
184 Ok(stdout.trim().to_string())
185 }
186
187 pub async fn project_test(&self, path: &Path) -> Result<()> {
189 self.command()
190 .await
191 .context("failed to create project test command")?
192 .current_dir(path)
193 .arg("project")
194 .arg("test")
195 .spawn_and_wait_for_stdout()
196 .await?;
197 Ok(())
198 }
199
200 async fn command_with_envs_and_arguments(
201 &self,
202 envs: &[(&str, &str)],
203 arguments: impl IntoIterator<Item = Cow<'_, str>>,
204 ) -> Result<Command> {
205 let mut command = self.command_binary().await?;
206 command.current_dir(self.path_provider.path());
207 for (key, value) in envs {
208 command.env(key, value);
209 }
210 for argument in arguments {
211 command.arg(&*argument);
212 }
213 Ok(command)
214 }
215
216 async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
217 self.command_with_envs_and_arguments(envs, self.command_arguments())
218 .await
219 }
220
221 async fn command_with_arguments(
222 &self,
223 arguments: impl IntoIterator<Item = Cow<'_, str>>,
224 ) -> Result<Command> {
225 self.command_with_envs_and_arguments(
226 &[(
227 "RUST_LOG",
228 &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
229 )],
230 arguments,
231 )
232 .await
233 }
234
235 async fn command(&self) -> Result<Command> {
236 self.command_with_envs(&[(
237 "RUST_LOG",
238 &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
239 )])
240 .await
241 }
242
243 fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
244 [
245 "--wallet".into(),
246 self.wallet.as_str().into(),
247 "--keystore".into(),
248 self.keystore.as_str().into(),
249 "--storage".into(),
250 self.storage.as_str().into(),
251 "--send-timeout-ms".into(),
252 "500000".into(),
253 "--recv-timeout-ms".into(),
254 "500000".into(),
255 ]
256 .into_iter()
257 .chain(self.extra_args.iter().map(|s| s.as_str().into()))
258 }
259
260 fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
262 self.required_command_arguments().chain([
263 "--max-pending-message-bundles".into(),
264 self.max_pending_message_bundles.to_string().into(),
265 ])
266 }
267
268 async fn command_binary(&self) -> Result<Command> {
272 match self.command_with_cached_binary_path() {
273 Some(command) => Ok(command),
274 None => {
275 let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
276 let command = Command::new(&resolved_path);
277
278 self.set_cached_binary_path(resolved_path);
279
280 Ok(command)
281 }
282 }
283 }
284
285 fn command_with_cached_binary_path(&self) -> Option<Command> {
287 let binary_path = self.binary_path.lock().unwrap();
288
289 binary_path.as_ref().map(Command::new)
290 }
291
292 fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
300 let mut binary_path = self.binary_path.lock().unwrap();
301
302 if binary_path.is_none() {
303 *binary_path = Some(new_binary_path);
304 } else {
305 assert_eq!(*binary_path, Some(new_binary_path));
306 }
307 }
308
309 pub async fn create_genesis_config(
311 &self,
312 num_other_initial_chains: u32,
313 initial_funding: Amount,
314 policy_config: ResourceControlPolicyConfig,
315 http_allow_list: Option<Vec<String>>,
316 ) -> Result<()> {
317 let mut command = self.command().await?;
318 command
319 .args([
320 "create-genesis-config",
321 &num_other_initial_chains.to_string(),
322 ])
323 .args(["--initial-funding", &initial_funding.to_string()])
324 .args(["--committee", "committee.json"])
325 .args(["--genesis", "genesis.json"])
326 .args([
327 "--policy-config",
328 &policy_config.to_string().to_kebab_case(),
329 ]);
330 if let Some(allow_list) = http_allow_list {
331 command
332 .arg("--http-request-allow-list")
333 .arg(allow_list.join(","));
334 }
335 if let Some(seed) = self.testing_prng_seed {
336 command.arg("--testing-prng-seed").arg(seed.to_string());
337 }
338 command.spawn_and_wait_for_stdout().await?;
339 Ok(())
340 }
341
342 pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
345 let mut command = self.command().await?;
346 command.args(["wallet", "init"]);
347 match faucet {
348 None => command.args(["--genesis", "genesis.json"]),
349 Some(faucet) => command.args(["--faucet", faucet.url()]),
350 };
351 if let Some(seed) = self.testing_prng_seed {
352 command.arg("--testing-prng-seed").arg(seed.to_string());
353 }
354 command.spawn_and_wait_for_stdout().await?;
355 Ok(())
356 }
357
358 pub async fn request_chain(
360 &self,
361 faucet: &Faucet,
362 set_default: bool,
363 ) -> Result<(ChainId, AccountOwner)> {
364 let mut command = self.command().await?;
365 command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
366 if set_default {
367 command.arg("--set-default");
368 }
369 let stdout = command.spawn_and_wait_for_stdout().await?;
370 let mut lines = stdout.split_whitespace();
371 let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
372 let owner = lines.next().context("missing chain owner")?.parse()?;
373 Ok((chain_id, owner))
374 }
375
376 #[expect(clippy::too_many_arguments)]
378 pub async fn publish_and_create<
379 A: ContractAbi,
380 Parameters: Serialize,
381 InstantiationArgument: Serialize,
382 >(
383 &self,
384 contract: PathBuf,
385 service: PathBuf,
386 vm_runtime: VmRuntime,
387 parameters: &Parameters,
388 argument: &InstantiationArgument,
389 required_application_ids: &[ApplicationId],
390 publisher: impl Into<Option<ChainId>>,
391 ) -> Result<ApplicationId<A>> {
392 let json_parameters = serde_json::to_string(parameters)?;
393 let json_argument = serde_json::to_string(argument)?;
394 let mut command = self.command().await?;
395 let vm_runtime = format!("{}", vm_runtime);
396 command
397 .arg("publish-and-create")
398 .args([contract, service])
399 .args(["--vm-runtime", &vm_runtime.to_lowercase()])
400 .args(publisher.into().iter().map(ChainId::to_string))
401 .args(["--json-parameters", &json_parameters])
402 .args(["--json-argument", &json_argument]);
403 if !required_application_ids.is_empty() {
404 command.arg("--required-application-ids");
405 command.args(
406 required_application_ids
407 .iter()
408 .map(ApplicationId::to_string),
409 );
410 }
411 let stdout = command.spawn_and_wait_for_stdout().await?;
412 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
413 }
414
415 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
417 &self,
418 contract: PathBuf,
419 service: PathBuf,
420 vm_runtime: VmRuntime,
421 publisher: impl Into<Option<ChainId>>,
422 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
423 let stdout = self
424 .command()
425 .await?
426 .arg("publish-module")
427 .args([contract, service])
428 .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
429 .args(publisher.into().iter().map(ChainId::to_string))
430 .spawn_and_wait_for_stdout()
431 .await?;
432 let module_id: ModuleId = stdout.trim().parse()?;
433 Ok(module_id.with_abi())
434 }
435
436 pub async fn create_application<
438 Abi: ContractAbi,
439 Parameters: Serialize,
440 InstantiationArgument: Serialize,
441 >(
442 &self,
443 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
444 parameters: &Parameters,
445 argument: &InstantiationArgument,
446 required_application_ids: &[ApplicationId],
447 creator: impl Into<Option<ChainId>>,
448 ) -> Result<ApplicationId<Abi>> {
449 let json_parameters = serde_json::to_string(parameters)?;
450 let json_argument = serde_json::to_string(argument)?;
451 let mut command = self.command().await?;
452 command
453 .arg("create-application")
454 .arg(module_id.forget_abi().to_string())
455 .args(["--json-parameters", &json_parameters])
456 .args(["--json-argument", &json_argument])
457 .args(creator.into().iter().map(ChainId::to_string));
458 if !required_application_ids.is_empty() {
459 command.arg("--required-application-ids");
460 command.args(
461 required_application_ids
462 .iter()
463 .map(ApplicationId::to_string),
464 );
465 }
466 let stdout = command.spawn_and_wait_for_stdout().await?;
467 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
468 }
469
470 pub async fn run_node_service(
472 &self,
473 port: impl Into<Option<u16>>,
474 process_inbox: ProcessInbox,
475 ) -> Result<NodeService> {
476 self.run_node_service_with_options(port, process_inbox, &[], &[], false)
477 .await
478 }
479
480 pub async fn run_node_service_with_options(
482 &self,
483 port: impl Into<Option<u16>>,
484 process_inbox: ProcessInbox,
485 operator_application_ids: &[ApplicationId],
486 operators: &[(String, PathBuf)],
487 read_only: bool,
488 ) -> Result<NodeService> {
489 let port = port.into().unwrap_or(8080);
490 let mut command = self.command().await?;
491 command.arg("service");
492 if let ProcessInbox::Skip = process_inbox {
493 command.arg("--listener-skip-process-inbox");
494 }
495 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
496 command.args(var.split_whitespace());
497 }
498 for app_id in operator_application_ids {
499 command.args(["--operator-application-ids", &app_id.to_string()]);
500 }
501 for (name, path) in operators {
502 command.args(["--operators", &format!("{}={}", name, path.display())]);
503 }
504 if read_only {
505 command.arg("--read-only");
506 }
507 let child = command
508 .args(["--port".to_string(), port.to_string()])
509 .spawn_into()?;
510 let client = reqwest_client();
511 for i in 0..10 {
512 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
513 let request = client
514 .get(format!("http://localhost:{}/", port))
515 .send()
516 .await;
517 if request.is_ok() {
518 tracing::info!("Node service has started");
519 return Ok(NodeService::new(port, child));
520 } else {
521 tracing::warn!("Waiting for node service to start");
522 }
523 }
524 bail!("Failed to start node service");
525 }
526
527 pub async fn run_node_service_with_controller(
529 &self,
530 port: impl Into<Option<u16>>,
531 process_inbox: ProcessInbox,
532 controller_id: &ApplicationId,
533 operators: &[(String, PathBuf)],
534 ) -> Result<NodeService> {
535 let port = port.into().unwrap_or(8080);
536 let mut command = self.command().await?;
537 command.arg("service");
538 if let ProcessInbox::Skip = process_inbox {
539 command.arg("--listener-skip-process-inbox");
540 }
541 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
542 command.args(var.split_whitespace());
543 }
544 command.args(["--controller-id", &controller_id.to_string()]);
545 for (name, path) in operators {
546 command.args(["--operators", &format!("{}={}", name, path.display())]);
547 }
548 let child = command
549 .args(["--port".to_string(), port.to_string()])
550 .spawn_into()?;
551 let client = reqwest_client();
552 for i in 0..10 {
553 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
554 let request = client
555 .get(format!("http://localhost:{}/", port))
556 .send()
557 .await;
558 if request.is_ok() {
559 tracing::info!("Node service has started");
560 return Ok(NodeService::new(port, child));
561 } else {
562 tracing::warn!("Waiting for node service to start");
563 }
564 }
565 bail!("Failed to start node service");
566 }
567
568 pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
570 let mut command = self.command().await?;
571 command.arg("validator").arg("query").arg(address);
572 let stdout = command.spawn_and_wait_for_stdout().await?;
573
574 let hash = stdout
577 .lines()
578 .find_map(|line| {
579 line.strip_prefix("Genesis config hash: ")
580 .and_then(|hash_str| hash_str.trim().parse().ok())
581 })
582 .context("error while parsing the result of `linera validator query`")?;
583 Ok(hash)
584 }
585
586 pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
588 let mut command = self.command().await?;
589 command.arg("validator").arg("list");
590 if let Some(chain_id) = chain_id {
591 command.args(["--chain-id", &chain_id.to_string()]);
592 }
593 command.spawn_and_wait_for_stdout().await?;
594 Ok(())
595 }
596
597 pub async fn sync_validator(
599 &self,
600 chain_ids: impl IntoIterator<Item = &ChainId>,
601 validator_address: impl Into<String>,
602 ) -> Result<()> {
603 let mut command = self.command().await?;
604 command
605 .arg("validator")
606 .arg("sync")
607 .arg(validator_address.into());
608 let mut chain_ids = chain_ids.into_iter().peekable();
609 if chain_ids.peek().is_some() {
610 command
611 .arg("--chains")
612 .args(chain_ids.map(ChainId::to_string));
613 }
614 command.spawn_and_wait_for_stdout().await?;
615 Ok(())
616 }
617
618 pub async fn run_faucet(
620 &self,
621 port: impl Into<Option<u16>>,
622 chain_id: Option<ChainId>,
623 amount: Amount,
624 ) -> Result<FaucetService> {
625 let port = port.into().unwrap_or(8080);
626 let temp_dir = tempfile::tempdir()
627 .context("Failed to create temporary directory for faucet storage")?;
628 let storage_path = temp_dir.path().join("faucet_storage.sqlite");
629 let mut command = self.command().await?;
630 let command = command
631 .arg("faucet")
632 .args(["--port".to_string(), port.to_string()])
633 .args(["--amount".to_string(), amount.to_string()])
634 .args([
635 "--storage-path".to_string(),
636 storage_path.to_string_lossy().to_string(),
637 ]);
638 if let Some(chain_id) = chain_id {
639 command.arg(chain_id.to_string());
640 }
641 let child = command.spawn_into()?;
642 let client = reqwest_client();
643 for i in 0..10 {
644 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
645 let request = client
646 .get(format!("http://localhost:{}/", port))
647 .send()
648 .await;
649 if request.is_ok() {
650 tracing::info!("Faucet has started");
651 return Ok(FaucetService::new(port, child, temp_dir));
652 } else {
653 tracing::debug!("Waiting for faucet to start");
654 }
655 }
656 bail!("Failed to start faucet");
657 }
658
659 pub async fn local_balance(&self, account: Account) -> Result<Amount> {
661 let stdout = self
662 .command()
663 .await?
664 .arg("local-balance")
665 .arg(account.to_string())
666 .spawn_and_wait_for_stdout()
667 .await?;
668 let amount = stdout
669 .trim()
670 .parse()
671 .context("error while parsing the result of `linera local-balance`")?;
672 Ok(amount)
673 }
674
675 pub async fn query_balance(&self, account: Account) -> Result<Amount> {
677 let stdout = self
678 .command()
679 .await?
680 .arg("query-balance")
681 .arg(account.to_string())
682 .spawn_and_wait_for_stdout()
683 .await?;
684 let amount = stdout
685 .trim()
686 .parse()
687 .context("error while parsing the result of `linera query-balance`")?;
688 Ok(amount)
689 }
690
691 pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
693 self.command()
694 .await?
695 .arg("sync")
696 .arg(chain_id.to_string())
697 .spawn_and_wait_for_stdout()
698 .await?;
699 Ok(())
700 }
701
702 pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
704 self.command()
705 .await?
706 .arg("process-inbox")
707 .arg(chain_id.to_string())
708 .spawn_and_wait_for_stdout()
709 .await?;
710 Ok(())
711 }
712
713 pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
715 self.command()
716 .await?
717 .arg("transfer")
718 .arg(amount.to_string())
719 .args(["--from", &from.to_string()])
720 .args(["--to", &to.to_string()])
721 .spawn_and_wait_for_stdout()
722 .await?;
723 Ok(())
724 }
725
726 pub async fn transfer_with_silent_logs(
728 &self,
729 amount: Amount,
730 from: ChainId,
731 to: ChainId,
732 ) -> Result<()> {
733 self.command()
734 .await?
735 .env("RUST_LOG", "off")
736 .arg("transfer")
737 .arg(amount.to_string())
738 .args(["--from", &from.to_string()])
739 .args(["--to", &to.to_string()])
740 .spawn_and_wait_for_stdout()
741 .await?;
742 Ok(())
743 }
744
745 pub async fn transfer_with_accounts(
747 &self,
748 amount: Amount,
749 from: Account,
750 to: Account,
751 ) -> Result<()> {
752 self.command()
753 .await?
754 .arg("transfer")
755 .arg(amount.to_string())
756 .args(["--from", &from.to_string()])
757 .args(["--to", &to.to_string()])
758 .spawn_and_wait_for_stdout()
759 .await?;
760 Ok(())
761 }
762
763 fn benchmark_command_internal(command: &mut Command, args: BenchmarkCommand) -> Result<()> {
764 let mut formatted_args = to_args(&args)?;
765 let subcommand = formatted_args.remove(0);
766 formatted_args.remove(0);
769 let options = formatted_args
770 .chunks_exact(2)
771 .flat_map(|pair| {
772 let option = format!("--{}", pair[0]);
773 match pair[1].as_str() {
774 "true" => vec![option],
775 "false" => vec![],
776 _ => vec![option, pair[1].clone()],
777 }
778 })
779 .collect::<Vec<_>>();
780 command
781 .args([
782 "--max-pending-message-bundles",
783 &args.transactions_per_block().to_string(),
784 ])
785 .arg("benchmark")
786 .arg(subcommand)
787 .args(options);
788 Ok(())
789 }
790
791 async fn benchmark_command_with_envs(
792 &self,
793 args: BenchmarkCommand,
794 envs: &[(&str, &str)],
795 ) -> Result<Command> {
796 let mut command = self
797 .command_with_envs_and_arguments(envs, self.required_command_arguments())
798 .await?;
799 Self::benchmark_command_internal(&mut command, args)?;
800 Ok(command)
801 }
802
803 async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
804 let mut command = self
805 .command_with_arguments(self.required_command_arguments())
806 .await?;
807 Self::benchmark_command_internal(&mut command, args)?;
808 Ok(command)
809 }
810
811 pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
813 let mut command = self.benchmark_command(args).await?;
814 command.spawn_and_wait_for_stdout().await?;
815 Ok(())
816 }
817
818 pub async fn benchmark_detached(
821 &self,
822 args: BenchmarkCommand,
823 tx: oneshot::Sender<()>,
824 ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
825 let mut child = self
826 .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
827 .await?
828 .kill_on_drop(true)
829 .stdin(Stdio::piped())
830 .stdout(Stdio::piped())
831 .stderr(Stdio::piped())
832 .spawn()?;
833
834 let pid = child.id().expect("failed to get pid");
835 let stdout = child.stdout.take().expect("stdout not open");
836 let stdout_handle = tokio::spawn(async move {
837 let mut lines = BufReader::new(stdout).lines();
838 while let Ok(Some(line)) = lines.next_line().await {
839 println!("benchmark{{pid={pid}}} {line}");
840 }
841 });
842
843 let stderr = child.stderr.take().expect("stderr not open");
844 let stderr_handle = tokio::spawn(async move {
845 let mut lines = BufReader::new(stderr).lines();
846 let mut tx = Some(tx);
847 while let Ok(Some(line)) = lines.next_line().await {
848 if line.contains("Ready to start benchmark") {
849 tx.take()
850 .expect("Should only send signal once")
851 .send(())
852 .expect("failed to send ready signal to main thread");
853 } else {
854 println!("benchmark{{pid={pid}}} {line}");
855 }
856 }
857 });
858 Ok((child, stdout_handle, stderr_handle))
859 }
860
861 async fn open_chain_internal(
862 &self,
863 from: ChainId,
864 owner: Option<AccountOwner>,
865 initial_balance: Amount,
866 super_owner: bool,
867 ) -> Result<(ChainId, AccountOwner)> {
868 let mut command = self.command().await?;
869 command
870 .arg("open-chain")
871 .args(["--from", &from.to_string()])
872 .args(["--initial-balance", &initial_balance.to_string()]);
873
874 if let Some(owner) = owner {
875 command.args(["--owner", &owner.to_string()]);
876 }
877
878 if super_owner {
879 command.arg("--super-owner");
880 }
881
882 let stdout = command.spawn_and_wait_for_stdout().await?;
883 let mut split = stdout.split('\n');
884 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
885 let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
886 if let Some(owner) = owner {
887 assert_eq!(owner, new_owner);
888 }
889 Ok((chain_id, new_owner))
890 }
891
892 pub async fn open_chain_super_owner(
894 &self,
895 from: ChainId,
896 owner: Option<AccountOwner>,
897 initial_balance: Amount,
898 ) -> Result<(ChainId, AccountOwner)> {
899 self.open_chain_internal(from, owner, initial_balance, true)
900 .await
901 }
902
903 pub async fn open_chain(
905 &self,
906 from: ChainId,
907 owner: Option<AccountOwner>,
908 initial_balance: Amount,
909 ) -> Result<(ChainId, AccountOwner)> {
910 self.open_chain_internal(from, owner, initial_balance, false)
911 .await
912 }
913
914 pub async fn open_and_assign(
916 &self,
917 client: &ClientWrapper,
918 initial_balance: Amount,
919 ) -> Result<ChainId> {
920 let our_chain = self
921 .load_wallet()?
922 .default_chain()
923 .context("no default chain found")?;
924 let owner = client.keygen().await?;
925 let (new_chain, _) = self
926 .open_chain(our_chain, Some(owner), initial_balance)
927 .await?;
928 client.assign(owner, new_chain).await?;
929 Ok(new_chain)
930 }
931
932 pub async fn open_multi_owner_chain(
933 &self,
934 from: ChainId,
935 owners: BTreeMap<AccountOwner, u64>,
936 multi_leader_rounds: u32,
937 balance: Amount,
938 base_timeout_ms: u64,
939 ) -> Result<ChainId> {
940 let mut command = self.command().await?;
941 command
942 .arg("open-multi-owner-chain")
943 .args(["--from", &from.to_string()])
944 .arg("--owners")
945 .arg(serde_json::to_string(&owners)?)
946 .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
947 command
948 .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
949 .args(["--initial-balance", &balance.to_string()]);
950
951 let stdout = command.spawn_and_wait_for_stdout().await?;
952 let mut split = stdout.split('\n');
953 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
954
955 Ok(chain_id)
956 }
957
958 pub async fn change_ownership(
959 &self,
960 chain_id: ChainId,
961 super_owners: Vec<AccountOwner>,
962 owners: Vec<AccountOwner>,
963 ) -> Result<()> {
964 let mut command = self.command().await?;
965 command
966 .arg("change-ownership")
967 .args(["--chain-id", &chain_id.to_string()]);
968 command
969 .arg("--super-owners")
970 .arg(serde_json::to_string(&super_owners)?);
971 command.arg("--owners").arg(serde_json::to_string(
972 &owners
973 .into_iter()
974 .zip(std::iter::repeat(100u64))
975 .collect::<BTreeMap<_, _>>(),
976 )?);
977 command.spawn_and_wait_for_stdout().await?;
978 Ok(())
979 }
980
981 pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
983 let mut command = self.command().await?;
984 command
985 .args(["wallet", "follow-chain"])
986 .arg(chain_id.to_string());
987 if sync {
988 command.arg("--sync");
989 }
990 command.spawn_and_wait_for_stdout().await?;
991 Ok(())
992 }
993
994 pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
996 let mut command = self.command().await?;
997 command
998 .args(["wallet", "forget-chain"])
999 .arg(chain_id.to_string());
1000 command.spawn_and_wait_for_stdout().await?;
1001 Ok(())
1002 }
1003
1004 pub async fn set_default_chain(&self, chain_id: ChainId) -> Result<()> {
1006 let mut command = self.command().await?;
1007 command
1008 .args(["wallet", "set-default"])
1009 .arg(chain_id.to_string());
1010 command.spawn_and_wait_for_stdout().await?;
1011 Ok(())
1012 }
1013
1014 pub async fn retry_pending_block(
1015 &self,
1016 chain_id: Option<ChainId>,
1017 ) -> Result<Option<CryptoHash>> {
1018 let mut command = self.command().await?;
1019 command.arg("retry-pending-block");
1020 if let Some(chain_id) = chain_id {
1021 command.arg(chain_id.to_string());
1022 }
1023 let stdout = command.spawn_and_wait_for_stdout().await?;
1024 let stdout = stdout.trim();
1025 if stdout.is_empty() {
1026 Ok(None)
1027 } else {
1028 Ok(Some(CryptoHash::from_str(stdout)?))
1029 }
1030 }
1031
1032 pub async fn publish_data_blob(
1034 &self,
1035 path: &Path,
1036 chain_id: Option<ChainId>,
1037 ) -> Result<CryptoHash> {
1038 let mut command = self.command().await?;
1039 command.arg("publish-data-blob").arg(path);
1040 if let Some(chain_id) = chain_id {
1041 command.arg(chain_id.to_string());
1042 }
1043 let stdout = command.spawn_and_wait_for_stdout().await?;
1044 let stdout = stdout.trim();
1045 Ok(CryptoHash::from_str(stdout)?)
1046 }
1047
1048 pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
1050 let mut command = self.command().await?;
1051 command.arg("read-data-blob").arg(hash.to_string());
1052 if let Some(chain_id) = chain_id {
1053 command.arg(chain_id.to_string());
1054 }
1055 command.spawn_and_wait_for_stdout().await?;
1056 Ok(())
1057 }
1058
1059 pub fn load_wallet(&self) -> Result<Wallet> {
1060 Ok(Wallet::read(&self.wallet_path())?)
1061 }
1062
1063 pub fn load_keystore(&self) -> Result<InMemorySigner> {
1064 util::read_json(self.keystore_path())
1065 }
1066
1067 pub fn wallet_path(&self) -> PathBuf {
1068 self.path_provider.path().join(&self.wallet)
1069 }
1070
1071 pub fn keystore_path(&self) -> PathBuf {
1072 self.path_provider.path().join(&self.keystore)
1073 }
1074
1075 pub fn storage_path(&self) -> &str {
1076 &self.storage
1077 }
1078
1079 pub fn get_owner(&self) -> Option<AccountOwner> {
1080 let wallet = self.load_wallet().ok()?;
1081 wallet
1082 .get(wallet.default_chain()?)
1083 .expect("default chain must be in wallet")
1084 .owner
1085 }
1086
1087 pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1088 self.load_wallet()
1089 .ok()
1090 .is_some_and(|wallet| wallet.get(chain).is_some())
1091 }
1092
1093 pub async fn set_validator(
1094 &self,
1095 validator_key: &(String, String),
1096 port: usize,
1097 votes: usize,
1098 ) -> Result<()> {
1099 let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1100 self.command()
1101 .await?
1102 .arg("validator")
1103 .arg("add")
1104 .args(["--public-key", &validator_key.0])
1105 .args(["--account-key", &validator_key.1])
1106 .args(["--address", &address])
1107 .args(["--votes", &votes.to_string()])
1108 .spawn_and_wait_for_stdout()
1109 .await?;
1110 Ok(())
1111 }
1112
1113 pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1114 self.command()
1115 .await?
1116 .arg("validator")
1117 .arg("remove")
1118 .args(["--public-key", validator_key])
1119 .spawn_and_wait_for_stdout()
1120 .await?;
1121 Ok(())
1122 }
1123
1124 pub async fn change_validators(
1125 &self,
1126 add_validators: &[(String, String, usize, usize)], modify_validators: &[(String, String, usize, usize)], remove_validators: &[String],
1129 ) -> Result<()> {
1130 use std::str::FromStr;
1131
1132 use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1133
1134 let mut changes = std::collections::HashMap::new();
1137
1138 for (public_key_str, account_key_str, port, votes) in
1140 add_validators.iter().chain(modify_validators.iter())
1141 {
1142 let public_key = ValidatorPublicKey::from_str(public_key_str)
1143 .with_context(|| format!("Invalid validator public key: {}", public_key_str))?;
1144
1145 let account_key = AccountPublicKey::from_str(account_key_str)
1146 .with_context(|| format!("Invalid account public key: {}", account_key_str))?;
1147
1148 let address = format!("{}:127.0.0.1:{}", self.network.short(), port)
1149 .parse()
1150 .unwrap();
1151
1152 let change = crate::cli::validator::Change {
1154 account_key,
1155 address,
1156 votes: crate::cli::validator::Votes(
1157 std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1158 ),
1159 };
1160
1161 changes.insert(public_key, Some(change));
1162 }
1163
1164 for validator_key_str in remove_validators {
1166 let public_key = ValidatorPublicKey::from_str(validator_key_str)
1167 .with_context(|| format!("Invalid validator public key: {}", validator_key_str))?;
1168 changes.insert(public_key, None);
1169 }
1170
1171 let temp_file = tempfile::NamedTempFile::new()
1173 .context("Failed to create temporary file for validator changes")?;
1174 serde_json::to_writer(&temp_file, &changes)
1175 .context("Failed to write validator changes to file")?;
1176 let temp_path = temp_file.path();
1177
1178 self.command()
1179 .await?
1180 .arg("validator")
1181 .arg("update")
1182 .arg(temp_path)
1183 .arg("--yes") .spawn_and_wait_for_stdout()
1185 .await?;
1186
1187 Ok(())
1188 }
1189
1190 pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1191 self.command()
1192 .await?
1193 .arg("revoke-epochs")
1194 .arg(epoch.to_string())
1195 .spawn_and_wait_for_stdout()
1196 .await?;
1197 Ok(())
1198 }
1199
1200 pub async fn keygen(&self) -> Result<AccountOwner> {
1202 let stdout = self
1203 .command()
1204 .await?
1205 .arg("keygen")
1206 .spawn_and_wait_for_stdout()
1207 .await?;
1208 AccountOwner::from_str(stdout.as_str().trim())
1209 }
1210
1211 pub fn default_chain(&self) -> Option<ChainId> {
1213 self.load_wallet().ok()?.default_chain()
1214 }
1215
1216 pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1218 let _stdout = self
1219 .command()
1220 .await?
1221 .arg("assign")
1222 .args(["--owner", &owner.to_string()])
1223 .args(["--chain-id", &chain_id.to_string()])
1224 .spawn_and_wait_for_stdout()
1225 .await?;
1226 Ok(())
1227 }
1228
1229 pub async fn set_preferred_owner(
1231 &self,
1232 chain_id: ChainId,
1233 owner: Option<AccountOwner>,
1234 ) -> Result<()> {
1235 let mut owner_arg = vec!["--owner".to_string()];
1236 if let Some(owner) = owner {
1237 owner_arg.push(owner.to_string());
1238 };
1239 self.command()
1240 .await?
1241 .arg("set-preferred-owner")
1242 .args(["--chain-id", &chain_id.to_string()])
1243 .args(owner_arg)
1244 .spawn_and_wait_for_stdout()
1245 .await?;
1246 Ok(())
1247 }
1248
1249 pub async fn build_application(
1250 &self,
1251 path: &Path,
1252 name: &str,
1253 is_workspace: bool,
1254 ) -> Result<(PathBuf, PathBuf)> {
1255 Command::new("cargo")
1256 .current_dir(self.path_provider.path())
1257 .arg("build")
1258 .arg("--release")
1259 .args(["--target", "wasm32-unknown-unknown"])
1260 .arg("--manifest-path")
1261 .arg(path.join("Cargo.toml"))
1262 .spawn_and_wait_for_stdout()
1263 .await?;
1264
1265 let release_dir = match is_workspace {
1266 true => path.join("../target/wasm32-unknown-unknown/release"),
1267 false => path.join("target/wasm32-unknown-unknown/release"),
1268 };
1269
1270 let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1271 let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1272
1273 let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1274 let service_size = fs_err::tokio::metadata(&service).await?.len();
1275 tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1276
1277 Ok((contract, service))
1278 }
1279}
1280
1281impl Drop for ClientWrapper {
1282 fn drop(&mut self) {
1283 use std::process::Command as SyncCommand;
1284
1285 if self.on_drop != OnClientDrop::CloseChains {
1286 return;
1287 }
1288
1289 let Ok(binary_path) = self.binary_path.lock() else {
1290 tracing::error!(
1291 "Failed to close chains because a thread panicked with a lock to `binary_path`"
1292 );
1293 return;
1294 };
1295
1296 let Some(binary_path) = binary_path.as_ref() else {
1297 tracing::warn!(
1298 "Assuming no chains need to be closed, because the command binary was never \
1299 resolved and therefore presumably never called"
1300 );
1301 return;
1302 };
1303
1304 let working_directory = self.path_provider.path();
1305 let mut wallet_show_command = SyncCommand::new(binary_path);
1306
1307 for argument in self.command_arguments() {
1308 wallet_show_command.arg(&*argument);
1309 }
1310
1311 let Ok(wallet_show_output) = wallet_show_command
1312 .current_dir(working_directory)
1313 .args(["wallet", "show", "--short", "--owned"])
1314 .output()
1315 else {
1316 tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1317 return;
1318 };
1319
1320 if !wallet_show_output.status.success() {
1321 tracing::warn!("Failed to list chains in the wallet to close them");
1322 return;
1323 }
1324
1325 let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1326 tracing::warn!(
1327 "Failed to close chains because `linera wallet show --short` \
1328 returned a non-UTF-8 output"
1329 );
1330 return;
1331 };
1332
1333 let chain_ids = chain_list_string
1334 .split('\n')
1335 .map(|line| line.trim())
1336 .filter(|line| !line.is_empty());
1337
1338 for chain_id in chain_ids {
1339 let mut close_chain_command = SyncCommand::new(binary_path);
1340
1341 for argument in self.command_arguments() {
1342 close_chain_command.arg(&*argument);
1343 }
1344
1345 close_chain_command.current_dir(working_directory);
1346
1347 match close_chain_command.args(["close-chain", chain_id]).status() {
1348 Ok(status) if status.success() => (),
1349 Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1350 Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1351 }
1352 }
1353 }
1354}
1355
1356#[cfg(with_testing)]
1357impl ClientWrapper {
1358 pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1359 self.build_application(Self::example_path(name)?.as_path(), name, true)
1360 .await
1361 }
1362
1363 pub fn example_path(name: &str) -> Result<PathBuf> {
1364 Ok(env::current_dir()?.join("../examples/").join(name))
1365 }
1366}
1367
1368fn truncate_query_output(input: &str) -> String {
1369 let max_len = 1000;
1370 if input.len() < max_len {
1371 input.to_string()
1372 } else {
1373 format!("{} ...", input.get(..max_len).unwrap())
1374 }
1375}
1376
1377fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1378 let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1379 let max_len = 200;
1380 if query.len() < max_len {
1381 query
1382 } else {
1383 format!("{} ...", query.get(..max_len).unwrap())
1384 }
1385}
1386
1387pub struct NodeService {
1389 port: u16,
1390 child: Child,
1391}
1392
1393impl NodeService {
1394 fn new(port: u16, child: Child) -> Self {
1395 Self { port, child }
1396 }
1397
1398 pub async fn terminate(mut self) -> Result<()> {
1399 self.child.kill().await.context("terminating node service")
1400 }
1401
1402 pub fn port(&self) -> u16 {
1403 self.port
1404 }
1405
1406 pub fn ensure_is_running(&mut self) -> Result<()> {
1407 self.child.ensure_is_running()
1408 }
1409
1410 pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1411 let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1412 let mut data = self.query_node(query).await?;
1413 Ok(serde_json::from_value(data["processInbox"].take())?)
1414 }
1415
1416 pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1417 let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1418 let mut data = self.query_node(query).await?;
1419 Ok(serde_json::from_value(data["sync"].take())?)
1420 }
1421
1422 pub async fn transfer(
1423 &self,
1424 chain_id: ChainId,
1425 owner: AccountOwner,
1426 recipient: Account,
1427 amount: Amount,
1428 ) -> Result<CryptoHash> {
1429 let json_owner = owner.to_value();
1430 let json_recipient = recipient.to_value();
1431 let query = format!(
1432 "mutation {{ transfer(\
1433 chainId: \"{chain_id}\", \
1434 owner: {json_owner}, \
1435 recipient: {json_recipient}, \
1436 amount: \"{amount}\") \
1437 }}"
1438 );
1439 let data = self.query_node(query).await?;
1440 serde_json::from_value(data["transfer"].clone())
1441 .context("missing transfer field in response")
1442 }
1443
1444 pub async fn balance(&self, account: &Account) -> Result<Amount> {
1445 let chain = account.chain_id;
1446 let owner = account.owner;
1447 if matches!(owner, AccountOwner::CHAIN) {
1448 let query = format!(
1449 "query {{ chain(chainId:\"{chain}\") {{
1450 executionState {{ system {{ balance }} }}
1451 }} }}"
1452 );
1453 let response = self.query_node(query).await?;
1454 let balance = &response["chain"]["executionState"]["system"]["balance"]
1455 .as_str()
1456 .unwrap();
1457 return Ok(Amount::from_str(balance)?);
1458 }
1459 let query = format!(
1460 "query {{ chain(chainId:\"{chain}\") {{
1461 executionState {{ system {{ balances {{
1462 entry(key:\"{owner}\") {{ value }}
1463 }} }} }}
1464 }} }}"
1465 );
1466 let response = self.query_node(query).await?;
1467 let balances = &response["chain"]["executionState"]["system"]["balances"];
1468 let balance = balances["entry"]["value"].as_str();
1469 match balance {
1470 None => Ok(Amount::ZERO),
1471 Some(amount) => Ok(Amount::from_str(amount)?),
1472 }
1473 }
1474
1475 pub fn make_application<A: ContractAbi>(
1476 &self,
1477 chain_id: &ChainId,
1478 application_id: &ApplicationId<A>,
1479 ) -> Result<ApplicationWrapper<A>> {
1480 let application_id = application_id.forget_abi().to_string();
1481 let link = format!(
1482 "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1483 self.port
1484 );
1485 Ok(ApplicationWrapper::from(link))
1486 }
1487
1488 pub async fn publish_data_blob(
1489 &self,
1490 chain_id: &ChainId,
1491 bytes: Vec<u8>,
1492 ) -> Result<CryptoHash> {
1493 let query = format!(
1494 "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1495 chain_id.to_value(),
1496 bytes.to_value(),
1497 );
1498 let data = self.query_node(query).await?;
1499 serde_json::from_value(data["publishDataBlob"].clone())
1500 .context("missing publishDataBlob field in response")
1501 }
1502
1503 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1504 &self,
1505 chain_id: &ChainId,
1506 contract: PathBuf,
1507 service: PathBuf,
1508 vm_runtime: VmRuntime,
1509 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1510 let contract_code = Bytecode::load_from_file(&contract).await?;
1511 let service_code = Bytecode::load_from_file(&service).await?;
1512 let query = format!(
1513 "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1514 chain_id.to_value(),
1515 contract_code.to_value(),
1516 service_code.to_value(),
1517 vm_runtime.to_value(),
1518 );
1519 let data = self.query_node(query).await?;
1520 let module_str = data["publishModule"]
1521 .as_str()
1522 .context("module ID not found")?;
1523 let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1524 Ok(module_id.with_abi())
1525 }
1526
1527 pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1528 let query = format!(
1529 "query {{ chain(chainId:\"{chain_id}\") {{
1530 executionState {{ system {{ committees }} }}
1531 }} }}"
1532 );
1533 let mut response = self.query_node(query).await?;
1534 let committees = response["chain"]["executionState"]["system"]["committees"].take();
1535 Ok(serde_json::from_value(committees)?)
1536 }
1537
1538 pub async fn events_from_index(
1539 &self,
1540 chain_id: &ChainId,
1541 stream_id: &StreamId,
1542 start_index: u32,
1543 ) -> Result<Vec<IndexAndEvent>> {
1544 let query = format!(
1545 "query {{
1546 eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1547 {{ index event }}
1548 }}",
1549 stream_id.to_value()
1550 );
1551 let mut response = self.query_node(query).await?;
1552 let response = response["eventsFromIndex"].take();
1553 Ok(serde_json::from_value(response)?)
1554 }
1555
1556 pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1557 let n_try = 5;
1558 let query = query.as_ref();
1559 for i in 0..n_try {
1560 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1561 let url = format!("http://localhost:{}/", self.port);
1562 let client = reqwest_client();
1563 let result = client
1564 .post(url)
1565 .json(&json!({ "query": query }))
1566 .send()
1567 .await;
1568 if matches!(result, Err(ref error) if error.is_timeout()) {
1569 tracing::warn!(
1570 "Timeout when sending query {} to the node service",
1571 truncate_query_output(query)
1572 );
1573 continue;
1574 }
1575 let response = result.with_context(|| {
1576 format!(
1577 "query_node: failed to post query={}",
1578 truncate_query_output(query)
1579 )
1580 })?;
1581 ensure!(
1582 response.status().is_success(),
1583 "Query \"{}\" failed: {}",
1584 truncate_query_output(query),
1585 response
1586 .text()
1587 .await
1588 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1589 );
1590 let value: Value = response.json().await.context("invalid JSON")?;
1591 if let Some(errors) = value.get("errors") {
1592 tracing::warn!(
1593 "Query \"{}\" failed: {}",
1594 truncate_query_output(query),
1595 errors
1596 );
1597 } else {
1598 return Ok(value["data"].clone());
1599 }
1600 }
1601 bail!(
1602 "Query \"{}\" failed after {} retries.",
1603 truncate_query_output(query),
1604 n_try
1605 );
1606 }
1607
1608 pub async fn create_application<
1609 Abi: ContractAbi,
1610 Parameters: Serialize,
1611 InstantiationArgument: Serialize,
1612 >(
1613 &self,
1614 chain_id: &ChainId,
1615 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1616 parameters: &Parameters,
1617 argument: &InstantiationArgument,
1618 required_application_ids: &[ApplicationId],
1619 ) -> Result<ApplicationId<Abi>> {
1620 let module_id = module_id.forget_abi();
1621 let json_required_applications_ids = required_application_ids
1622 .iter()
1623 .map(ApplicationId::to_string)
1624 .collect::<Vec<_>>()
1625 .to_value();
1626 let new_parameters = serde_json::to_value(parameters)
1628 .context("could not create parameters JSON")?
1629 .to_value();
1630 let new_argument = serde_json::to_value(argument)
1631 .context("could not create argument JSON")?
1632 .to_value();
1633 let query = format!(
1634 "mutation {{ createApplication(\
1635 chainId: \"{chain_id}\",
1636 moduleId: \"{module_id}\", \
1637 parameters: {new_parameters}, \
1638 instantiationArgument: {new_argument}, \
1639 requiredApplicationIds: {json_required_applications_ids}) \
1640 }}"
1641 );
1642 let data = self.query_node(query).await?;
1643 let app_id_str = data["createApplication"]
1644 .as_str()
1645 .context("missing createApplication string in response")?
1646 .trim();
1647 Ok(app_id_str
1648 .parse::<ApplicationId>()
1649 .context("invalid application ID")?
1650 .with_abi())
1651 }
1652
1653 pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1655 let query = format!(
1656 r#"query {{ block(chainId: "{chain}") {{
1657 hash
1658 block {{ header {{ height }} }}
1659 }} }}"#
1660 );
1661
1662 let mut response = self.query_node(&query).await?;
1663
1664 match (
1665 mem::take(&mut response["block"]["hash"]),
1666 mem::take(&mut response["block"]["block"]["header"]["height"]),
1667 ) {
1668 (Value::Null, Value::Null) => Ok(None),
1669 (Value::String(hash), Value::Number(height)) => Ok(Some((
1670 hash.parse()
1671 .context("Received an invalid hash {hash:?} for chain tip")?,
1672 BlockHeight(height.as_u64().unwrap()),
1673 ))),
1674 invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1675 }
1676 }
1677
1678 pub async fn notifications(
1680 &self,
1681 chain_id: ChainId,
1682 ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
1683 let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1684 let url = format!("ws://localhost:{}/ws", self.port);
1685 let mut request = url.into_client_request()?;
1686 request.headers_mut().insert(
1687 "Sec-WebSocket-Protocol",
1688 HeaderValue::from_str("graphql-transport-ws")?,
1689 );
1690 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1691 let init_json = json!({
1692 "type": "connection_init",
1693 "payload": {}
1694 });
1695 websocket.send(init_json.to_string().into()).await?;
1696 let text = websocket
1697 .next()
1698 .await
1699 .context("Failed to establish connection")??
1700 .into_text()?;
1701 ensure!(
1702 text == "{\"type\":\"connection_ack\"}",
1703 "Unexpected response: {text}"
1704 );
1705 let query_json = json!({
1706 "id": "1",
1707 "type": "start",
1708 "payload": {
1709 "query": query,
1710 "variables": {},
1711 "operationName": null
1712 }
1713 });
1714 websocket.send(query_json.to_string().into()).await?;
1715 Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1716 |message| async {
1717 let text = message.into_text()?;
1718 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1719 if let Some(errors) = value["payload"].get("errors") {
1720 bail!("Notification subscription failed: {errors:?}");
1721 }
1722 serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1723 .context("Failed to deserialize notification")
1724 },
1725 )))
1726 }
1727}
1728
1729pub struct FaucetService {
1731 port: u16,
1732 child: Child,
1733 _temp_dir: tempfile::TempDir,
1734}
1735
1736impl FaucetService {
1737 fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
1738 Self {
1739 port,
1740 child,
1741 _temp_dir: temp_dir,
1742 }
1743 }
1744
1745 pub async fn terminate(mut self) -> Result<()> {
1746 self.child
1747 .kill()
1748 .await
1749 .context("terminating faucet service")
1750 }
1751
1752 pub fn ensure_is_running(&mut self) -> Result<()> {
1753 self.child.ensure_is_running()
1754 }
1755
1756 pub fn instance(&self) -> Faucet {
1757 Faucet::new(format!("http://localhost:{}/", self.port))
1758 }
1759}
1760
1761pub struct ApplicationWrapper<A> {
1763 uri: String,
1764 _phantom: PhantomData<A>,
1765}
1766
1767impl<A> ApplicationWrapper<A> {
1768 pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1769 let query = query.as_ref();
1770 let value = self.run_json_query(json!({ "query": query })).await?;
1771 Ok(value["data"].clone())
1772 }
1773
1774 pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1775 const MAX_RETRIES: usize = 5;
1776
1777 for i in 0.. {
1778 let client = reqwest_client();
1779 let result = client.post(&self.uri).json(&query).send().await;
1780 let response = match result {
1781 Ok(response) => response,
1782 Err(error) if i < MAX_RETRIES => {
1783 tracing::warn!(
1784 "Failed to post query \"{}\": {error}; retrying",
1785 truncate_query_output_serialize(&query),
1786 );
1787 continue;
1788 }
1789 Err(error) => {
1790 let query = truncate_query_output_serialize(&query);
1791 return Err(error)
1792 .with_context(|| format!("run_json_query: failed to post query={query}"));
1793 }
1794 };
1795 ensure!(
1796 response.status().is_success(),
1797 "Query \"{}\" failed: {}",
1798 truncate_query_output_serialize(&query),
1799 response
1800 .text()
1801 .await
1802 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1803 );
1804 let value: Value = response.json().await.context("invalid JSON")?;
1805 if let Some(errors) = value.get("errors") {
1806 bail!(
1807 "Query \"{}\" failed: {}",
1808 truncate_query_output_serialize(&query),
1809 errors
1810 );
1811 }
1812 return Ok(value);
1813 }
1814 unreachable!()
1815 }
1816
1817 pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
1818 let query = query.as_ref();
1819 self.run_graphql_query(&format!("query {{ {query} }}"))
1820 .await
1821 }
1822
1823 pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
1824 let query = query.as_ref().trim();
1825 let name = query
1826 .split_once(|ch: char| !ch.is_alphanumeric())
1827 .map_or(query, |(name, _)| name);
1828 let data = self.query(query).await?;
1829 serde_json::from_value(data[name].clone())
1830 .with_context(|| format!("{name} field missing in response"))
1831 }
1832
1833 pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
1834 let mutation = mutation.as_ref();
1835 self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
1836 .await
1837 }
1838
1839 pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
1840 let mut out = String::from("mutation {\n");
1841 for (index, mutation) in mutations.iter().enumerate() {
1842 out = format!("{} u{}: {}\n", out, index, mutation);
1843 }
1844 out.push_str("}\n");
1845 self.run_graphql_query(&out).await
1846 }
1847}
1848
1849impl<A> From<String> for ApplicationWrapper<A> {
1850 fn from(uri: String) -> ApplicationWrapper<A> {
1851 ApplicationWrapper {
1852 uri,
1853 _phantom: PhantomData,
1854 }
1855 }
1856}
1857
1858#[cfg(with_testing)]
1861fn notification_timeout() -> Duration {
1862 const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
1863 const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;
1864
1865 match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
1866 Ok(var) => Duration::from_millis(var.parse().unwrap_or_else(|error| {
1867 panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
1868 })),
1869 Err(env::VarError::NotPresent) => Duration::from_millis(NOTIFICATION_TIMEOUT_MS_DEFAULT),
1870 Err(env::VarError::NotUnicode(_)) => {
1871 panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
1872 }
1873 }
1874}
1875
1876#[cfg(with_testing)]
1877pub trait NotificationsExt {
1878 fn wait_for<T>(
1880 &mut self,
1881 f: impl FnMut(Notification) -> Option<T>,
1882 ) -> impl Future<Output = Result<T>>;
1883
1884 fn wait_for_events(
1887 &mut self,
1888 expected_height: impl Into<Option<BlockHeight>>,
1889 ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
1890 let expected_height = expected_height.into();
1891 self.wait_for(move |notification| {
1892 if let Reason::NewEvents {
1893 height,
1894 event_streams,
1895 ..
1896 } = notification.reason
1897 {
1898 if expected_height.is_none_or(|h| h == height) {
1899 return Some(event_streams);
1900 }
1901 }
1902 None
1903 })
1904 }
1905
1906 fn wait_for_block(
1909 &mut self,
1910 expected_height: impl Into<Option<BlockHeight>>,
1911 ) -> impl Future<Output = Result<CryptoHash>> {
1912 let expected_height = expected_height.into();
1913 self.wait_for(move |notification| {
1914 if let Reason::NewBlock { height, hash, .. } = notification.reason {
1915 if expected_height.is_none_or(|h| h == height) {
1916 return Some(hash);
1917 }
1918 }
1919 None
1920 })
1921 }
1922
1923 fn wait_for_bundle(
1926 &mut self,
1927 expected_origin: ChainId,
1928 expected_height: impl Into<Option<BlockHeight>>,
1929 ) -> impl Future<Output = Result<()>> {
1930 let expected_height = expected_height.into();
1931 self.wait_for(move |notification| {
1932 if let Reason::NewIncomingBundle { height, origin } = notification.reason {
1933 if expected_height.is_none_or(|h| h == height) && origin == expected_origin {
1934 return Some(());
1935 }
1936 }
1937 None
1938 })
1939 }
1940}
1941
1942#[cfg(with_testing)]
1943impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
1944 async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
1945 let mut timeout = Box::pin(linera_base::time::timer::sleep(notification_timeout())).fuse();
1946 loop {
1947 let notification = futures::select! {
1948 () = timeout => bail!("Timeout waiting for notification"),
1949 notification = self.next().fuse() => notification.context("Stream closed")??,
1950 };
1951 if let Some(t) = f(notification) {
1952 return Ok(t);
1953 }
1954 }
1955 }
1956}