1use std::{
5 borrow::Cow,
6 collections::BTreeMap,
7 env,
8 marker::PhantomData,
9 mem,
10 path::{Path, PathBuf},
11 pin::Pin,
12 process::Stdio,
13 str::FromStr,
14 sync,
15 time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24 abi::ContractAbi,
25 command::{resolve_binary, CommandExt},
26 crypto::{CryptoHash, InMemorySigner},
27 data_types::{Amount, ApplicationPermissions, BlockHeight, Bytecode, Epoch},
28 identifiers::{
29 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30 },
31 vm::VmRuntime,
32};
33use linera_client::client_options::ResourceControlPolicyConfig;
34use linera_core::worker::Notification;
35use linera_execution::committee::Committee;
36use linera_faucet_client::Faucet;
37use serde::{de::DeserializeOwned, ser::Serialize};
38use serde_command_opts::to_args;
39use serde_json::{json, Value};
40use tempfile::TempDir;
41use tokio::{
42 io::{AsyncBufReadExt, BufReader},
43 process::{Child, Command},
44 sync::oneshot,
45 task::JoinHandle,
46};
47#[cfg(with_testing)]
48use {
49 futures::FutureExt as _,
50 linera_core::worker::Reason,
51 std::{collections::BTreeSet, future::Future},
52};
53
54use crate::{
55 cli::command::BenchmarkCommand,
56 cli_wrappers::{
57 local_net::{PathProvider, ProcessInbox},
58 Network,
59 },
60 util::{self, ChildExt},
61 wallet::Wallet,
62};
63
64const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
67
68fn reqwest_client() -> reqwest::Client {
69 reqwest::ClientBuilder::new()
70 .timeout(Duration::from_secs(30))
71 .build()
72 .unwrap()
73}
74
75pub struct ClientWrapper {
77 binary_path: sync::Mutex<Option<PathBuf>>,
78 testing_prng_seed: Option<u64>,
79 storage: String,
80 wallet: String,
81 keystore: String,
82 max_pending_message_bundles: usize,
83 network: Network,
84 pub path_provider: PathProvider,
85 on_drop: OnClientDrop,
86 extra_args: Vec<String>,
87}
88
89#[derive(Clone, Copy, Debug, Eq, PartialEq)]
91pub enum OnClientDrop {
92 CloseChains,
94 LeakChains,
96}
97
98impl ClientWrapper {
99 pub fn new(
100 path_provider: PathProvider,
101 network: Network,
102 testing_prng_seed: Option<u64>,
103 id: usize,
104 on_drop: OnClientDrop,
105 ) -> Self {
106 Self::new_with_extra_args(
107 path_provider,
108 network,
109 testing_prng_seed,
110 id,
111 on_drop,
112 vec!["--wait-for-outgoing-messages".to_string()],
113 None,
114 )
115 }
116
117 pub fn new_with_extra_args(
118 path_provider: PathProvider,
119 network: Network,
120 testing_prng_seed: Option<u64>,
121 id: usize,
122 on_drop: OnClientDrop,
123 extra_args: Vec<String>,
124 binary_dir: Option<PathBuf>,
125 ) -> Self {
126 let storage = format!(
127 "rocksdb:{}/client_{}.db",
128 path_provider.path().display(),
129 id
130 );
131 let wallet = format!("wallet_{}.json", id);
132 let keystore = format!("keystore_{}.json", id);
133 let binary_path = binary_dir.map(|dir| dir.join("linera"));
134 Self {
135 binary_path: sync::Mutex::new(binary_path),
136 testing_prng_seed,
137 storage,
138 wallet,
139 keystore,
140 max_pending_message_bundles: 10_000,
141 network,
142 path_provider,
143 on_drop,
144 extra_args,
145 }
146 }
147
148 pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
150 let tmp = TempDir::new()?;
151 let mut command = self.command().await?;
152 command
153 .current_dir(tmp.path())
154 .arg("project")
155 .arg("new")
156 .arg(project_name)
157 .arg("--linera-root")
158 .arg(linera_root)
159 .spawn_and_wait_for_stdout()
160 .await?;
161 Ok(tmp)
162 }
163
164 pub async fn project_publish<T: Serialize>(
166 &self,
167 path: PathBuf,
168 required_application_ids: Vec<String>,
169 publisher: impl Into<Option<ChainId>>,
170 argument: &T,
171 ) -> Result<String> {
172 let json_parameters = serde_json::to_string(&())?;
173 let json_argument = serde_json::to_string(argument)?;
174 let mut command = self.command().await?;
175 command
176 .arg("project")
177 .arg("publish-and-create")
178 .arg(path)
179 .args(publisher.into().iter().map(ChainId::to_string))
180 .args(["--json-parameters", &json_parameters])
181 .args(["--json-argument", &json_argument]);
182 if !required_application_ids.is_empty() {
183 command.arg("--required-application-ids");
184 command.args(required_application_ids);
185 }
186 let stdout = command.spawn_and_wait_for_stdout().await?;
187 Ok(stdout.trim().to_string())
188 }
189
190 pub async fn project_test(&self, path: &Path) -> Result<()> {
192 self.command()
193 .await
194 .context("failed to create project test command")?
195 .current_dir(path)
196 .arg("project")
197 .arg("test")
198 .spawn_and_wait_for_stdout()
199 .await?;
200 Ok(())
201 }
202
203 async fn command_with_envs_and_arguments(
204 &self,
205 envs: &[(&str, &str)],
206 arguments: impl IntoIterator<Item = Cow<'_, str>>,
207 ) -> Result<Command> {
208 let mut command = self.command_binary().await?;
209 command.current_dir(self.path_provider.path());
210 for (key, value) in envs {
211 command.env(key, value);
212 }
213 for argument in arguments {
214 command.arg(&*argument);
215 }
216 Ok(command)
217 }
218
219 async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
220 self.command_with_envs_and_arguments(envs, self.command_arguments())
221 .await
222 }
223
224 async fn command_with_arguments(
225 &self,
226 arguments: impl IntoIterator<Item = Cow<'_, str>>,
227 ) -> Result<Command> {
228 self.command_with_envs_and_arguments(
229 &[(
230 "RUST_LOG",
231 &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
232 )],
233 arguments,
234 )
235 .await
236 }
237
238 async fn command(&self) -> Result<Command> {
239 self.command_with_envs(&[(
240 "RUST_LOG",
241 &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
242 )])
243 .await
244 }
245
246 fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
247 [
248 "--wallet".into(),
249 self.wallet.as_str().into(),
250 "--keystore".into(),
251 self.keystore.as_str().into(),
252 "--storage".into(),
253 self.storage.as_str().into(),
254 "--send-timeout-ms".into(),
255 "500000".into(),
256 "--recv-timeout-ms".into(),
257 "500000".into(),
258 ]
259 .into_iter()
260 .chain(self.extra_args.iter().map(|s| s.as_str().into()))
261 }
262
263 fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
265 self.required_command_arguments().chain([
266 "--max-pending-message-bundles".into(),
267 self.max_pending_message_bundles.to_string().into(),
268 ])
269 }
270
271 async fn command_binary(&self) -> Result<Command> {
275 match self.command_with_cached_binary_path() {
276 Some(command) => Ok(command),
277 None => {
278 let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
279 let command = Command::new(&resolved_path);
280
281 self.set_cached_binary_path(resolved_path);
282
283 Ok(command)
284 }
285 }
286 }
287
288 fn command_with_cached_binary_path(&self) -> Option<Command> {
290 let binary_path = self.binary_path.lock().unwrap();
291
292 binary_path.as_ref().map(Command::new)
293 }
294
295 fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
303 let mut binary_path = self.binary_path.lock().unwrap();
304
305 if binary_path.is_none() {
306 *binary_path = Some(new_binary_path);
307 } else {
308 assert_eq!(*binary_path, Some(new_binary_path));
309 }
310 }
311
312 pub async fn create_genesis_config(
314 &self,
315 num_other_initial_chains: u32,
316 initial_funding: Amount,
317 policy_config: ResourceControlPolicyConfig,
318 http_allow_list: Option<Vec<String>>,
319 ) -> Result<()> {
320 let mut command = self.command().await?;
321 command
322 .args([
323 "create-genesis-config",
324 &num_other_initial_chains.to_string(),
325 ])
326 .args(["--initial-funding", &initial_funding.to_string()])
327 .args(["--committee", "committee.json"])
328 .args(["--genesis", "genesis.json"])
329 .args([
330 "--policy-config",
331 &policy_config.to_string().to_kebab_case(),
332 ]);
333 if let Some(allow_list) = http_allow_list {
334 command
335 .arg("--http-request-allow-list")
336 .arg(allow_list.join(","));
337 }
338 if let Some(seed) = self.testing_prng_seed {
339 command.arg("--testing-prng-seed").arg(seed.to_string());
340 }
341 command.spawn_and_wait_for_stdout().await?;
342 Ok(())
343 }
344
345 pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
348 let mut command = self.command().await?;
349 command.args(["wallet", "init"]);
350 match faucet {
351 None => command.args(["--genesis", "genesis.json"]),
352 Some(faucet) => command.args(["--faucet", faucet.url()]),
353 };
354 if let Some(seed) = self.testing_prng_seed {
355 command.arg("--testing-prng-seed").arg(seed.to_string());
356 }
357 command.spawn_and_wait_for_stdout().await?;
358 Ok(())
359 }
360
361 pub async fn request_chain(
363 &self,
364 faucet: &Faucet,
365 set_default: bool,
366 ) -> Result<(ChainId, AccountOwner)> {
367 let mut command = self.command().await?;
368 command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
369 if set_default {
370 command.arg("--set-default");
371 }
372 let stdout = command.spawn_and_wait_for_stdout().await?;
373 let mut lines = stdout.split_whitespace();
374 let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
375 let owner = lines.next().context("missing chain owner")?.parse()?;
376 Ok((chain_id, owner))
377 }
378
379 #[expect(clippy::too_many_arguments)]
381 pub async fn publish_and_create<
382 A: ContractAbi,
383 Parameters: Serialize,
384 InstantiationArgument: Serialize,
385 >(
386 &self,
387 contract: PathBuf,
388 service: PathBuf,
389 vm_runtime: VmRuntime,
390 parameters: &Parameters,
391 argument: &InstantiationArgument,
392 required_application_ids: &[ApplicationId],
393 publisher: impl Into<Option<ChainId>>,
394 ) -> Result<ApplicationId<A>> {
395 let json_parameters = serde_json::to_string(parameters)?;
396 let json_argument = serde_json::to_string(argument)?;
397 let mut command = self.command().await?;
398 let vm_runtime = format!("{}", vm_runtime);
399 command
400 .arg("publish-and-create")
401 .args([contract, service])
402 .args(["--vm-runtime", &vm_runtime.to_lowercase()])
403 .args(publisher.into().iter().map(ChainId::to_string))
404 .args(["--json-parameters", &json_parameters])
405 .args(["--json-argument", &json_argument]);
406 if !required_application_ids.is_empty() {
407 command.arg("--required-application-ids");
408 command.args(
409 required_application_ids
410 .iter()
411 .map(ApplicationId::to_string),
412 );
413 }
414 let stdout = command.spawn_and_wait_for_stdout().await?;
415 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
416 }
417
418 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
420 &self,
421 contract: PathBuf,
422 service: PathBuf,
423 vm_runtime: VmRuntime,
424 publisher: impl Into<Option<ChainId>>,
425 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
426 let stdout = self
427 .command()
428 .await?
429 .arg("publish-module")
430 .args([contract, service])
431 .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
432 .args(publisher.into().iter().map(ChainId::to_string))
433 .spawn_and_wait_for_stdout()
434 .await?;
435 let module_id: ModuleId = stdout.trim().parse()?;
436 Ok(module_id.with_abi())
437 }
438
439 pub async fn create_application<
441 Abi: ContractAbi,
442 Parameters: Serialize,
443 InstantiationArgument: Serialize,
444 >(
445 &self,
446 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
447 parameters: &Parameters,
448 argument: &InstantiationArgument,
449 required_application_ids: &[ApplicationId],
450 creator: impl Into<Option<ChainId>>,
451 ) -> Result<ApplicationId<Abi>> {
452 let json_parameters = serde_json::to_string(parameters)?;
453 let json_argument = serde_json::to_string(argument)?;
454 let mut command = self.command().await?;
455 command
456 .arg("create-application")
457 .arg(module_id.forget_abi().to_string())
458 .args(["--json-parameters", &json_parameters])
459 .args(["--json-argument", &json_argument])
460 .args(creator.into().iter().map(ChainId::to_string));
461 if !required_application_ids.is_empty() {
462 command.arg("--required-application-ids");
463 command.args(
464 required_application_ids
465 .iter()
466 .map(ApplicationId::to_string),
467 );
468 }
469 let stdout = command.spawn_and_wait_for_stdout().await?;
470 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
471 }
472
473 pub async fn run_node_service(
475 &self,
476 port: impl Into<Option<u16>>,
477 process_inbox: ProcessInbox,
478 ) -> Result<NodeService> {
479 self.run_node_service_with_options(port, process_inbox, &[], &[], false)
480 .await
481 }
482
483 pub async fn run_node_service_with_options(
485 &self,
486 port: impl Into<Option<u16>>,
487 process_inbox: ProcessInbox,
488 operator_application_ids: &[ApplicationId],
489 operators: &[(String, PathBuf)],
490 read_only: bool,
491 ) -> Result<NodeService> {
492 let port = port.into().unwrap_or(8080);
493 let mut command = self.command().await?;
494 command.arg("service");
495 if let ProcessInbox::Skip = process_inbox {
496 command.arg("--listener-skip-process-inbox");
497 }
498 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
499 command.args(var.split_whitespace());
500 }
501 for app_id in operator_application_ids {
502 command.args(["--operator-application-ids", &app_id.to_string()]);
503 }
504 for (name, path) in operators {
505 command.args(["--operators", &format!("{}={}", name, path.display())]);
506 }
507 if read_only {
508 command.arg("--read-only");
509 }
510 let child = command
511 .args(["--port".to_string(), port.to_string()])
512 .spawn_into()?;
513 let client = reqwest_client();
514 for i in 0..10 {
515 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
516 let request = client
517 .get(format!("http://localhost:{}/", port))
518 .send()
519 .await;
520 if request.is_ok() {
521 tracing::info!("Node service has started");
522 return Ok(NodeService::new(port, child));
523 } else {
524 tracing::warn!("Waiting for node service to start");
525 }
526 }
527 bail!("Failed to start node service");
528 }
529
530 pub async fn run_node_service_with_controller(
532 &self,
533 port: impl Into<Option<u16>>,
534 process_inbox: ProcessInbox,
535 controller_id: &ApplicationId,
536 operators: &[(String, PathBuf)],
537 ) -> Result<NodeService> {
538 let port = port.into().unwrap_or(8080);
539 let mut command = self.command().await?;
540 command.arg("service");
541 if let ProcessInbox::Skip = process_inbox {
542 command.arg("--listener-skip-process-inbox");
543 }
544 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
545 command.args(var.split_whitespace());
546 }
547 command.args(["--controller-id", &controller_id.to_string()]);
548 for (name, path) in operators {
549 command.args(["--operators", &format!("{}={}", name, path.display())]);
550 }
551 let child = command
552 .args(["--port".to_string(), port.to_string()])
553 .spawn_into()?;
554 let client = reqwest_client();
555 for i in 0..10 {
556 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
557 let request = client
558 .get(format!("http://localhost:{}/", port))
559 .send()
560 .await;
561 if request.is_ok() {
562 tracing::info!("Node service has started");
563 return Ok(NodeService::new(port, child));
564 } else {
565 tracing::warn!("Waiting for node service to start");
566 }
567 }
568 bail!("Failed to start node service");
569 }
570
571 pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
573 let mut command = self.command().await?;
574 command.arg("validator").arg("query").arg(address);
575 let stdout = command.spawn_and_wait_for_stdout().await?;
576
577 let hash = stdout
580 .lines()
581 .find_map(|line| {
582 line.strip_prefix("Genesis config hash: ")
583 .and_then(|hash_str| hash_str.trim().parse().ok())
584 })
585 .context("error while parsing the result of `linera validator query`")?;
586 Ok(hash)
587 }
588
589 pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
591 let mut command = self.command().await?;
592 command.arg("validator").arg("list");
593 if let Some(chain_id) = chain_id {
594 command.args(["--chain-id", &chain_id.to_string()]);
595 }
596 command.spawn_and_wait_for_stdout().await?;
597 Ok(())
598 }
599
600 pub async fn sync_validator(
602 &self,
603 chain_ids: impl IntoIterator<Item = &ChainId>,
604 validator_address: impl Into<String>,
605 ) -> Result<()> {
606 let mut command = self.command().await?;
607 command
608 .arg("validator")
609 .arg("sync")
610 .arg(validator_address.into());
611 let mut chain_ids = chain_ids.into_iter().peekable();
612 if chain_ids.peek().is_some() {
613 command
614 .arg("--chains")
615 .args(chain_ids.map(ChainId::to_string));
616 }
617 command.spawn_and_wait_for_stdout().await?;
618 Ok(())
619 }
620
621 pub async fn run_faucet(
623 &self,
624 port: impl Into<Option<u16>>,
625 chain_id: Option<ChainId>,
626 amount: Amount,
627 ) -> Result<FaucetService> {
628 let port = port.into().unwrap_or(8080);
629 let temp_dir = tempfile::tempdir()
630 .context("Failed to create temporary directory for faucet storage")?;
631 let storage_path = temp_dir.path().join("faucet_storage.sqlite");
632 let mut command = self.command().await?;
633 let command = command
634 .arg("faucet")
635 .args(["--port".to_string(), port.to_string()])
636 .args(["--amount".to_string(), amount.to_string()])
637 .args([
638 "--storage-path".to_string(),
639 storage_path.to_string_lossy().to_string(),
640 ]);
641 if let Some(chain_id) = chain_id {
642 command.arg(chain_id.to_string());
643 }
644 let child = command.spawn_into()?;
645 let client = reqwest_client();
646 for i in 0..10 {
647 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
648 let request = client
649 .get(format!("http://localhost:{}/", port))
650 .send()
651 .await;
652 if request.is_ok() {
653 tracing::info!("Faucet has started");
654 return Ok(FaucetService::new(port, child, temp_dir));
655 } else {
656 tracing::debug!("Waiting for faucet to start");
657 }
658 }
659 bail!("Failed to start faucet");
660 }
661
662 pub async fn local_balance(&self, account: Account) -> Result<Amount> {
664 let stdout = self
665 .command()
666 .await?
667 .arg("local-balance")
668 .arg(account.to_string())
669 .spawn_and_wait_for_stdout()
670 .await?;
671 let amount = stdout
672 .trim()
673 .parse()
674 .context("error while parsing the result of `linera local-balance`")?;
675 Ok(amount)
676 }
677
678 pub async fn query_balance(&self, account: Account) -> Result<Amount> {
680 let stdout = self
681 .command()
682 .await?
683 .arg("query-balance")
684 .arg(account.to_string())
685 .spawn_and_wait_for_stdout()
686 .await?;
687 let amount = stdout
688 .trim()
689 .parse()
690 .context("error while parsing the result of `linera query-balance`")?;
691 Ok(amount)
692 }
693
694 pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
696 self.command()
697 .await?
698 .arg("sync")
699 .arg(chain_id.to_string())
700 .spawn_and_wait_for_stdout()
701 .await?;
702 Ok(())
703 }
704
705 pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
707 self.command()
708 .await?
709 .arg("process-inbox")
710 .arg(chain_id.to_string())
711 .spawn_and_wait_for_stdout()
712 .await?;
713 Ok(())
714 }
715
716 pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
718 self.command()
719 .await?
720 .arg("transfer")
721 .arg(amount.to_string())
722 .args(["--from", &from.to_string()])
723 .args(["--to", &to.to_string()])
724 .spawn_and_wait_for_stdout()
725 .await?;
726 Ok(())
727 }
728
729 pub async fn transfer_with_silent_logs(
731 &self,
732 amount: Amount,
733 from: ChainId,
734 to: ChainId,
735 ) -> Result<()> {
736 self.command()
737 .await?
738 .env("RUST_LOG", "off")
739 .arg("transfer")
740 .arg(amount.to_string())
741 .args(["--from", &from.to_string()])
742 .args(["--to", &to.to_string()])
743 .spawn_and_wait_for_stdout()
744 .await?;
745 Ok(())
746 }
747
748 pub async fn transfer_with_accounts(
750 &self,
751 amount: Amount,
752 from: Account,
753 to: Account,
754 ) -> Result<()> {
755 self.command()
756 .await?
757 .arg("transfer")
758 .arg(amount.to_string())
759 .args(["--from", &from.to_string()])
760 .args(["--to", &to.to_string()])
761 .spawn_and_wait_for_stdout()
762 .await?;
763 Ok(())
764 }
765
766 fn benchmark_command_internal(command: &mut Command, args: BenchmarkCommand) -> Result<()> {
767 let mut formatted_args = to_args(&args)?;
768 let subcommand = formatted_args.remove(0);
769 formatted_args.remove(0);
772 let options = formatted_args
773 .chunks_exact(2)
774 .flat_map(|pair| {
775 let option = format!("--{}", pair[0]);
776 match pair[1].as_str() {
777 "true" => vec![option],
778 "false" => vec![],
779 _ => vec![option, pair[1].clone()],
780 }
781 })
782 .collect::<Vec<_>>();
783 command
784 .args([
785 "--max-pending-message-bundles",
786 &args.transactions_per_block().to_string(),
787 ])
788 .arg("benchmark")
789 .arg(subcommand)
790 .args(options);
791 Ok(())
792 }
793
794 async fn benchmark_command_with_envs(
795 &self,
796 args: BenchmarkCommand,
797 envs: &[(&str, &str)],
798 ) -> Result<Command> {
799 let mut command = self
800 .command_with_envs_and_arguments(envs, self.required_command_arguments())
801 .await?;
802 Self::benchmark_command_internal(&mut command, args)?;
803 Ok(command)
804 }
805
806 async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
807 let mut command = self
808 .command_with_arguments(self.required_command_arguments())
809 .await?;
810 Self::benchmark_command_internal(&mut command, args)?;
811 Ok(command)
812 }
813
814 pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
816 let mut command = self.benchmark_command(args).await?;
817 command.spawn_and_wait_for_stdout().await?;
818 Ok(())
819 }
820
821 pub async fn benchmark_detached(
824 &self,
825 args: BenchmarkCommand,
826 tx: oneshot::Sender<()>,
827 ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
828 let mut child = self
829 .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
830 .await?
831 .kill_on_drop(true)
832 .stdin(Stdio::piped())
833 .stdout(Stdio::piped())
834 .stderr(Stdio::piped())
835 .spawn()?;
836
837 let pid = child.id().expect("failed to get pid");
838 let stdout = child.stdout.take().expect("stdout not open");
839 let stdout_handle = tokio::spawn(async move {
840 let mut lines = BufReader::new(stdout).lines();
841 while let Ok(Some(line)) = lines.next_line().await {
842 println!("benchmark{{pid={pid}}} {line}");
843 }
844 });
845
846 let stderr = child.stderr.take().expect("stderr not open");
847 let stderr_handle = tokio::spawn(async move {
848 let mut lines = BufReader::new(stderr).lines();
849 let mut tx = Some(tx);
850 while let Ok(Some(line)) = lines.next_line().await {
851 if line.contains("Ready to start benchmark") {
852 tx.take()
853 .expect("Should only send signal once")
854 .send(())
855 .expect("failed to send ready signal to main thread");
856 } else {
857 println!("benchmark{{pid={pid}}} {line}");
858 }
859 }
860 });
861 Ok((child, stdout_handle, stderr_handle))
862 }
863
864 async fn open_chain_internal(
865 &self,
866 from: ChainId,
867 owner: Option<AccountOwner>,
868 initial_balance: Amount,
869 super_owner: bool,
870 ) -> Result<(ChainId, AccountOwner)> {
871 let mut command = self.command().await?;
872 command
873 .arg("open-chain")
874 .args(["--from", &from.to_string()])
875 .args(["--initial-balance", &initial_balance.to_string()]);
876
877 if let Some(owner) = owner {
878 command.args(["--owner", &owner.to_string()]);
879 }
880
881 if super_owner {
882 command.arg("--super-owner");
883 }
884
885 let stdout = command.spawn_and_wait_for_stdout().await?;
886 let mut split = stdout.split('\n');
887 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
888 let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
889 if let Some(owner) = owner {
890 assert_eq!(owner, new_owner);
891 }
892 Ok((chain_id, new_owner))
893 }
894
895 pub async fn open_chain_super_owner(
897 &self,
898 from: ChainId,
899 owner: Option<AccountOwner>,
900 initial_balance: Amount,
901 ) -> Result<(ChainId, AccountOwner)> {
902 self.open_chain_internal(from, owner, initial_balance, true)
903 .await
904 }
905
906 pub async fn open_chain(
908 &self,
909 from: ChainId,
910 owner: Option<AccountOwner>,
911 initial_balance: Amount,
912 ) -> Result<(ChainId, AccountOwner)> {
913 self.open_chain_internal(from, owner, initial_balance, false)
914 .await
915 }
916
917 pub async fn open_and_assign(
919 &self,
920 client: &ClientWrapper,
921 initial_balance: Amount,
922 ) -> Result<ChainId> {
923 let our_chain = self
924 .load_wallet()?
925 .default_chain()
926 .context("no default chain found")?;
927 let owner = client.keygen().await?;
928 let (new_chain, _) = self
929 .open_chain(our_chain, Some(owner), initial_balance)
930 .await?;
931 client.assign(owner, new_chain).await?;
932 Ok(new_chain)
933 }
934
935 pub async fn open_multi_owner_chain(
936 &self,
937 from: ChainId,
938 owners: BTreeMap<AccountOwner, u64>,
939 multi_leader_rounds: u32,
940 balance: Amount,
941 base_timeout_ms: u64,
942 ) -> Result<ChainId> {
943 let mut command = self.command().await?;
944 command
945 .arg("open-multi-owner-chain")
946 .args(["--from", &from.to_string()])
947 .arg("--owners")
948 .arg(serde_json::to_string(&owners)?)
949 .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
950 command
951 .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
952 .args(["--initial-balance", &balance.to_string()]);
953
954 let stdout = command.spawn_and_wait_for_stdout().await?;
955 let mut split = stdout.split('\n');
956 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
957
958 Ok(chain_id)
959 }
960
961 pub async fn change_ownership(
962 &self,
963 chain_id: ChainId,
964 super_owners: Vec<AccountOwner>,
965 owners: Vec<AccountOwner>,
966 ) -> Result<()> {
967 let mut command = self.command().await?;
968 command
969 .arg("change-ownership")
970 .args(["--chain-id", &chain_id.to_string()]);
971 command
972 .arg("--super-owners")
973 .arg(serde_json::to_string(&super_owners)?);
974 command.arg("--owners").arg(serde_json::to_string(
975 &owners
976 .into_iter()
977 .zip(std::iter::repeat(100u64))
978 .collect::<BTreeMap<_, _>>(),
979 )?);
980 command.spawn_and_wait_for_stdout().await?;
981 Ok(())
982 }
983
984 pub async fn change_application_permissions(
985 &self,
986 chain_id: ChainId,
987 application_permissions: ApplicationPermissions,
988 ) -> Result<()> {
989 let mut command = self.command().await?;
990 command
991 .arg("change-application-permissions")
992 .args(["--chain-id", &chain_id.to_string()]);
993 command.arg("--manage-chain").arg(serde_json::to_string(
994 &application_permissions.manage_chain,
995 )?);
996 command.spawn_and_wait_for_stdout().await?;
998 Ok(())
999 }
1000
1001 pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
1003 let mut command = self.command().await?;
1004 command
1005 .args(["wallet", "follow-chain"])
1006 .arg(chain_id.to_string());
1007 if sync {
1008 command.arg("--sync");
1009 }
1010 command.spawn_and_wait_for_stdout().await?;
1011 Ok(())
1012 }
1013
1014 pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
1016 let mut command = self.command().await?;
1017 command
1018 .args(["wallet", "forget-chain"])
1019 .arg(chain_id.to_string());
1020 command.spawn_and_wait_for_stdout().await?;
1021 Ok(())
1022 }
1023
1024 pub async fn set_default_chain(&self, chain_id: ChainId) -> Result<()> {
1026 let mut command = self.command().await?;
1027 command
1028 .args(["wallet", "set-default"])
1029 .arg(chain_id.to_string());
1030 command.spawn_and_wait_for_stdout().await?;
1031 Ok(())
1032 }
1033
1034 pub async fn retry_pending_block(
1035 &self,
1036 chain_id: Option<ChainId>,
1037 ) -> Result<Option<CryptoHash>> {
1038 let mut command = self.command().await?;
1039 command.arg("retry-pending-block");
1040 if let Some(chain_id) = chain_id {
1041 command.arg(chain_id.to_string());
1042 }
1043 let stdout = command.spawn_and_wait_for_stdout().await?;
1044 let stdout = stdout.trim();
1045 if stdout.is_empty() {
1046 Ok(None)
1047 } else {
1048 Ok(Some(CryptoHash::from_str(stdout)?))
1049 }
1050 }
1051
1052 pub async fn publish_data_blob(
1054 &self,
1055 path: &Path,
1056 chain_id: Option<ChainId>,
1057 ) -> Result<CryptoHash> {
1058 let mut command = self.command().await?;
1059 command.arg("publish-data-blob").arg(path);
1060 if let Some(chain_id) = chain_id {
1061 command.arg(chain_id.to_string());
1062 }
1063 let stdout = command.spawn_and_wait_for_stdout().await?;
1064 let stdout = stdout.trim();
1065 Ok(CryptoHash::from_str(stdout)?)
1066 }
1067
1068 pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
1070 let mut command = self.command().await?;
1071 command.arg("read-data-blob").arg(hash.to_string());
1072 if let Some(chain_id) = chain_id {
1073 command.arg(chain_id.to_string());
1074 }
1075 command.spawn_and_wait_for_stdout().await?;
1076 Ok(())
1077 }
1078
1079 pub fn load_wallet(&self) -> Result<Wallet> {
1080 Ok(Wallet::read(&self.wallet_path())?)
1081 }
1082
1083 pub fn load_keystore(&self) -> Result<InMemorySigner> {
1084 util::read_json(self.keystore_path())
1085 }
1086
1087 pub fn wallet_path(&self) -> PathBuf {
1088 self.path_provider.path().join(&self.wallet)
1089 }
1090
1091 pub fn keystore_path(&self) -> PathBuf {
1092 self.path_provider.path().join(&self.keystore)
1093 }
1094
1095 pub fn storage_path(&self) -> &str {
1096 &self.storage
1097 }
1098
1099 pub fn get_owner(&self) -> Option<AccountOwner> {
1100 let wallet = self.load_wallet().ok()?;
1101 wallet
1102 .get(wallet.default_chain()?)
1103 .expect("default chain must be in wallet")
1104 .owner
1105 }
1106
1107 pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1108 self.load_wallet()
1109 .ok()
1110 .is_some_and(|wallet| wallet.get(chain).is_some())
1111 }
1112
1113 pub async fn set_validator(
1114 &self,
1115 validator_key: &(String, String),
1116 port: usize,
1117 votes: usize,
1118 ) -> Result<()> {
1119 let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1120 self.command()
1121 .await?
1122 .arg("validator")
1123 .arg("add")
1124 .args(["--public-key", &validator_key.0])
1125 .args(["--account-key", &validator_key.1])
1126 .args(["--address", &address])
1127 .args(["--votes", &votes.to_string()])
1128 .spawn_and_wait_for_stdout()
1129 .await?;
1130 Ok(())
1131 }
1132
1133 pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1134 self.command()
1135 .await?
1136 .arg("validator")
1137 .arg("remove")
1138 .args(["--public-key", validator_key])
1139 .spawn_and_wait_for_stdout()
1140 .await?;
1141 Ok(())
1142 }
1143
1144 pub async fn change_validators(
1145 &self,
1146 add_validators: &[(String, String, usize, usize)], modify_validators: &[(String, String, usize, usize)], remove_validators: &[String],
1149 ) -> Result<()> {
1150 use std::str::FromStr;
1151
1152 use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1153
1154 let mut changes = std::collections::HashMap::new();
1157
1158 for (public_key_str, account_key_str, port, votes) in
1160 add_validators.iter().chain(modify_validators.iter())
1161 {
1162 let public_key = ValidatorPublicKey::from_str(public_key_str)
1163 .with_context(|| format!("Invalid validator public key: {}", public_key_str))?;
1164
1165 let account_key = AccountPublicKey::from_str(account_key_str)
1166 .with_context(|| format!("Invalid account public key: {}", account_key_str))?;
1167
1168 let address = format!("{}:127.0.0.1:{}", self.network.short(), port)
1169 .parse()
1170 .unwrap();
1171
1172 let change = crate::cli::validator::Change {
1174 account_key,
1175 address,
1176 votes: crate::cli::validator::Votes(
1177 std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1178 ),
1179 };
1180
1181 changes.insert(public_key, Some(change));
1182 }
1183
1184 for validator_key_str in remove_validators {
1186 let public_key = ValidatorPublicKey::from_str(validator_key_str)
1187 .with_context(|| format!("Invalid validator public key: {}", validator_key_str))?;
1188 changes.insert(public_key, None);
1189 }
1190
1191 let temp_file = tempfile::NamedTempFile::new()
1193 .context("Failed to create temporary file for validator changes")?;
1194 serde_json::to_writer(&temp_file, &changes)
1195 .context("Failed to write validator changes to file")?;
1196 let temp_path = temp_file.path();
1197
1198 self.command()
1199 .await?
1200 .arg("validator")
1201 .arg("update")
1202 .arg(temp_path)
1203 .arg("--yes") .spawn_and_wait_for_stdout()
1205 .await?;
1206
1207 Ok(())
1208 }
1209
1210 pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1211 self.command()
1212 .await?
1213 .arg("revoke-epochs")
1214 .arg(epoch.to_string())
1215 .spawn_and_wait_for_stdout()
1216 .await?;
1217 Ok(())
1218 }
1219
1220 pub async fn keygen(&self) -> Result<AccountOwner> {
1222 let stdout = self
1223 .command()
1224 .await?
1225 .arg("keygen")
1226 .spawn_and_wait_for_stdout()
1227 .await?;
1228 AccountOwner::from_str(stdout.as_str().trim())
1229 }
1230
1231 pub fn default_chain(&self) -> Option<ChainId> {
1233 self.load_wallet().ok()?.default_chain()
1234 }
1235
1236 pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1238 let _stdout = self
1239 .command()
1240 .await?
1241 .arg("assign")
1242 .args(["--owner", &owner.to_string()])
1243 .args(["--chain-id", &chain_id.to_string()])
1244 .spawn_and_wait_for_stdout()
1245 .await?;
1246 Ok(())
1247 }
1248
1249 pub async fn set_preferred_owner(
1251 &self,
1252 chain_id: ChainId,
1253 owner: Option<AccountOwner>,
1254 ) -> Result<()> {
1255 let mut owner_arg = vec!["--owner".to_string()];
1256 if let Some(owner) = owner {
1257 owner_arg.push(owner.to_string());
1258 };
1259 self.command()
1260 .await?
1261 .arg("set-preferred-owner")
1262 .args(["--chain-id", &chain_id.to_string()])
1263 .args(owner_arg)
1264 .spawn_and_wait_for_stdout()
1265 .await?;
1266 Ok(())
1267 }
1268
1269 pub async fn build_application(
1270 &self,
1271 path: &Path,
1272 name: &str,
1273 is_workspace: bool,
1274 ) -> Result<(PathBuf, PathBuf)> {
1275 Command::new("cargo")
1276 .current_dir(self.path_provider.path())
1277 .arg("build")
1278 .arg("--release")
1279 .args(["--target", "wasm32-unknown-unknown"])
1280 .arg("--manifest-path")
1281 .arg(path.join("Cargo.toml"))
1282 .spawn_and_wait_for_stdout()
1283 .await?;
1284
1285 let release_dir = match is_workspace {
1286 true => path.join("../target/wasm32-unknown-unknown/release"),
1287 false => path.join("target/wasm32-unknown-unknown/release"),
1288 };
1289
1290 let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1291 let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1292
1293 let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1294 let service_size = fs_err::tokio::metadata(&service).await?.len();
1295 tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1296
1297 Ok((contract, service))
1298 }
1299}
1300
1301impl Drop for ClientWrapper {
1302 fn drop(&mut self) {
1303 use std::process::Command as SyncCommand;
1304
1305 if self.on_drop != OnClientDrop::CloseChains {
1306 return;
1307 }
1308
1309 let Ok(binary_path) = self.binary_path.lock() else {
1310 tracing::error!(
1311 "Failed to close chains because a thread panicked with a lock to `binary_path`"
1312 );
1313 return;
1314 };
1315
1316 let Some(binary_path) = binary_path.as_ref() else {
1317 tracing::warn!(
1318 "Assuming no chains need to be closed, because the command binary was never \
1319 resolved and therefore presumably never called"
1320 );
1321 return;
1322 };
1323
1324 let working_directory = self.path_provider.path();
1325 let mut wallet_show_command = SyncCommand::new(binary_path);
1326
1327 for argument in self.command_arguments() {
1328 wallet_show_command.arg(&*argument);
1329 }
1330
1331 let Ok(wallet_show_output) = wallet_show_command
1332 .current_dir(working_directory)
1333 .args(["wallet", "show", "--short", "--owned"])
1334 .output()
1335 else {
1336 tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1337 return;
1338 };
1339
1340 if !wallet_show_output.status.success() {
1341 tracing::warn!("Failed to list chains in the wallet to close them");
1342 return;
1343 }
1344
1345 let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1346 tracing::warn!(
1347 "Failed to close chains because `linera wallet show --short` \
1348 returned a non-UTF-8 output"
1349 );
1350 return;
1351 };
1352
1353 let chain_ids = chain_list_string
1354 .split('\n')
1355 .map(|line| line.trim())
1356 .filter(|line| !line.is_empty());
1357
1358 for chain_id in chain_ids {
1359 let mut close_chain_command = SyncCommand::new(binary_path);
1360
1361 for argument in self.command_arguments() {
1362 close_chain_command.arg(&*argument);
1363 }
1364
1365 close_chain_command.current_dir(working_directory);
1366
1367 match close_chain_command.args(["close-chain", chain_id]).status() {
1368 Ok(status) if status.success() => (),
1369 Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1370 Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1371 }
1372 }
1373 }
1374}
1375
1376#[cfg(with_testing)]
1377impl ClientWrapper {
1378 pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1379 self.build_application(Self::example_path(name)?.as_path(), name, true)
1380 .await
1381 }
1382
1383 pub fn example_path(name: &str) -> Result<PathBuf> {
1384 Ok(env::current_dir()?.join("../examples/").join(name))
1385 }
1386}
1387
1388fn truncate_query_output(input: &str) -> String {
1389 let max_len = 1000;
1390 if input.len() < max_len {
1391 input.to_string()
1392 } else {
1393 format!("{} ...", input.get(..max_len).unwrap())
1394 }
1395}
1396
1397fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1398 let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1399 let max_len = 200;
1400 if query.len() < max_len {
1401 query
1402 } else {
1403 format!("{} ...", query.get(..max_len).unwrap())
1404 }
1405}
1406
1407pub struct NodeService {
1409 port: u16,
1410 child: Child,
1411}
1412
1413impl NodeService {
1414 fn new(port: u16, child: Child) -> Self {
1415 Self { port, child }
1416 }
1417
1418 pub async fn terminate(mut self) -> Result<()> {
1419 self.child.kill().await.context("terminating node service")
1420 }
1421
1422 pub fn port(&self) -> u16 {
1423 self.port
1424 }
1425
1426 pub fn ensure_is_running(&mut self) -> Result<()> {
1427 self.child.ensure_is_running()
1428 }
1429
1430 pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1431 let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1432 let mut data = self.query_node(query).await?;
1433 Ok(serde_json::from_value(data["processInbox"].take())?)
1434 }
1435
1436 pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1437 let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1438 let mut data = self.query_node(query).await?;
1439 Ok(serde_json::from_value(data["sync"].take())?)
1440 }
1441
1442 pub async fn transfer(
1443 &self,
1444 chain_id: ChainId,
1445 owner: AccountOwner,
1446 recipient: Account,
1447 amount: Amount,
1448 ) -> Result<CryptoHash> {
1449 let json_owner = owner.to_value();
1450 let json_recipient = recipient.to_value();
1451 let query = format!(
1452 "mutation {{ transfer(\
1453 chainId: \"{chain_id}\", \
1454 owner: {json_owner}, \
1455 recipient: {json_recipient}, \
1456 amount: \"{amount}\") \
1457 }}"
1458 );
1459 let data = self.query_node(query).await?;
1460 serde_json::from_value(data["transfer"].clone())
1461 .context("missing transfer field in response")
1462 }
1463
1464 pub async fn balance(&self, account: &Account) -> Result<Amount> {
1465 let chain = account.chain_id;
1466 let owner = account.owner;
1467 if matches!(owner, AccountOwner::CHAIN) {
1468 let query = format!(
1469 "query {{ chain(chainId:\"{chain}\") {{
1470 executionState {{ system {{ balance }} }}
1471 }} }}"
1472 );
1473 let response = self.query_node(query).await?;
1474 let balance = &response["chain"]["executionState"]["system"]["balance"]
1475 .as_str()
1476 .unwrap();
1477 return Ok(Amount::from_str(balance)?);
1478 }
1479 let query = format!(
1480 "query {{ chain(chainId:\"{chain}\") {{
1481 executionState {{ system {{ balances {{
1482 entry(key:\"{owner}\") {{ value }}
1483 }} }} }}
1484 }} }}"
1485 );
1486 let response = self.query_node(query).await?;
1487 let balances = &response["chain"]["executionState"]["system"]["balances"];
1488 let balance = balances["entry"]["value"].as_str();
1489 match balance {
1490 None => Ok(Amount::ZERO),
1491 Some(amount) => Ok(Amount::from_str(amount)?),
1492 }
1493 }
1494
1495 pub fn make_application<A: ContractAbi>(
1496 &self,
1497 chain_id: &ChainId,
1498 application_id: &ApplicationId<A>,
1499 ) -> Result<ApplicationWrapper<A>> {
1500 let application_id = application_id.forget_abi().to_string();
1501 let link = format!(
1502 "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1503 self.port
1504 );
1505 Ok(ApplicationWrapper::from(link))
1506 }
1507
1508 pub async fn publish_data_blob(
1509 &self,
1510 chain_id: &ChainId,
1511 bytes: Vec<u8>,
1512 ) -> Result<CryptoHash> {
1513 let query = format!(
1514 "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1515 chain_id.to_value(),
1516 bytes.to_value(),
1517 );
1518 let data = self.query_node(query).await?;
1519 serde_json::from_value(data["publishDataBlob"].clone())
1520 .context("missing publishDataBlob field in response")
1521 }
1522
1523 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1524 &self,
1525 chain_id: &ChainId,
1526 contract: PathBuf,
1527 service: PathBuf,
1528 vm_runtime: VmRuntime,
1529 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1530 let contract_code = Bytecode::load_from_file(&contract).await?;
1531 let service_code = Bytecode::load_from_file(&service).await?;
1532 let query = format!(
1533 "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1534 chain_id.to_value(),
1535 contract_code.to_value(),
1536 service_code.to_value(),
1537 vm_runtime.to_value(),
1538 );
1539 let data = self.query_node(query).await?;
1540 let module_str = data["publishModule"]
1541 .as_str()
1542 .context("module ID not found")?;
1543 let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1544 Ok(module_id.with_abi())
1545 }
1546
1547 pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1548 let query = format!(
1549 "query {{ chain(chainId:\"{chain_id}\") {{
1550 executionState {{ system {{ committees }} }}
1551 }} }}"
1552 );
1553 let mut response = self.query_node(query).await?;
1554 let committees = response["chain"]["executionState"]["system"]["committees"].take();
1555 Ok(serde_json::from_value(committees)?)
1556 }
1557
1558 pub async fn events_from_index(
1559 &self,
1560 chain_id: &ChainId,
1561 stream_id: &StreamId,
1562 start_index: u32,
1563 ) -> Result<Vec<IndexAndEvent>> {
1564 let query = format!(
1565 "query {{
1566 eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1567 {{ index event }}
1568 }}",
1569 stream_id.to_value()
1570 );
1571 let mut response = self.query_node(query).await?;
1572 let response = response["eventsFromIndex"].take();
1573 Ok(serde_json::from_value(response)?)
1574 }
1575
1576 pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1577 let n_try = 5;
1578 let query = query.as_ref();
1579 for i in 0..n_try {
1580 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1581 let url = format!("http://localhost:{}/", self.port);
1582 let client = reqwest_client();
1583 let result = client
1584 .post(url)
1585 .json(&json!({ "query": query }))
1586 .send()
1587 .await;
1588 if matches!(result, Err(ref error) if error.is_timeout()) {
1589 tracing::warn!(
1590 "Timeout when sending query {} to the node service",
1591 truncate_query_output(query)
1592 );
1593 continue;
1594 }
1595 let response = result.with_context(|| {
1596 format!(
1597 "query_node: failed to post query={}",
1598 truncate_query_output(query)
1599 )
1600 })?;
1601 ensure!(
1602 response.status().is_success(),
1603 "Query \"{}\" failed: {}",
1604 truncate_query_output(query),
1605 response
1606 .text()
1607 .await
1608 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1609 );
1610 let value: Value = response.json().await.context("invalid JSON")?;
1611 if let Some(errors) = value.get("errors") {
1612 tracing::warn!(
1613 "Query \"{}\" failed: {}",
1614 truncate_query_output(query),
1615 errors
1616 );
1617 } else {
1618 return Ok(value["data"].clone());
1619 }
1620 }
1621 bail!(
1622 "Query \"{}\" failed after {} retries.",
1623 truncate_query_output(query),
1624 n_try
1625 );
1626 }
1627
1628 pub async fn create_application<
1629 Abi: ContractAbi,
1630 Parameters: Serialize,
1631 InstantiationArgument: Serialize,
1632 >(
1633 &self,
1634 chain_id: &ChainId,
1635 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1636 parameters: &Parameters,
1637 argument: &InstantiationArgument,
1638 required_application_ids: &[ApplicationId],
1639 ) -> Result<ApplicationId<Abi>> {
1640 let module_id = module_id.forget_abi();
1641 let json_required_applications_ids = required_application_ids
1642 .iter()
1643 .map(ApplicationId::to_string)
1644 .collect::<Vec<_>>()
1645 .to_value();
1646 let new_parameters = serde_json::to_value(parameters)
1648 .context("could not create parameters JSON")?
1649 .to_value();
1650 let new_argument = serde_json::to_value(argument)
1651 .context("could not create argument JSON")?
1652 .to_value();
1653 let query = format!(
1654 "mutation {{ createApplication(\
1655 chainId: \"{chain_id}\",
1656 moduleId: \"{module_id}\", \
1657 parameters: {new_parameters}, \
1658 instantiationArgument: {new_argument}, \
1659 requiredApplicationIds: {json_required_applications_ids}) \
1660 }}"
1661 );
1662 let data = self.query_node(query).await?;
1663 let app_id_str = data["createApplication"]
1664 .as_str()
1665 .context("missing createApplication string in response")?
1666 .trim();
1667 Ok(app_id_str
1668 .parse::<ApplicationId>()
1669 .context("invalid application ID")?
1670 .with_abi())
1671 }
1672
1673 pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1675 let query = format!(
1676 r#"query {{ block(chainId: "{chain}") {{
1677 hash
1678 block {{ header {{ height }} }}
1679 }} }}"#
1680 );
1681
1682 let mut response = self.query_node(&query).await?;
1683
1684 match (
1685 mem::take(&mut response["block"]["hash"]),
1686 mem::take(&mut response["block"]["block"]["header"]["height"]),
1687 ) {
1688 (Value::Null, Value::Null) => Ok(None),
1689 (Value::String(hash), Value::Number(height)) => Ok(Some((
1690 hash.parse()
1691 .context("Received an invalid hash {hash:?} for chain tip")?,
1692 BlockHeight(height.as_u64().unwrap()),
1693 ))),
1694 invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1695 }
1696 }
1697
1698 pub async fn notifications(
1700 &self,
1701 chain_id: ChainId,
1702 ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
1703 let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1704 let url = format!("ws://localhost:{}/ws", self.port);
1705 let mut request = url.into_client_request()?;
1706 request.headers_mut().insert(
1707 "Sec-WebSocket-Protocol",
1708 HeaderValue::from_str("graphql-transport-ws")?,
1709 );
1710 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1711 let init_json = json!({
1712 "type": "connection_init",
1713 "payload": {}
1714 });
1715 websocket.send(init_json.to_string().into()).await?;
1716 let text = websocket
1717 .next()
1718 .await
1719 .context("Failed to establish connection")??
1720 .into_text()?;
1721 ensure!(
1722 text == "{\"type\":\"connection_ack\"}",
1723 "Unexpected response: {text}"
1724 );
1725 let query_json = json!({
1726 "id": "1",
1727 "type": "start",
1728 "payload": {
1729 "query": query,
1730 "variables": {},
1731 "operationName": null
1732 }
1733 });
1734 websocket.send(query_json.to_string().into()).await?;
1735 Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1736 |message| async {
1737 let text = message.into_text()?;
1738 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1739 if let Some(errors) = value["payload"].get("errors") {
1740 bail!("Notification subscription failed: {errors:?}");
1741 }
1742 serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1743 .context("Failed to deserialize notification")
1744 },
1745 )))
1746 }
1747}
1748
1749pub struct FaucetService {
1751 port: u16,
1752 child: Child,
1753 _temp_dir: tempfile::TempDir,
1754}
1755
1756impl FaucetService {
1757 fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
1758 Self {
1759 port,
1760 child,
1761 _temp_dir: temp_dir,
1762 }
1763 }
1764
1765 pub async fn terminate(mut self) -> Result<()> {
1766 self.child
1767 .kill()
1768 .await
1769 .context("terminating faucet service")
1770 }
1771
1772 pub fn ensure_is_running(&mut self) -> Result<()> {
1773 self.child.ensure_is_running()
1774 }
1775
1776 pub fn instance(&self) -> Faucet {
1777 Faucet::new(format!("http://localhost:{}/", self.port))
1778 }
1779}
1780
1781pub struct ApplicationWrapper<A> {
1783 uri: String,
1784 _phantom: PhantomData<A>,
1785}
1786
1787impl<A> ApplicationWrapper<A> {
1788 pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1789 let query = query.as_ref();
1790 let value = self.run_json_query(json!({ "query": query })).await?;
1791 Ok(value["data"].clone())
1792 }
1793
1794 pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1795 const MAX_RETRIES: usize = 5;
1796
1797 for i in 0.. {
1798 let client = reqwest_client();
1799 let result = client.post(&self.uri).json(&query).send().await;
1800 let response = match result {
1801 Ok(response) => response,
1802 Err(error) if i < MAX_RETRIES => {
1803 tracing::warn!(
1804 "Failed to post query \"{}\": {error}; retrying",
1805 truncate_query_output_serialize(&query),
1806 );
1807 continue;
1808 }
1809 Err(error) => {
1810 let query = truncate_query_output_serialize(&query);
1811 return Err(error)
1812 .with_context(|| format!("run_json_query: failed to post query={query}"));
1813 }
1814 };
1815 ensure!(
1816 response.status().is_success(),
1817 "Query \"{}\" failed: {}",
1818 truncate_query_output_serialize(&query),
1819 response
1820 .text()
1821 .await
1822 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1823 );
1824 let value: Value = response.json().await.context("invalid JSON")?;
1825 if let Some(errors) = value.get("errors") {
1826 bail!(
1827 "Query \"{}\" failed: {}",
1828 truncate_query_output_serialize(&query),
1829 errors
1830 );
1831 }
1832 return Ok(value);
1833 }
1834 unreachable!()
1835 }
1836
1837 pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
1838 let query = query.as_ref();
1839 self.run_graphql_query(&format!("query {{ {query} }}"))
1840 .await
1841 }
1842
1843 pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
1844 let query = query.as_ref().trim();
1845 let name = query
1846 .split_once(|ch: char| !ch.is_alphanumeric())
1847 .map_or(query, |(name, _)| name);
1848 let data = self.query(query).await?;
1849 serde_json::from_value(data[name].clone())
1850 .with_context(|| format!("{name} field missing in response"))
1851 }
1852
1853 pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
1854 let mutation = mutation.as_ref();
1855 self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
1856 .await
1857 }
1858
1859 pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
1860 let mut out = String::from("mutation {\n");
1861 for (index, mutation) in mutations.iter().enumerate() {
1862 out = format!("{} u{}: {}\n", out, index, mutation);
1863 }
1864 out.push_str("}\n");
1865 self.run_graphql_query(&out).await
1866 }
1867}
1868
1869impl<A> From<String> for ApplicationWrapper<A> {
1870 fn from(uri: String) -> ApplicationWrapper<A> {
1871 ApplicationWrapper {
1872 uri,
1873 _phantom: PhantomData,
1874 }
1875 }
1876}
1877
1878#[cfg(with_testing)]
1881fn notification_timeout() -> Duration {
1882 const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
1883 const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;
1884
1885 match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
1886 Ok(var) => Duration::from_millis(var.parse().unwrap_or_else(|error| {
1887 panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
1888 })),
1889 Err(env::VarError::NotPresent) => Duration::from_millis(NOTIFICATION_TIMEOUT_MS_DEFAULT),
1890 Err(env::VarError::NotUnicode(_)) => {
1891 panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
1892 }
1893 }
1894}
1895
1896#[cfg(with_testing)]
1897pub trait NotificationsExt {
1898 fn wait_for<T>(
1900 &mut self,
1901 f: impl FnMut(Notification) -> Option<T>,
1902 ) -> impl Future<Output = Result<T>>;
1903
1904 fn wait_for_events(
1907 &mut self,
1908 expected_height: impl Into<Option<BlockHeight>>,
1909 ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
1910 let expected_height = expected_height.into();
1911 self.wait_for(move |notification| {
1912 if let Reason::NewEvents {
1913 height,
1914 event_streams,
1915 ..
1916 } = notification.reason
1917 {
1918 if expected_height.is_none_or(|h| h == height) {
1919 return Some(event_streams);
1920 }
1921 }
1922 None
1923 })
1924 }
1925
1926 fn wait_for_block(
1929 &mut self,
1930 expected_height: impl Into<Option<BlockHeight>>,
1931 ) -> impl Future<Output = Result<CryptoHash>> {
1932 let expected_height = expected_height.into();
1933 self.wait_for(move |notification| {
1934 if let Reason::NewBlock { height, hash, .. } = notification.reason {
1935 if expected_height.is_none_or(|h| h == height) {
1936 return Some(hash);
1937 }
1938 }
1939 None
1940 })
1941 }
1942
1943 fn wait_for_bundle(
1946 &mut self,
1947 expected_origin: ChainId,
1948 expected_height: impl Into<Option<BlockHeight>>,
1949 ) -> impl Future<Output = Result<()>> {
1950 let expected_height = expected_height.into();
1951 self.wait_for(move |notification| {
1952 if let Reason::NewIncomingBundle { height, origin } = notification.reason {
1953 if expected_height.is_none_or(|h| h == height) && origin == expected_origin {
1954 return Some(());
1955 }
1956 }
1957 None
1958 })
1959 }
1960}
1961
1962#[cfg(with_testing)]
1963impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
1964 async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
1965 let mut timeout = Box::pin(linera_base::time::timer::sleep(notification_timeout())).fuse();
1966 loop {
1967 let notification = futures::select! {
1968 () = timeout => bail!("Timeout waiting for notification"),
1969 notification = self.next().fuse() => notification.context("Stream closed")??,
1970 };
1971 if let Some(t) = f(notification) {
1972 return Ok(t);
1973 }
1974 }
1975 }
1976}