1use std::{
5 borrow::Cow,
6 collections::BTreeMap,
7 env,
8 marker::PhantomData,
9 mem,
10 path::{Path, PathBuf},
11 pin::Pin,
12 process::Stdio,
13 str::FromStr,
14 sync,
15 time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24 abi::ContractAbi,
25 command::{resolve_binary, CommandExt},
26 crypto::{CryptoHash, InMemorySigner},
27 data_types::{Amount, ApplicationPermissions, BlockHeight, Bytecode, Epoch},
28 identifiers::{
29 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30 },
31 vm::VmRuntime,
32};
33use linera_client::client_options::ResourceControlPolicyConfig;
34use linera_core::worker::Notification;
35use linera_execution::committee::Committee;
36use linera_faucet_client::Faucet;
37use serde::{de::DeserializeOwned, ser::Serialize};
38use serde_command_opts::to_args;
39use serde_json::{json, Value};
40use tempfile::TempDir;
41use tokio::{
42 io::{AsyncBufReadExt, BufReader},
43 process::{Child, Command},
44 sync::oneshot,
45 task::JoinHandle,
46};
47#[cfg(with_testing)]
48use {
49 futures::FutureExt as _,
50 linera_core::worker::Reason,
51 std::{collections::BTreeSet, future::Future},
52};
53
54use crate::{
55 cli::command::BenchmarkCommand,
56 cli_wrappers::{
57 local_net::{PathProvider, ProcessInbox},
58 Network,
59 },
60 util::{self, ChildExt},
61 wallet::Wallet,
62};
63
64const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
67
68fn reqwest_client() -> reqwest::Client {
69 reqwest::ClientBuilder::new()
70 .timeout(Duration::from_secs(30))
71 .build()
72 .unwrap()
73}
74
75pub struct ClientWrapper {
77 binary_path: sync::Mutex<Option<PathBuf>>,
78 testing_prng_seed: Option<u64>,
79 storage: String,
80 wallet: String,
81 keystore: String,
82 max_pending_message_bundles: usize,
83 network: Network,
84 pub path_provider: PathProvider,
85 on_drop: OnClientDrop,
86 extra_args: Vec<String>,
87}
88
89#[derive(Clone, Copy, Debug, Eq, PartialEq)]
91pub enum OnClientDrop {
92 CloseChains,
94 LeakChains,
96}
97
98impl ClientWrapper {
99 pub fn new(
100 path_provider: PathProvider,
101 network: Network,
102 testing_prng_seed: Option<u64>,
103 id: usize,
104 on_drop: OnClientDrop,
105 ) -> Self {
106 Self::new_with_extra_args(
107 path_provider,
108 network,
109 testing_prng_seed,
110 id,
111 on_drop,
112 vec!["--wait-for-outgoing-messages".to_string()],
113 None,
114 )
115 }
116
117 pub fn new_with_extra_args(
118 path_provider: PathProvider,
119 network: Network,
120 testing_prng_seed: Option<u64>,
121 id: usize,
122 on_drop: OnClientDrop,
123 extra_args: Vec<String>,
124 binary_dir: Option<PathBuf>,
125 ) -> Self {
126 let storage = format!(
127 "rocksdb:{}/client_{}.db",
128 path_provider.path().display(),
129 id
130 );
131 let wallet = format!("wallet_{}.json", id);
132 let keystore = format!("keystore_{}.json", id);
133 let binary_path = binary_dir.map(|dir| dir.join("linera"));
134 Self {
135 binary_path: sync::Mutex::new(binary_path),
136 testing_prng_seed,
137 storage,
138 wallet,
139 keystore,
140 max_pending_message_bundles: 10_000,
141 network,
142 path_provider,
143 on_drop,
144 extra_args,
145 }
146 }
147
148 pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
150 let tmp = TempDir::new()?;
151 let mut command = self.command().await?;
152 command
153 .current_dir(tmp.path())
154 .arg("project")
155 .arg("new")
156 .arg(project_name)
157 .arg("--linera-root")
158 .arg(linera_root)
159 .spawn_and_wait_for_stdout()
160 .await?;
161 Ok(tmp)
162 }
163
164 pub async fn project_publish<T: Serialize>(
166 &self,
167 path: PathBuf,
168 required_application_ids: Vec<String>,
169 publisher: impl Into<Option<ChainId>>,
170 argument: &T,
171 ) -> Result<String> {
172 let json_parameters = serde_json::to_string(&())?;
173 let json_argument = serde_json::to_string(argument)?;
174 let mut command = self.command().await?;
175 command
176 .arg("project")
177 .arg("publish-and-create")
178 .arg(path)
179 .args(publisher.into().iter().map(ChainId::to_string))
180 .args(["--json-parameters", &json_parameters])
181 .args(["--json-argument", &json_argument]);
182 if !required_application_ids.is_empty() {
183 command.arg("--required-application-ids");
184 command.args(required_application_ids);
185 }
186 let stdout = command.spawn_and_wait_for_stdout().await?;
187 Ok(stdout.trim().to_string())
188 }
189
190 pub async fn project_test(&self, path: &Path) -> Result<()> {
192 self.command()
193 .await
194 .context("failed to create project test command")?
195 .current_dir(path)
196 .arg("project")
197 .arg("test")
198 .spawn_and_wait_for_stdout()
199 .await?;
200 Ok(())
201 }
202
203 async fn command_with_envs_and_arguments(
204 &self,
205 envs: &[(&str, &str)],
206 arguments: impl IntoIterator<Item = Cow<'_, str>>,
207 ) -> Result<Command> {
208 let mut command = self.command_binary().await?;
209 command.current_dir(self.path_provider.path());
210 for (key, value) in envs {
211 command.env(key, value);
212 }
213 for argument in arguments {
214 command.arg(&*argument);
215 }
216 Ok(command)
217 }
218
219 async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
220 self.command_with_envs_and_arguments(envs, self.command_arguments())
221 .await
222 }
223
224 async fn command_with_arguments(
225 &self,
226 arguments: impl IntoIterator<Item = Cow<'_, str>>,
227 ) -> Result<Command> {
228 self.command_with_envs_and_arguments(
229 &[(
230 "RUST_LOG",
231 &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
232 )],
233 arguments,
234 )
235 .await
236 }
237
238 async fn command(&self) -> Result<Command> {
239 self.command_with_envs(&[(
240 "RUST_LOG",
241 &std::env::var("RUST_LOG").unwrap_or(String::from("linera=debug")),
242 )])
243 .await
244 }
245
246 fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
247 [
248 "--wallet".into(),
249 self.wallet.as_str().into(),
250 "--keystore".into(),
251 self.keystore.as_str().into(),
252 "--storage".into(),
253 self.storage.as_str().into(),
254 "--send-timeout-ms".into(),
255 "500000".into(),
256 "--recv-timeout-ms".into(),
257 "500000".into(),
258 ]
259 .into_iter()
260 .chain(self.extra_args.iter().map(|s| s.as_str().into()))
261 }
262
263 fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
265 self.required_command_arguments().chain([
266 "--max-pending-message-bundles".into(),
267 self.max_pending_message_bundles.to_string().into(),
268 ])
269 }
270
271 async fn command_binary(&self) -> Result<Command> {
275 match self.command_with_cached_binary_path() {
276 Some(command) => Ok(command),
277 None => {
278 let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
279 let command = Command::new(&resolved_path);
280
281 self.set_cached_binary_path(resolved_path);
282
283 Ok(command)
284 }
285 }
286 }
287
288 fn command_with_cached_binary_path(&self) -> Option<Command> {
290 let binary_path = self.binary_path.lock().unwrap();
291
292 binary_path.as_ref().map(Command::new)
293 }
294
295 fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
303 let mut binary_path = self.binary_path.lock().unwrap();
304
305 if binary_path.is_none() {
306 *binary_path = Some(new_binary_path);
307 } else {
308 assert_eq!(*binary_path, Some(new_binary_path));
309 }
310 }
311
312 pub async fn create_genesis_config(
314 &self,
315 num_other_initial_chains: u32,
316 initial_funding: Amount,
317 policy_config: ResourceControlPolicyConfig,
318 http_allow_list: Option<Vec<String>>,
319 ) -> Result<()> {
320 let mut command = self.command().await?;
321 command
322 .args([
323 "create-genesis-config",
324 &num_other_initial_chains.to_string(),
325 ])
326 .args(["--initial-funding", &initial_funding.to_string()])
327 .args(["--committee", "committee.json"])
328 .args(["--genesis", "genesis.json"])
329 .args([
330 "--policy-config",
331 &policy_config.to_string().to_kebab_case(),
332 ]);
333 if let Some(allow_list) = http_allow_list {
334 command
335 .arg("--http-request-allow-list")
336 .arg(allow_list.join(","));
337 }
338 if let Some(seed) = self.testing_prng_seed {
339 command.arg("--testing-prng-seed").arg(seed.to_string());
340 }
341 command.spawn_and_wait_for_stdout().await?;
342 Ok(())
343 }
344
345 pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
348 let mut command = self.command().await?;
349 command.args(["wallet", "init"]);
350 match faucet {
351 None => command.args(["--genesis", "genesis.json"]),
352 Some(faucet) => command.args(["--faucet", faucet.url()]),
353 };
354 if let Some(seed) = self.testing_prng_seed {
355 command.arg("--testing-prng-seed").arg(seed.to_string());
356 }
357 command.spawn_and_wait_for_stdout().await?;
358 Ok(())
359 }
360
361 pub async fn request_chain(
363 &self,
364 faucet: &Faucet,
365 set_default: bool,
366 ) -> Result<(ChainId, AccountOwner)> {
367 let mut command = self.command().await?;
368 command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
369 if set_default {
370 command.arg("--set-default");
371 }
372 let stdout = command.spawn_and_wait_for_stdout().await?;
373 let mut lines = stdout.split_whitespace();
374 let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
375 let owner = lines.next().context("missing chain owner")?.parse()?;
376 Ok((chain_id, owner))
377 }
378
379 #[expect(clippy::too_many_arguments)]
381 pub async fn publish_and_create<
382 A: ContractAbi,
383 Parameters: Serialize,
384 InstantiationArgument: Serialize,
385 >(
386 &self,
387 contract: PathBuf,
388 service: PathBuf,
389 vm_runtime: VmRuntime,
390 parameters: &Parameters,
391 argument: &InstantiationArgument,
392 required_application_ids: &[ApplicationId],
393 publisher: impl Into<Option<ChainId>>,
394 ) -> Result<ApplicationId<A>> {
395 let json_parameters = serde_json::to_string(parameters)?;
396 let json_argument = serde_json::to_string(argument)?;
397 let mut command = self.command().await?;
398 let vm_runtime = format!("{}", vm_runtime);
399 command
400 .arg("publish-and-create")
401 .args([contract, service])
402 .args(["--vm-runtime", &vm_runtime.to_lowercase()])
403 .args(publisher.into().iter().map(ChainId::to_string))
404 .args(["--json-parameters", &json_parameters])
405 .args(["--json-argument", &json_argument]);
406 if !required_application_ids.is_empty() {
407 command.arg("--required-application-ids");
408 command.args(
409 required_application_ids
410 .iter()
411 .map(ApplicationId::to_string),
412 );
413 }
414 let stdout = command.spawn_and_wait_for_stdout().await?;
415 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
416 }
417
418 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
420 &self,
421 contract: PathBuf,
422 service: PathBuf,
423 vm_runtime: VmRuntime,
424 publisher: impl Into<Option<ChainId>>,
425 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
426 let stdout = self
427 .command()
428 .await?
429 .arg("publish-module")
430 .args([contract, service])
431 .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
432 .args(publisher.into().iter().map(ChainId::to_string))
433 .spawn_and_wait_for_stdout()
434 .await?;
435 let module_id: ModuleId = stdout.trim().parse()?;
436 Ok(module_id.with_abi())
437 }
438
439 pub async fn create_application<
441 Abi: ContractAbi,
442 Parameters: Serialize,
443 InstantiationArgument: Serialize,
444 >(
445 &self,
446 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
447 parameters: &Parameters,
448 argument: &InstantiationArgument,
449 required_application_ids: &[ApplicationId],
450 creator: impl Into<Option<ChainId>>,
451 ) -> Result<ApplicationId<Abi>> {
452 let json_parameters = serde_json::to_string(parameters)?;
453 let json_argument = serde_json::to_string(argument)?;
454 let mut command = self.command().await?;
455 command
456 .arg("create-application")
457 .arg(module_id.forget_abi().to_string())
458 .args(["--json-parameters", &json_parameters])
459 .args(["--json-argument", &json_argument])
460 .args(creator.into().iter().map(ChainId::to_string));
461 if !required_application_ids.is_empty() {
462 command.arg("--required-application-ids");
463 command.args(
464 required_application_ids
465 .iter()
466 .map(ApplicationId::to_string),
467 );
468 }
469 let stdout = command.spawn_and_wait_for_stdout().await?;
470 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
471 }
472
473 pub async fn run_node_service(
475 &self,
476 port: impl Into<Option<u16>>,
477 process_inbox: ProcessInbox,
478 ) -> Result<NodeService> {
479 self.run_node_service_with_options(port, process_inbox, &[], &[], false)
480 .await
481 }
482
483 pub async fn run_node_service_with_options(
485 &self,
486 port: impl Into<Option<u16>>,
487 process_inbox: ProcessInbox,
488 operator_application_ids: &[ApplicationId],
489 operators: &[(String, PathBuf)],
490 read_only: bool,
491 ) -> Result<NodeService> {
492 self.run_node_service_with_all_options(
493 port,
494 process_inbox,
495 operator_application_ids,
496 operators,
497 read_only,
498 &[],
499 )
500 .await
501 }
502
503 pub async fn run_node_service_with_all_options(
505 &self,
506 port: impl Into<Option<u16>>,
507 process_inbox: ProcessInbox,
508 operator_application_ids: &[ApplicationId],
509 operators: &[(String, PathBuf)],
510 read_only: bool,
511 allowed_subscriptions: &[String],
512 ) -> Result<NodeService> {
513 let port = port.into().unwrap_or(8080);
514 let mut command = self.command().await?;
515 command.arg("service");
516 if let ProcessInbox::Skip = process_inbox {
517 command.arg("--listener-skip-process-inbox");
518 }
519 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
520 command.args(var.split_whitespace());
521 }
522 for app_id in operator_application_ids {
523 command.args(["--operator-application-ids", &app_id.to_string()]);
524 }
525 for (name, path) in operators {
526 command.args(["--operators", &format!("{}={}", name, path.display())]);
527 }
528 if read_only {
529 command.arg("--read-only");
530 }
531 for query in allowed_subscriptions {
532 command.args(["--allow-subscription", query]);
533 }
534 let child = command
535 .args(["--port".to_string(), port.to_string()])
536 .spawn_into()?;
537 let client = reqwest_client();
538 for i in 0..10 {
539 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
540 let request = client
541 .get(format!("http://localhost:{}/", port))
542 .send()
543 .await;
544 if request.is_ok() {
545 tracing::info!("Node service has started");
546 return Ok(NodeService::new(port, child));
547 } else {
548 tracing::warn!("Waiting for node service to start");
549 }
550 }
551 bail!("Failed to start node service");
552 }
553
554 pub async fn run_node_service_with_controller(
556 &self,
557 port: impl Into<Option<u16>>,
558 process_inbox: ProcessInbox,
559 controller_id: &ApplicationId,
560 operators: &[(String, PathBuf)],
561 ) -> Result<NodeService> {
562 let port = port.into().unwrap_or(8080);
563 let mut command = self.command().await?;
564 command.arg("service");
565 if let ProcessInbox::Skip = process_inbox {
566 command.arg("--listener-skip-process-inbox");
567 }
568 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
569 command.args(var.split_whitespace());
570 }
571 command.args(["--controller-id", &controller_id.to_string()]);
572 for (name, path) in operators {
573 command.args(["--operators", &format!("{}={}", name, path.display())]);
574 }
575 let child = command
576 .args(["--port".to_string(), port.to_string()])
577 .spawn_into()?;
578 let client = reqwest_client();
579 for i in 0..10 {
580 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
581 let request = client
582 .get(format!("http://localhost:{}/", port))
583 .send()
584 .await;
585 if request.is_ok() {
586 tracing::info!("Node service has started");
587 return Ok(NodeService::new(port, child));
588 } else {
589 tracing::warn!("Waiting for node service to start");
590 }
591 }
592 bail!("Failed to start node service");
593 }
594
595 pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
597 let mut command = self.command().await?;
598 command.arg("validator").arg("query").arg(address);
599 let stdout = command.spawn_and_wait_for_stdout().await?;
600
601 let hash = stdout
604 .lines()
605 .find_map(|line| {
606 line.strip_prefix("Genesis config hash: ")
607 .and_then(|hash_str| hash_str.trim().parse().ok())
608 })
609 .context("error while parsing the result of `linera validator query`")?;
610 Ok(hash)
611 }
612
613 pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
615 let mut command = self.command().await?;
616 command.arg("validator").arg("list");
617 if let Some(chain_id) = chain_id {
618 command.args(["--chain-id", &chain_id.to_string()]);
619 }
620 command.spawn_and_wait_for_stdout().await?;
621 Ok(())
622 }
623
624 pub async fn sync_validator(
626 &self,
627 chain_ids: impl IntoIterator<Item = &ChainId>,
628 validator_address: impl Into<String>,
629 ) -> Result<()> {
630 let mut command = self.command().await?;
631 command
632 .arg("validator")
633 .arg("sync")
634 .arg(validator_address.into());
635 let mut chain_ids = chain_ids.into_iter().peekable();
636 if chain_ids.peek().is_some() {
637 command
638 .arg("--chains")
639 .args(chain_ids.map(ChainId::to_string));
640 }
641 command.spawn_and_wait_for_stdout().await?;
642 Ok(())
643 }
644
645 pub async fn run_faucet(
647 &self,
648 port: impl Into<Option<u16>>,
649 chain_id: Option<ChainId>,
650 amount: Amount,
651 ) -> Result<FaucetService> {
652 let port = port.into().unwrap_or(8080);
653 let temp_dir = tempfile::tempdir()
654 .context("Failed to create temporary directory for faucet storage")?;
655 let storage_path = temp_dir.path().join("faucet_storage.sqlite");
656 let mut command = self.command().await?;
657 let command = command
658 .arg("faucet")
659 .args(["--port".to_string(), port.to_string()])
660 .args(["--amount".to_string(), amount.to_string()])
661 .args([
662 "--storage-path".to_string(),
663 storage_path.to_string_lossy().to_string(),
664 ]);
665 if let Some(chain_id) = chain_id {
666 command.arg(chain_id.to_string());
667 }
668 let child = command.spawn_into()?;
669 let client = reqwest_client();
670 for i in 0..10 {
671 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
672 let request = client
673 .get(format!("http://localhost:{}/", port))
674 .send()
675 .await;
676 if request.is_ok() {
677 tracing::info!("Faucet has started");
678 return Ok(FaucetService::new(port, child, temp_dir));
679 } else {
680 tracing::debug!("Waiting for faucet to start");
681 }
682 }
683 bail!("Failed to start faucet");
684 }
685
686 pub async fn local_balance(&self, account: Account) -> Result<Amount> {
688 let stdout = self
689 .command()
690 .await?
691 .arg("local-balance")
692 .arg(account.to_string())
693 .spawn_and_wait_for_stdout()
694 .await?;
695 let amount = stdout
696 .trim()
697 .parse()
698 .context("error while parsing the result of `linera local-balance`")?;
699 Ok(amount)
700 }
701
702 pub async fn query_balance(&self, account: Account) -> Result<Amount> {
704 let stdout = self
705 .command()
706 .await?
707 .arg("query-balance")
708 .arg(account.to_string())
709 .spawn_and_wait_for_stdout()
710 .await?;
711 let amount = stdout
712 .trim()
713 .parse()
714 .context("error while parsing the result of `linera query-balance`")?;
715 Ok(amount)
716 }
717
718 pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
720 self.command()
721 .await?
722 .arg("sync")
723 .arg(chain_id.to_string())
724 .spawn_and_wait_for_stdout()
725 .await?;
726 Ok(())
727 }
728
729 pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
731 self.command()
732 .await?
733 .arg("process-inbox")
734 .arg(chain_id.to_string())
735 .spawn_and_wait_for_stdout()
736 .await?;
737 Ok(())
738 }
739
740 pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
742 self.command()
743 .await?
744 .arg("transfer")
745 .arg(amount.to_string())
746 .args(["--from", &from.to_string()])
747 .args(["--to", &to.to_string()])
748 .spawn_and_wait_for_stdout()
749 .await?;
750 Ok(())
751 }
752
753 pub async fn transfer_with_silent_logs(
755 &self,
756 amount: Amount,
757 from: ChainId,
758 to: ChainId,
759 ) -> Result<()> {
760 self.command()
761 .await?
762 .env("RUST_LOG", "off")
763 .arg("transfer")
764 .arg(amount.to_string())
765 .args(["--from", &from.to_string()])
766 .args(["--to", &to.to_string()])
767 .spawn_and_wait_for_stdout()
768 .await?;
769 Ok(())
770 }
771
772 pub async fn transfer_with_accounts(
774 &self,
775 amount: Amount,
776 from: Account,
777 to: Account,
778 ) -> Result<()> {
779 self.command()
780 .await?
781 .arg("transfer")
782 .arg(amount.to_string())
783 .args(["--from", &from.to_string()])
784 .args(["--to", &to.to_string()])
785 .spawn_and_wait_for_stdout()
786 .await?;
787 Ok(())
788 }
789
790 fn benchmark_command_internal(command: &mut Command, args: &BenchmarkCommand) -> Result<()> {
791 let mut formatted_args = to_args(&args)?;
792 let subcommand = formatted_args.remove(0);
793 formatted_args.remove(0);
796 let options = formatted_args
797 .chunks_exact(2)
798 .flat_map(|pair| {
799 let option = format!("--{}", pair[0]);
800 match pair[1].as_str() {
801 "true" => vec![option],
802 "false" => vec![],
803 _ => vec![option, pair[1].clone()],
804 }
805 })
806 .collect::<Vec<_>>();
807 command
808 .args([
809 "--max-pending-message-bundles",
810 &args.transactions_per_block().to_string(),
811 ])
812 .arg("benchmark")
813 .arg(subcommand)
814 .args(options);
815 Ok(())
816 }
817
818 async fn benchmark_command_with_envs(
819 &self,
820 args: BenchmarkCommand,
821 envs: &[(&str, &str)],
822 ) -> Result<Command> {
823 let mut command = self
824 .command_with_envs_and_arguments(envs, self.required_command_arguments())
825 .await?;
826 Self::benchmark_command_internal(&mut command, &args)?;
827 Ok(command)
828 }
829
830 async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
831 let mut command = self
832 .command_with_arguments(self.required_command_arguments())
833 .await?;
834 Self::benchmark_command_internal(&mut command, &args)?;
835 Ok(command)
836 }
837
838 pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
840 let mut command = self.benchmark_command(args).await?;
841 command.spawn_and_wait_for_stdout().await?;
842 Ok(())
843 }
844
845 pub async fn benchmark_detached(
848 &self,
849 args: BenchmarkCommand,
850 tx: oneshot::Sender<()>,
851 ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
852 let mut child = self
853 .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
854 .await?
855 .kill_on_drop(true)
856 .stdin(Stdio::piped())
857 .stdout(Stdio::piped())
858 .stderr(Stdio::piped())
859 .spawn()?;
860
861 let pid = child.id().expect("failed to get pid");
862 let stdout = child.stdout.take().expect("stdout not open");
863 let stdout_handle = tokio::spawn(async move {
864 let mut lines = BufReader::new(stdout).lines();
865 while let Ok(Some(line)) = lines.next_line().await {
866 println!("benchmark{{pid={pid}}} {line}");
867 }
868 });
869
870 let stderr = child.stderr.take().expect("stderr not open");
871 let stderr_handle = tokio::spawn(async move {
872 let mut lines = BufReader::new(stderr).lines();
873 let mut tx = Some(tx);
874 while let Ok(Some(line)) = lines.next_line().await {
875 if line.contains("Ready to start benchmark") {
876 tx.take()
877 .expect("Should only send signal once")
878 .send(())
879 .expect("failed to send ready signal to main thread");
880 } else {
881 println!("benchmark{{pid={pid}}} {line}");
882 }
883 }
884 });
885 Ok((child, stdout_handle, stderr_handle))
886 }
887
888 async fn open_chain_internal(
889 &self,
890 from: ChainId,
891 owner: Option<AccountOwner>,
892 initial_balance: Amount,
893 super_owner: bool,
894 ) -> Result<(ChainId, AccountOwner)> {
895 let mut command = self.command().await?;
896 command
897 .arg("open-chain")
898 .args(["--from", &from.to_string()])
899 .args(["--initial-balance", &initial_balance.to_string()]);
900
901 if let Some(owner) = owner {
902 command.args(["--owner", &owner.to_string()]);
903 }
904
905 if super_owner {
906 command.arg("--super-owner");
907 }
908
909 let stdout = command.spawn_and_wait_for_stdout().await?;
910 let mut split = stdout.split('\n');
911 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
912 let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
913 if let Some(owner) = owner {
914 assert_eq!(owner, new_owner);
915 }
916 Ok((chain_id, new_owner))
917 }
918
919 pub async fn open_chain_super_owner(
921 &self,
922 from: ChainId,
923 owner: Option<AccountOwner>,
924 initial_balance: Amount,
925 ) -> Result<(ChainId, AccountOwner)> {
926 self.open_chain_internal(from, owner, initial_balance, true)
927 .await
928 }
929
930 pub async fn open_chain(
932 &self,
933 from: ChainId,
934 owner: Option<AccountOwner>,
935 initial_balance: Amount,
936 ) -> Result<(ChainId, AccountOwner)> {
937 self.open_chain_internal(from, owner, initial_balance, false)
938 .await
939 }
940
941 pub async fn open_and_assign(
943 &self,
944 client: &ClientWrapper,
945 initial_balance: Amount,
946 ) -> Result<ChainId> {
947 let our_chain = self
948 .load_wallet()?
949 .default_chain()
950 .context("no default chain found")?;
951 let owner = client.keygen().await?;
952 let (new_chain, _) = self
953 .open_chain(our_chain, Some(owner), initial_balance)
954 .await?;
955 client.assign(owner, new_chain).await?;
956 Ok(new_chain)
957 }
958
959 pub async fn open_multi_owner_chain(
960 &self,
961 from: ChainId,
962 owners: BTreeMap<AccountOwner, u64>,
963 multi_leader_rounds: u32,
964 balance: Amount,
965 base_timeout_ms: u64,
966 ) -> Result<ChainId> {
967 let mut command = self.command().await?;
968 command
969 .arg("open-multi-owner-chain")
970 .args(["--from", &from.to_string()])
971 .arg("--owners")
972 .arg(serde_json::to_string(&owners)?)
973 .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
974 command
975 .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
976 .args(["--initial-balance", &balance.to_string()]);
977
978 let stdout = command.spawn_and_wait_for_stdout().await?;
979 let mut split = stdout.split('\n');
980 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
981
982 Ok(chain_id)
983 }
984
985 pub async fn change_ownership(
986 &self,
987 chain_id: ChainId,
988 super_owners: Vec<AccountOwner>,
989 owners: Vec<AccountOwner>,
990 ) -> Result<()> {
991 let mut command = self.command().await?;
992 command
993 .arg("change-ownership")
994 .args(["--chain-id", &chain_id.to_string()]);
995 command
996 .arg("--super-owners")
997 .arg(serde_json::to_string(&super_owners)?);
998 command.arg("--owners").arg(serde_json::to_string(
999 &owners
1000 .into_iter()
1001 .zip(std::iter::repeat(100u64))
1002 .collect::<BTreeMap<_, _>>(),
1003 )?);
1004 command.spawn_and_wait_for_stdout().await?;
1005 Ok(())
1006 }
1007
1008 pub async fn change_application_permissions(
1009 &self,
1010 chain_id: ChainId,
1011 application_permissions: ApplicationPermissions,
1012 ) -> Result<()> {
1013 let mut command = self.command().await?;
1014 command
1015 .arg("change-application-permissions")
1016 .args(["--chain-id", &chain_id.to_string()]);
1017 command.arg("--manage-chain").arg(serde_json::to_string(
1018 &application_permissions.manage_chain,
1019 )?);
1020 command.spawn_and_wait_for_stdout().await?;
1022 Ok(())
1023 }
1024
1025 pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
1027 let mut command = self.command().await?;
1028 command
1029 .args(["wallet", "follow-chain"])
1030 .arg(chain_id.to_string());
1031 if sync {
1032 command.arg("--sync");
1033 }
1034 command.spawn_and_wait_for_stdout().await?;
1035 Ok(())
1036 }
1037
1038 pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
1040 let mut command = self.command().await?;
1041 command
1042 .args(["wallet", "forget-chain"])
1043 .arg(chain_id.to_string());
1044 command.spawn_and_wait_for_stdout().await?;
1045 Ok(())
1046 }
1047
1048 pub async fn set_default_chain(&self, chain_id: ChainId) -> Result<()> {
1050 let mut command = self.command().await?;
1051 command
1052 .args(["wallet", "set-default"])
1053 .arg(chain_id.to_string());
1054 command.spawn_and_wait_for_stdout().await?;
1055 Ok(())
1056 }
1057
1058 pub async fn retry_pending_block(
1059 &self,
1060 chain_id: Option<ChainId>,
1061 ) -> Result<Option<CryptoHash>> {
1062 let mut command = self.command().await?;
1063 command.arg("retry-pending-block");
1064 if let Some(chain_id) = chain_id {
1065 command.arg(chain_id.to_string());
1066 }
1067 let stdout = command.spawn_and_wait_for_stdout().await?;
1068 let stdout = stdout.trim();
1069 if stdout.is_empty() {
1070 Ok(None)
1071 } else {
1072 Ok(Some(CryptoHash::from_str(stdout)?))
1073 }
1074 }
1075
1076 pub async fn publish_data_blob(
1078 &self,
1079 path: &Path,
1080 chain_id: Option<ChainId>,
1081 ) -> Result<CryptoHash> {
1082 let mut command = self.command().await?;
1083 command.arg("publish-data-blob").arg(path);
1084 if let Some(chain_id) = chain_id {
1085 command.arg(chain_id.to_string());
1086 }
1087 let stdout = command.spawn_and_wait_for_stdout().await?;
1088 let stdout = stdout.trim();
1089 Ok(CryptoHash::from_str(stdout)?)
1090 }
1091
1092 pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
1094 let mut command = self.command().await?;
1095 command.arg("read-data-blob").arg(hash.to_string());
1096 if let Some(chain_id) = chain_id {
1097 command.arg(chain_id.to_string());
1098 }
1099 command.spawn_and_wait_for_stdout().await?;
1100 Ok(())
1101 }
1102
1103 pub fn load_wallet(&self) -> Result<Wallet> {
1104 Ok(Wallet::read(&self.wallet_path())?)
1105 }
1106
1107 pub fn load_keystore(&self) -> Result<InMemorySigner> {
1108 util::read_json(self.keystore_path())
1109 }
1110
1111 pub fn wallet_path(&self) -> PathBuf {
1112 self.path_provider.path().join(&self.wallet)
1113 }
1114
1115 pub fn keystore_path(&self) -> PathBuf {
1116 self.path_provider.path().join(&self.keystore)
1117 }
1118
1119 pub fn storage_path(&self) -> &str {
1120 &self.storage
1121 }
1122
1123 pub fn get_owner(&self) -> Option<AccountOwner> {
1124 let wallet = self.load_wallet().ok()?;
1125 wallet
1126 .get(wallet.default_chain()?)
1127 .expect("default chain must be in wallet")
1128 .owner
1129 }
1130
1131 pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1132 self.load_wallet()
1133 .ok()
1134 .is_some_and(|wallet| wallet.get(chain).is_some())
1135 }
1136
1137 pub async fn set_validator(
1138 &self,
1139 validator_key: &(String, String),
1140 port: usize,
1141 votes: usize,
1142 ) -> Result<()> {
1143 let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1144 self.command()
1145 .await?
1146 .arg("validator")
1147 .arg("add")
1148 .args(["--public-key", &validator_key.0])
1149 .args(["--account-key", &validator_key.1])
1150 .args(["--address", &address])
1151 .args(["--votes", &votes.to_string()])
1152 .spawn_and_wait_for_stdout()
1153 .await?;
1154 Ok(())
1155 }
1156
1157 pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1158 self.command()
1159 .await?
1160 .arg("validator")
1161 .arg("remove")
1162 .args(["--public-key", validator_key])
1163 .spawn_and_wait_for_stdout()
1164 .await?;
1165 Ok(())
1166 }
1167
1168 pub async fn change_validators(
1169 &self,
1170 add_validators: &[(String, String, usize, usize)], modify_validators: &[(String, String, usize, usize)], remove_validators: &[String],
1173 ) -> Result<()> {
1174 use std::str::FromStr;
1175
1176 use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1177
1178 let mut changes = std::collections::HashMap::new();
1181
1182 for (public_key_str, account_key_str, port, votes) in
1184 add_validators.iter().chain(modify_validators.iter())
1185 {
1186 let public_key = ValidatorPublicKey::from_str(public_key_str)
1187 .with_context(|| format!("Invalid validator public key: {}", public_key_str))?;
1188
1189 let account_key = AccountPublicKey::from_str(account_key_str)
1190 .with_context(|| format!("Invalid account public key: {}", account_key_str))?;
1191
1192 let address = format!("{}:127.0.0.1:{}", self.network.short(), port)
1193 .parse()
1194 .unwrap();
1195
1196 let change = crate::cli::validator::Change {
1198 account_key,
1199 address,
1200 votes: crate::cli::validator::Votes(
1201 std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1202 ),
1203 };
1204
1205 changes.insert(public_key, Some(change));
1206 }
1207
1208 for validator_key_str in remove_validators {
1210 let public_key = ValidatorPublicKey::from_str(validator_key_str)
1211 .with_context(|| format!("Invalid validator public key: {}", validator_key_str))?;
1212 changes.insert(public_key, None);
1213 }
1214
1215 let temp_file = tempfile::NamedTempFile::new()
1217 .context("Failed to create temporary file for validator changes")?;
1218 serde_json::to_writer(&temp_file, &changes)
1219 .context("Failed to write validator changes to file")?;
1220 let temp_path = temp_file.path();
1221
1222 self.command()
1223 .await?
1224 .arg("validator")
1225 .arg("update")
1226 .arg(temp_path)
1227 .arg("--yes") .spawn_and_wait_for_stdout()
1229 .await?;
1230
1231 Ok(())
1232 }
1233
1234 pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1235 self.command()
1236 .await?
1237 .arg("revoke-epochs")
1238 .arg(epoch.to_string())
1239 .spawn_and_wait_for_stdout()
1240 .await?;
1241 Ok(())
1242 }
1243
1244 pub async fn keygen(&self) -> Result<AccountOwner> {
1246 let stdout = self
1247 .command()
1248 .await?
1249 .arg("keygen")
1250 .spawn_and_wait_for_stdout()
1251 .await?;
1252 AccountOwner::from_str(stdout.as_str().trim())
1253 }
1254
1255 pub fn default_chain(&self) -> Option<ChainId> {
1257 self.load_wallet().ok()?.default_chain()
1258 }
1259
1260 pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1262 let _stdout = self
1263 .command()
1264 .await?
1265 .arg("assign")
1266 .args(["--owner", &owner.to_string()])
1267 .args(["--chain-id", &chain_id.to_string()])
1268 .spawn_and_wait_for_stdout()
1269 .await?;
1270 Ok(())
1271 }
1272
1273 pub async fn set_preferred_owner(
1275 &self,
1276 chain_id: ChainId,
1277 owner: Option<AccountOwner>,
1278 ) -> Result<()> {
1279 let mut owner_arg = vec!["--owner".to_string()];
1280 if let Some(owner) = owner {
1281 owner_arg.push(owner.to_string());
1282 };
1283 self.command()
1284 .await?
1285 .arg("set-preferred-owner")
1286 .args(["--chain-id", &chain_id.to_string()])
1287 .args(owner_arg)
1288 .spawn_and_wait_for_stdout()
1289 .await?;
1290 Ok(())
1291 }
1292
1293 pub async fn build_application(
1294 &self,
1295 path: &Path,
1296 name: &str,
1297 is_workspace: bool,
1298 ) -> Result<(PathBuf, PathBuf)> {
1299 Command::new("cargo")
1300 .current_dir(self.path_provider.path())
1301 .arg("build")
1302 .arg("--release")
1303 .args(["--target", "wasm32-unknown-unknown"])
1304 .arg("--manifest-path")
1305 .arg(path.join("Cargo.toml"))
1306 .spawn_and_wait_for_stdout()
1307 .await?;
1308
1309 let release_dir = match is_workspace {
1310 true => path.join("../target/wasm32-unknown-unknown/release"),
1311 false => path.join("target/wasm32-unknown-unknown/release"),
1312 };
1313
1314 let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1315 let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1316
1317 let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1318 let service_size = fs_err::tokio::metadata(&service).await?.len();
1319 tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1320
1321 Ok((contract, service))
1322 }
1323}
1324
1325impl Drop for ClientWrapper {
1326 fn drop(&mut self) {
1327 use std::process::Command as SyncCommand;
1328
1329 if self.on_drop != OnClientDrop::CloseChains {
1330 return;
1331 }
1332
1333 let Ok(binary_path) = self.binary_path.lock() else {
1334 tracing::error!(
1335 "Failed to close chains because a thread panicked with a lock to `binary_path`"
1336 );
1337 return;
1338 };
1339
1340 let Some(binary_path) = binary_path.as_ref() else {
1341 tracing::warn!(
1342 "Assuming no chains need to be closed, because the command binary was never \
1343 resolved and therefore presumably never called"
1344 );
1345 return;
1346 };
1347
1348 let working_directory = self.path_provider.path();
1349 let mut wallet_show_command = SyncCommand::new(binary_path);
1350
1351 for argument in self.command_arguments() {
1352 wallet_show_command.arg(&*argument);
1353 }
1354
1355 let Ok(wallet_show_output) = wallet_show_command
1356 .current_dir(working_directory)
1357 .args(["wallet", "show", "--short", "--owned"])
1358 .output()
1359 else {
1360 tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1361 return;
1362 };
1363
1364 if !wallet_show_output.status.success() {
1365 tracing::warn!("Failed to list chains in the wallet to close them");
1366 return;
1367 }
1368
1369 let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1370 tracing::warn!(
1371 "Failed to close chains because `linera wallet show --short` \
1372 returned a non-UTF-8 output"
1373 );
1374 return;
1375 };
1376
1377 let chain_ids = chain_list_string
1378 .split('\n')
1379 .map(|line| line.trim())
1380 .filter(|line| !line.is_empty());
1381
1382 for chain_id in chain_ids {
1383 let mut close_chain_command = SyncCommand::new(binary_path);
1384
1385 for argument in self.command_arguments() {
1386 close_chain_command.arg(&*argument);
1387 }
1388
1389 close_chain_command.current_dir(working_directory);
1390
1391 match close_chain_command.args(["close-chain", chain_id]).status() {
1392 Ok(status) if status.success() => (),
1393 Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1394 Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1395 }
1396 }
1397 }
1398}
1399
1400#[cfg(with_testing)]
1401impl ClientWrapper {
1402 pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1403 self.build_application(Self::example_path(name)?.as_path(), name, true)
1404 .await
1405 }
1406
1407 pub fn example_path(name: &str) -> Result<PathBuf> {
1408 Ok(env::current_dir()?.join("../examples/").join(name))
1409 }
1410}
1411
1412fn truncate_query_output(input: &str) -> String {
1413 let max_len = 1000;
1414 if input.len() < max_len {
1415 input.to_string()
1416 } else {
1417 format!("{} ...", input.get(..max_len).unwrap())
1418 }
1419}
1420
1421fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1422 let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1423 let max_len = 200;
1424 if query.len() < max_len {
1425 query
1426 } else {
1427 format!("{} ...", query.get(..max_len).unwrap())
1428 }
1429}
1430
1431pub struct NodeService {
1433 port: u16,
1434 child: Child,
1435}
1436
1437impl NodeService {
1438 fn new(port: u16, child: Child) -> Self {
1439 Self { port, child }
1440 }
1441
1442 pub async fn terminate(mut self) -> Result<()> {
1443 self.child.kill().await.context("terminating node service")
1444 }
1445
1446 pub fn port(&self) -> u16 {
1447 self.port
1448 }
1449
1450 pub fn ensure_is_running(&mut self) -> Result<()> {
1451 self.child.ensure_is_running()
1452 }
1453
1454 pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1455 let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1456 let mut data = self.query_node(query).await?;
1457 Ok(serde_json::from_value(data["processInbox"].take())?)
1458 }
1459
1460 pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1461 let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1462 let mut data = self.query_node(query).await?;
1463 Ok(serde_json::from_value(data["sync"].take())?)
1464 }
1465
1466 pub async fn transfer(
1467 &self,
1468 chain_id: ChainId,
1469 owner: AccountOwner,
1470 recipient: Account,
1471 amount: Amount,
1472 ) -> Result<CryptoHash> {
1473 let json_owner = owner.to_value();
1474 let json_recipient = recipient.to_value();
1475 let query = format!(
1476 "mutation {{ transfer(\
1477 chainId: \"{chain_id}\", \
1478 owner: {json_owner}, \
1479 recipient: {json_recipient}, \
1480 amount: \"{amount}\") \
1481 }}"
1482 );
1483 let data = self.query_node(query).await?;
1484 serde_json::from_value(data["transfer"].clone())
1485 .context("missing transfer field in response")
1486 }
1487
1488 pub async fn balance(&self, account: &Account) -> Result<Amount> {
1489 let chain = account.chain_id;
1490 let owner = account.owner;
1491 if matches!(owner, AccountOwner::CHAIN) {
1492 let query = format!(
1493 "query {{ chain(chainId:\"{chain}\") {{
1494 executionState {{ system {{ balance }} }}
1495 }} }}"
1496 );
1497 let response = self.query_node(query).await?;
1498 let balance = &response["chain"]["executionState"]["system"]["balance"]
1499 .as_str()
1500 .unwrap();
1501 return Ok(Amount::from_str(balance)?);
1502 }
1503 let query = format!(
1504 "query {{ chain(chainId:\"{chain}\") {{
1505 executionState {{ system {{ balances {{
1506 entry(key:\"{owner}\") {{ value }}
1507 }} }} }}
1508 }} }}"
1509 );
1510 let response = self.query_node(query).await?;
1511 let balances = &response["chain"]["executionState"]["system"]["balances"];
1512 let balance = balances["entry"]["value"].as_str();
1513 match balance {
1514 None => Ok(Amount::ZERO),
1515 Some(amount) => Ok(Amount::from_str(amount)?),
1516 }
1517 }
1518
1519 pub fn make_application<A: ContractAbi>(
1520 &self,
1521 chain_id: &ChainId,
1522 application_id: &ApplicationId<A>,
1523 ) -> Result<ApplicationWrapper<A>> {
1524 let application_id = application_id.forget_abi().to_string();
1525 let link = format!(
1526 "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1527 self.port
1528 );
1529 Ok(ApplicationWrapper::from(link))
1530 }
1531
1532 pub async fn publish_data_blob(
1533 &self,
1534 chain_id: &ChainId,
1535 bytes: Vec<u8>,
1536 ) -> Result<CryptoHash> {
1537 let query = format!(
1538 "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1539 chain_id.to_value(),
1540 bytes.to_value(),
1541 );
1542 let data = self.query_node(query).await?;
1543 serde_json::from_value(data["publishDataBlob"].clone())
1544 .context("missing publishDataBlob field in response")
1545 }
1546
1547 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1548 &self,
1549 chain_id: &ChainId,
1550 contract: PathBuf,
1551 service: PathBuf,
1552 vm_runtime: VmRuntime,
1553 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1554 let contract_code = Bytecode::load_from_file(&contract).await?;
1555 let service_code = Bytecode::load_from_file(&service).await?;
1556 let query = format!(
1557 "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1558 chain_id.to_value(),
1559 contract_code.to_value(),
1560 service_code.to_value(),
1561 vm_runtime.to_value(),
1562 );
1563 let data = self.query_node(query).await?;
1564 let module_str = data["publishModule"]
1565 .as_str()
1566 .context("module ID not found")?;
1567 let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1568 Ok(module_id.with_abi())
1569 }
1570
1571 pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1572 let query = format!(
1573 "query {{ chain(chainId:\"{chain_id}\") {{
1574 executionState {{ system {{ committees }} }}
1575 }} }}"
1576 );
1577 let mut response = self.query_node(query).await?;
1578 let committees = response["chain"]["executionState"]["system"]["committees"].take();
1579 Ok(serde_json::from_value(committees)?)
1580 }
1581
1582 pub async fn events_from_index(
1583 &self,
1584 chain_id: &ChainId,
1585 stream_id: &StreamId,
1586 start_index: u32,
1587 ) -> Result<Vec<IndexAndEvent>> {
1588 let query = format!(
1589 "query {{
1590 eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1591 {{ index event }}
1592 }}",
1593 stream_id.to_value()
1594 );
1595 let mut response = self.query_node(query).await?;
1596 let response = response["eventsFromIndex"].take();
1597 Ok(serde_json::from_value(response)?)
1598 }
1599
1600 pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1601 let n_try = 5;
1602 let query = query.as_ref();
1603 for i in 0..n_try {
1604 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1605 let url = format!("http://localhost:{}/", self.port);
1606 let client = reqwest_client();
1607 let result = client
1608 .post(url)
1609 .json(&json!({ "query": query }))
1610 .send()
1611 .await;
1612 if matches!(result, Err(ref error) if error.is_timeout()) {
1613 tracing::warn!(
1614 "Timeout when sending query {} to the node service",
1615 truncate_query_output(query)
1616 );
1617 continue;
1618 }
1619 let response = result.with_context(|| {
1620 format!(
1621 "query_node: failed to post query={}",
1622 truncate_query_output(query)
1623 )
1624 })?;
1625 ensure!(
1626 response.status().is_success(),
1627 "Query \"{}\" failed: {}",
1628 truncate_query_output(query),
1629 response
1630 .text()
1631 .await
1632 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1633 );
1634 let value: Value = response.json().await.context("invalid JSON")?;
1635 if let Some(errors) = value.get("errors") {
1636 tracing::warn!(
1637 "Query \"{}\" failed: {}",
1638 truncate_query_output(query),
1639 errors
1640 );
1641 } else {
1642 return Ok(value["data"].clone());
1643 }
1644 }
1645 bail!(
1646 "Query \"{}\" failed after {} retries.",
1647 truncate_query_output(query),
1648 n_try
1649 );
1650 }
1651
1652 pub async fn create_application<
1653 Abi: ContractAbi,
1654 Parameters: Serialize,
1655 InstantiationArgument: Serialize,
1656 >(
1657 &self,
1658 chain_id: &ChainId,
1659 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1660 parameters: &Parameters,
1661 argument: &InstantiationArgument,
1662 required_application_ids: &[ApplicationId],
1663 ) -> Result<ApplicationId<Abi>> {
1664 let module_id = module_id.forget_abi();
1665 let json_required_applications_ids = required_application_ids
1666 .iter()
1667 .map(ApplicationId::to_string)
1668 .collect::<Vec<_>>()
1669 .to_value();
1670 let new_parameters = serde_json::to_value(parameters)
1672 .context("could not create parameters JSON")?
1673 .to_value();
1674 let new_argument = serde_json::to_value(argument)
1675 .context("could not create argument JSON")?
1676 .to_value();
1677 let query = format!(
1678 "mutation {{ createApplication(\
1679 chainId: \"{chain_id}\",
1680 moduleId: \"{module_id}\", \
1681 parameters: {new_parameters}, \
1682 instantiationArgument: {new_argument}, \
1683 requiredApplicationIds: {json_required_applications_ids}) \
1684 }}"
1685 );
1686 let data = self.query_node(query).await?;
1687 let app_id_str = data["createApplication"]
1688 .as_str()
1689 .context("missing createApplication string in response")?
1690 .trim();
1691 Ok(app_id_str
1692 .parse::<ApplicationId>()
1693 .context("invalid application ID")?
1694 .with_abi())
1695 }
1696
1697 pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1699 let query = format!(
1700 r#"query {{ block(chainId: "{chain}") {{
1701 hash
1702 block {{ header {{ height }} }}
1703 }} }}"#
1704 );
1705
1706 let mut response = self.query_node(&query).await?;
1707
1708 match (
1709 mem::take(&mut response["block"]["hash"]),
1710 mem::take(&mut response["block"]["block"]["header"]["height"]),
1711 ) {
1712 (Value::Null, Value::Null) => Ok(None),
1713 (Value::String(hash), Value::Number(height)) => Ok(Some((
1714 hash.parse()
1715 .context("Received an invalid hash {hash:?} for chain tip")?,
1716 BlockHeight(height.as_u64().unwrap()),
1717 ))),
1718 invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1719 }
1720 }
1721
1722 pub async fn notifications(
1724 &self,
1725 chain_id: ChainId,
1726 ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
1727 let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1728 let url = format!("ws://localhost:{}/ws", self.port);
1729 let mut request = url.into_client_request()?;
1730 request.headers_mut().insert(
1731 "Sec-WebSocket-Protocol",
1732 HeaderValue::from_str("graphql-transport-ws")?,
1733 );
1734 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1735 let init_json = json!({
1736 "type": "connection_init",
1737 "payload": {}
1738 });
1739 websocket.send(init_json.to_string().into()).await?;
1740 let text = websocket
1741 .next()
1742 .await
1743 .context("Failed to establish connection")??
1744 .into_text()?;
1745 ensure!(
1746 text == "{\"type\":\"connection_ack\"}",
1747 "Unexpected response: {text}"
1748 );
1749 let query_json = json!({
1750 "id": "1",
1751 "type": "start",
1752 "payload": {
1753 "query": query,
1754 "variables": {},
1755 "operationName": null
1756 }
1757 });
1758 websocket.send(query_json.to_string().into()).await?;
1759 Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1760 |message| async {
1761 let text = message.into_text()?;
1762 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1763 if let Some(errors) = value["payload"].get("errors") {
1764 bail!("Notification subscription failed: {errors:?}");
1765 }
1766 serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1767 .context("Failed to deserialize notification")
1768 },
1769 )))
1770 }
1771
1772 pub async fn query_result(
1774 &self,
1775 name: &str,
1776 chain_id: ChainId,
1777 application_id: &ApplicationId,
1778 ) -> Result<Pin<Box<impl Stream<Item = Result<Value>>>>> {
1779 let query = format!(
1780 r#"subscription {{ queryResult(name: "{name}", chainId: "{chain_id}", applicationId: "{application_id}") }}"#,
1781 );
1782 let url = format!("ws://localhost:{}/ws", self.port);
1783 let mut request = url.into_client_request()?;
1784 request.headers_mut().insert(
1785 "Sec-WebSocket-Protocol",
1786 HeaderValue::from_str("graphql-transport-ws")?,
1787 );
1788 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1789 let init_json = json!({
1790 "type": "connection_init",
1791 "payload": {}
1792 });
1793 websocket.send(init_json.to_string().into()).await?;
1794 let text = websocket
1795 .next()
1796 .await
1797 .context("Failed to establish connection")??
1798 .into_text()?;
1799 ensure!(
1800 text == "{\"type\":\"connection_ack\"}",
1801 "Unexpected response: {text}"
1802 );
1803 let query_json = json!({
1804 "id": "1",
1805 "type": "start",
1806 "payload": {
1807 "query": query,
1808 "variables": {},
1809 "operationName": null
1810 }
1811 });
1812 websocket.send(query_json.to_string().into()).await?;
1813 Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1814 |message| async {
1815 let text = message.into_text()?;
1816 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1817 if let Some(errors) = value["payload"].get("errors") {
1818 bail!("Query result subscription failed: {errors:?}");
1819 }
1820 Ok(value["payload"]["data"]["queryResult"].clone())
1821 },
1822 )))
1823 }
1824}
1825
1826pub struct FaucetService {
1828 port: u16,
1829 child: Child,
1830 _temp_dir: tempfile::TempDir,
1831}
1832
1833impl FaucetService {
1834 fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
1835 Self {
1836 port,
1837 child,
1838 _temp_dir: temp_dir,
1839 }
1840 }
1841
1842 pub async fn terminate(mut self) -> Result<()> {
1843 self.child
1844 .kill()
1845 .await
1846 .context("terminating faucet service")
1847 }
1848
1849 pub fn ensure_is_running(&mut self) -> Result<()> {
1850 self.child.ensure_is_running()
1851 }
1852
1853 pub fn instance(&self) -> Faucet {
1854 Faucet::new(format!("http://localhost:{}/", self.port))
1855 }
1856}
1857
1858pub struct ApplicationWrapper<A> {
1860 uri: String,
1861 _phantom: PhantomData<A>,
1862}
1863
1864impl<A> ApplicationWrapper<A> {
1865 pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1866 let query = query.as_ref();
1867 let value = self.run_json_query(json!({ "query": query })).await?;
1868 Ok(value["data"].clone())
1869 }
1870
1871 pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1872 const MAX_RETRIES: usize = 5;
1873
1874 for i in 0.. {
1875 let client = reqwest_client();
1876 let result = client.post(&self.uri).json(&query).send().await;
1877 let response = match result {
1878 Ok(response) => response,
1879 Err(error) if i < MAX_RETRIES => {
1880 tracing::warn!(
1881 "Failed to post query \"{}\": {error}; retrying",
1882 truncate_query_output_serialize(&query),
1883 );
1884 continue;
1885 }
1886 Err(error) => {
1887 let query = truncate_query_output_serialize(&query);
1888 return Err(error)
1889 .with_context(|| format!("run_json_query: failed to post query={query}"));
1890 }
1891 };
1892 ensure!(
1893 response.status().is_success(),
1894 "Query \"{}\" failed: {}",
1895 truncate_query_output_serialize(&query),
1896 response
1897 .text()
1898 .await
1899 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1900 );
1901 let value: Value = response.json().await.context("invalid JSON")?;
1902 if let Some(errors) = value.get("errors") {
1903 bail!(
1904 "Query \"{}\" failed: {}",
1905 truncate_query_output_serialize(&query),
1906 errors
1907 );
1908 }
1909 return Ok(value);
1910 }
1911 unreachable!()
1912 }
1913
1914 pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
1915 let query = query.as_ref();
1916 self.run_graphql_query(&format!("query {{ {query} }}"))
1917 .await
1918 }
1919
1920 pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
1921 let query = query.as_ref().trim();
1922 let name = query
1923 .split_once(|ch: char| !ch.is_alphanumeric())
1924 .map_or(query, |(name, _)| name);
1925 let data = self.query(query).await?;
1926 serde_json::from_value(data[name].clone())
1927 .with_context(|| format!("{name} field missing in response"))
1928 }
1929
1930 pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
1931 let mutation = mutation.as_ref();
1932 self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
1933 .await
1934 }
1935
1936 pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
1937 let mut out = String::from("mutation {\n");
1938 for (index, mutation) in mutations.iter().enumerate() {
1939 out = format!("{} u{}: {}\n", out, index, mutation);
1940 }
1941 out.push_str("}\n");
1942 self.run_graphql_query(&out).await
1943 }
1944}
1945
1946impl<A> From<String> for ApplicationWrapper<A> {
1947 fn from(uri: String) -> ApplicationWrapper<A> {
1948 ApplicationWrapper {
1949 uri,
1950 _phantom: PhantomData,
1951 }
1952 }
1953}
1954
1955#[cfg(with_testing)]
1958fn notification_timeout() -> Duration {
1959 const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
1960 const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;
1961
1962 match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
1963 Ok(var) => Duration::from_millis(var.parse().unwrap_or_else(|error| {
1964 panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
1965 })),
1966 Err(env::VarError::NotPresent) => Duration::from_millis(NOTIFICATION_TIMEOUT_MS_DEFAULT),
1967 Err(env::VarError::NotUnicode(_)) => {
1968 panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
1969 }
1970 }
1971}
1972
1973#[cfg(with_testing)]
1974pub trait NotificationsExt {
1975 fn wait_for<T>(
1977 &mut self,
1978 f: impl FnMut(Notification) -> Option<T>,
1979 ) -> impl Future<Output = Result<T>>;
1980
1981 fn wait_for_events(
1984 &mut self,
1985 expected_height: impl Into<Option<BlockHeight>>,
1986 ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
1987 let expected_height = expected_height.into();
1988 self.wait_for(move |notification| {
1989 if let Reason::NewEvents {
1990 height,
1991 event_streams,
1992 ..
1993 } = notification.reason
1994 {
1995 if expected_height.is_none_or(|h| h == height) {
1996 return Some(event_streams);
1997 }
1998 }
1999 None
2000 })
2001 }
2002
2003 fn wait_for_block(
2006 &mut self,
2007 expected_height: impl Into<Option<BlockHeight>>,
2008 ) -> impl Future<Output = Result<CryptoHash>> {
2009 let expected_height = expected_height.into();
2010 self.wait_for(move |notification| {
2011 if let Reason::NewBlock { height, hash, .. } = notification.reason {
2012 if expected_height.is_none_or(|h| h == height) {
2013 return Some(hash);
2014 }
2015 }
2016 None
2017 })
2018 }
2019
2020 fn wait_for_bundle(
2023 &mut self,
2024 expected_origin: ChainId,
2025 expected_height: impl Into<Option<BlockHeight>>,
2026 ) -> impl Future<Output = Result<()>> {
2027 let expected_height = expected_height.into();
2028 self.wait_for(move |notification| {
2029 if let Reason::NewIncomingBundle { height, origin } = notification.reason {
2030 if expected_height.is_none_or(|h| h == height) && origin == expected_origin {
2031 return Some(());
2032 }
2033 }
2034 None
2035 })
2036 }
2037}
2038
2039#[cfg(with_testing)]
2040impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
2041 async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
2042 let mut timeout = Box::pin(linera_base::time::timer::sleep(notification_timeout())).fuse();
2043 loop {
2044 let notification = futures::select! {
2045 () = timeout => bail!("Timeout waiting for notification"),
2046 notification = self.next().fuse() => notification.context("Stream closed")??,
2047 };
2048 if let Some(t) = f(notification) {
2049 return Ok(t);
2050 }
2051 }
2052 }
2053}