1use std::{
5 borrow::Cow,
6 collections::BTreeMap,
7 env,
8 marker::PhantomData,
9 mem,
10 path::{Path, PathBuf},
11 pin::Pin,
12 process::Stdio,
13 str::FromStr,
14 sync,
15 time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24 abi::ContractAbi,
25 command::{resolve_binary, CommandExt},
26 crypto::{CryptoHash, InMemorySigner},
27 data_types::{Amount, ApplicationPermissions, BlockHeight, Bytecode, Epoch},
28 identifiers::{
29 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30 },
31 vm::VmRuntime,
32};
33use linera_client::client_options::ResourceControlPolicyConfig;
34use linera_core::worker::Notification;
35use linera_execution::committee::Committee;
36use linera_faucet_client::Faucet;
37use serde::{de::DeserializeOwned, ser::Serialize};
38use serde_command_opts::to_args;
39use serde_json::{json, Value};
40use tempfile::TempDir;
41use tokio::{
42 io::{AsyncBufReadExt, BufReader},
43 process::{Child, Command},
44 sync::oneshot,
45 task::JoinHandle,
46};
47#[cfg(with_testing)]
48use {
49 futures::FutureExt as _,
50 linera_core::worker::Reason,
51 std::{collections::BTreeSet, future::Future},
52};
53
54use crate::{
55 cli::command::BenchmarkCommand,
56 cli_wrappers::{
57 local_net::{PathProvider, ProcessInbox},
58 Network,
59 },
60 util::{self, ChildExt},
61 wallet::Wallet,
62};
63
64const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
67
68fn reqwest_client() -> reqwest::Client {
69 reqwest::ClientBuilder::new()
70 .timeout(Duration::from_secs(60))
71 .build()
72 .unwrap()
73}
74
75pub struct ClientWrapper {
77 binary_path: sync::Mutex<Option<PathBuf>>,
78 testing_prng_seed: Option<u64>,
79 storage: String,
80 wallet: String,
81 keystore: String,
82 max_pending_message_bundles: usize,
83 network: Network,
84 pub path_provider: PathProvider,
85 on_drop: OnClientDrop,
86 extra_args: Vec<String>,
87}
88
89#[derive(Clone, Copy, Debug, Eq, PartialEq)]
91pub enum OnClientDrop {
92 CloseChains,
94 LeakChains,
96}
97
98impl ClientWrapper {
99 pub fn new(
100 path_provider: PathProvider,
101 network: Network,
102 testing_prng_seed: Option<u64>,
103 id: usize,
104 on_drop: OnClientDrop,
105 ) -> Self {
106 Self::new_with_extra_args(
107 path_provider,
108 network,
109 testing_prng_seed,
110 id,
111 on_drop,
112 vec!["--wait-for-outgoing-messages".to_string()],
113 None,
114 )
115 }
116
117 pub fn new_with_extra_args(
118 path_provider: PathProvider,
119 network: Network,
120 testing_prng_seed: Option<u64>,
121 id: usize,
122 on_drop: OnClientDrop,
123 extra_args: Vec<String>,
124 binary_dir: Option<PathBuf>,
125 ) -> Self {
126 let storage = format!(
127 "rocksdb:{}/client_{}.db",
128 path_provider.path().display(),
129 id
130 );
131 let wallet = format!("wallet_{}.json", id);
132 let keystore = format!("keystore_{}.json", id);
133 let binary_path = binary_dir.map(|dir| dir.join("linera"));
134 Self {
135 binary_path: sync::Mutex::new(binary_path),
136 testing_prng_seed,
137 storage,
138 wallet,
139 keystore,
140 max_pending_message_bundles: 10_000,
141 network,
142 path_provider,
143 on_drop,
144 extra_args,
145 }
146 }
147
148 pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
150 let tmp = TempDir::new()?;
151 let mut command = self.command().await?;
152 command
153 .current_dir(tmp.path())
154 .arg("project")
155 .arg("new")
156 .arg(project_name)
157 .arg("--linera-root")
158 .arg(linera_root)
159 .spawn_and_wait_for_stdout()
160 .await?;
161 Ok(tmp)
162 }
163
164 pub async fn project_publish<T: Serialize>(
166 &self,
167 path: PathBuf,
168 required_application_ids: Vec<String>,
169 publisher: impl Into<Option<ChainId>>,
170 argument: &T,
171 ) -> Result<String> {
172 let json_parameters = serde_json::to_string(&())?;
173 let json_argument = serde_json::to_string(argument)?;
174 let mut command = self.command().await?;
175 command
176 .arg("project")
177 .arg("publish-and-create")
178 .arg(path)
179 .args(publisher.into().iter().map(ChainId::to_string))
180 .args(["--json-parameters", &json_parameters])
181 .args(["--json-argument", &json_argument]);
182 if !required_application_ids.is_empty() {
183 command.arg("--required-application-ids");
184 command.args(required_application_ids);
185 }
186 let stdout = command.spawn_and_wait_for_stdout().await?;
187 Ok(stdout.trim().to_string())
188 }
189
190 pub async fn project_test(&self, path: &Path) -> Result<()> {
192 self.command()
193 .await
194 .context("failed to create project test command")?
195 .current_dir(path)
196 .arg("project")
197 .arg("test")
198 .spawn_and_wait_for_stdout()
199 .await?;
200 Ok(())
201 }
202
203 async fn command_with_envs_and_arguments(
204 &self,
205 envs: &[(&str, &str)],
206 arguments: impl IntoIterator<Item = Cow<'_, str>>,
207 ) -> Result<Command> {
208 let mut command = self.command_binary().await?;
209 command.current_dir(self.path_provider.path());
210 for (key, value) in envs {
211 command.env(key, value);
212 }
213 for argument in arguments {
214 command.arg(&*argument);
215 }
216 Ok(command)
217 }
218
219 async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
220 self.command_with_envs_and_arguments(envs, self.command_arguments())
221 .await
222 }
223
224 async fn command_with_arguments(
225 &self,
226 arguments: impl IntoIterator<Item = Cow<'_, str>>,
227 ) -> Result<Command> {
228 self.command_with_envs_and_arguments(
229 &[(
230 "RUST_LOG",
231 &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
232 )],
233 arguments,
234 )
235 .await
236 }
237
238 async fn command(&self) -> Result<Command> {
239 self.command_with_envs(&[(
240 "RUST_LOG",
241 &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
242 )])
243 .await
244 }
245
246 fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
247 [
248 "--wallet".into(),
249 self.wallet.as_str().into(),
250 "--keystore".into(),
251 self.keystore.as_str().into(),
252 "--storage".into(),
253 self.storage.as_str().into(),
254 "--send-timeout-ms".into(),
255 "500000".into(),
256 "--recv-timeout-ms".into(),
257 "500000".into(),
258 ]
259 .into_iter()
260 .chain(self.extra_args.iter().map(|s| s.as_str().into()))
261 }
262
263 fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
265 self.required_command_arguments().chain([
266 "--max-pending-message-bundles".into(),
267 self.max_pending_message_bundles.to_string().into(),
268 ])
269 }
270
271 async fn command_binary(&self) -> Result<Command> {
275 match self.command_with_cached_binary_path() {
276 Some(command) => Ok(command),
277 None => {
278 let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
279 let command = Command::new(&resolved_path);
280
281 self.set_cached_binary_path(resolved_path);
282
283 Ok(command)
284 }
285 }
286 }
287
288 fn command_with_cached_binary_path(&self) -> Option<Command> {
290 let binary_path = self.binary_path.lock().unwrap();
291
292 binary_path.as_ref().map(Command::new)
293 }
294
295 fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
303 let mut binary_path = self.binary_path.lock().unwrap();
304
305 if binary_path.is_none() {
306 *binary_path = Some(new_binary_path);
307 } else {
308 assert_eq!(*binary_path, Some(new_binary_path));
309 }
310 }
311
312 pub async fn create_genesis_config(
314 &self,
315 num_other_initial_chains: u32,
316 initial_funding: Amount,
317 policy_config: ResourceControlPolicyConfig,
318 http_allow_list: Option<Vec<String>>,
319 ) -> Result<()> {
320 let mut command = self.command().await?;
321 command
322 .args([
323 "create-genesis-config",
324 &num_other_initial_chains.to_string(),
325 ])
326 .args(["--initial-funding", &initial_funding.to_string()])
327 .args(["--committee", "committee.json"])
328 .args(["--genesis", "genesis.json"])
329 .args([
330 "--policy-config",
331 &policy_config.to_string().to_kebab_case(),
332 ]);
333 if let Some(allow_list) = http_allow_list {
334 command
335 .arg("--http-request-allow-list")
336 .arg(allow_list.join(","));
337 }
338 if let Some(seed) = self.testing_prng_seed {
339 command.arg("--testing-prng-seed").arg(seed.to_string());
340 }
341 command.spawn_and_wait_for_stdout().await?;
342 Ok(())
343 }
344
345 pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
348 let mut command = self.command().await?;
349 command.args(["wallet", "init"]);
350 match faucet {
351 None => command.args(["--genesis", "genesis.json"]),
352 Some(faucet) => command.args(["--faucet", faucet.url()]),
353 };
354 if let Some(seed) = self.testing_prng_seed {
355 command.arg("--testing-prng-seed").arg(seed.to_string());
356 }
357 command.spawn_and_wait_for_stdout().await?;
358 Ok(())
359 }
360
361 pub async fn request_chain(
363 &self,
364 faucet: &Faucet,
365 set_default: bool,
366 ) -> Result<(ChainId, AccountOwner)> {
367 let mut command = self.command().await?;
368 command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
369 if set_default {
370 command.arg("--set-default");
371 }
372 let stdout = command.spawn_and_wait_for_stdout().await?;
373 let mut lines = stdout.split_whitespace();
374 let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
375 let owner = lines.next().context("missing chain owner")?.parse()?;
376 Ok((chain_id, owner))
377 }
378
379 #[expect(clippy::too_many_arguments)]
381 pub async fn publish_and_create<
382 A: ContractAbi,
383 Parameters: Serialize,
384 InstantiationArgument: Serialize,
385 >(
386 &self,
387 contract: PathBuf,
388 service: PathBuf,
389 vm_runtime: VmRuntime,
390 parameters: &Parameters,
391 argument: &InstantiationArgument,
392 required_application_ids: &[ApplicationId],
393 publisher: impl Into<Option<ChainId>>,
394 ) -> Result<ApplicationId<A>> {
395 let json_parameters = serde_json::to_string(parameters)?;
396 let json_argument = serde_json::to_string(argument)?;
397 let mut command = self.command().await?;
398 let vm_runtime = format!("{}", vm_runtime);
399 command
400 .arg("publish-and-create")
401 .args([contract, service])
402 .args(["--vm-runtime", &vm_runtime.to_lowercase()])
403 .args(publisher.into().iter().map(ChainId::to_string))
404 .args(["--json-parameters", &json_parameters])
405 .args(["--json-argument", &json_argument]);
406 if !required_application_ids.is_empty() {
407 command.arg("--required-application-ids");
408 command.args(
409 required_application_ids
410 .iter()
411 .map(ApplicationId::to_string),
412 );
413 }
414 let stdout = command.spawn_and_wait_for_stdout().await?;
415 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
416 }
417
418 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
420 &self,
421 contract: PathBuf,
422 service: PathBuf,
423 vm_runtime: VmRuntime,
424 publisher: impl Into<Option<ChainId>>,
425 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
426 let stdout = self
427 .command()
428 .await?
429 .arg("publish-module")
430 .args([contract, service])
431 .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
432 .args(publisher.into().iter().map(ChainId::to_string))
433 .spawn_and_wait_for_stdout()
434 .await?;
435 let module_id: ModuleId = stdout.trim().parse()?;
436 Ok(module_id.with_abi())
437 }
438
439 pub async fn create_application<
441 Abi: ContractAbi,
442 Parameters: Serialize,
443 InstantiationArgument: Serialize,
444 >(
445 &self,
446 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
447 parameters: &Parameters,
448 argument: &InstantiationArgument,
449 required_application_ids: &[ApplicationId],
450 creator: impl Into<Option<ChainId>>,
451 ) -> Result<ApplicationId<Abi>> {
452 let json_parameters = serde_json::to_string(parameters)?;
453 let json_argument = serde_json::to_string(argument)?;
454 let mut command = self.command().await?;
455 command
456 .arg("create-application")
457 .arg(module_id.forget_abi().to_string())
458 .args(["--json-parameters", &json_parameters])
459 .args(["--json-argument", &json_argument])
460 .args(creator.into().iter().map(ChainId::to_string));
461 if !required_application_ids.is_empty() {
462 command.arg("--required-application-ids");
463 command.args(
464 required_application_ids
465 .iter()
466 .map(ApplicationId::to_string),
467 );
468 }
469 let stdout = command.spawn_and_wait_for_stdout().await?;
470 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
471 }
472
473 pub async fn run_node_service(
475 &self,
476 port: impl Into<Option<u16>>,
477 process_inbox: ProcessInbox,
478 ) -> Result<NodeService> {
479 self.run_node_service_with_options(port, process_inbox, &[], &[], false)
480 .await
481 }
482
483 pub async fn run_node_service_with_options(
485 &self,
486 port: impl Into<Option<u16>>,
487 process_inbox: ProcessInbox,
488 operator_application_ids: &[ApplicationId],
489 operators: &[(String, PathBuf)],
490 read_only: bool,
491 ) -> Result<NodeService> {
492 self.run_node_service_with_all_options(
493 port,
494 process_inbox,
495 operator_application_ids,
496 operators,
497 read_only,
498 &[],
499 &[],
500 )
501 .await
502 }
503
504 #[allow(clippy::too_many_arguments)]
506 pub async fn run_node_service_with_all_options(
507 &self,
508 port: impl Into<Option<u16>>,
509 process_inbox: ProcessInbox,
510 operator_application_ids: &[ApplicationId],
511 operators: &[(String, PathBuf)],
512 read_only: bool,
513 allowed_subscriptions: &[String],
514 subscription_ttls: &[(String, u64)],
515 ) -> Result<NodeService> {
516 let port = port.into().unwrap_or(8080);
517 let mut command = self.command().await?;
518 command.arg("service");
519 if let ProcessInbox::Skip = process_inbox {
520 command.arg("--listener-skip-process-inbox");
521 }
522 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
523 command.args(var.split_whitespace());
524 }
525 for app_id in operator_application_ids {
526 command.args(["--operator-application-ids", &app_id.to_string()]);
527 }
528 for (name, path) in operators {
529 command.args(["--operators", &format!("{}={}", name, path.display())]);
530 }
531 if read_only {
532 command.arg("--read-only");
533 }
534 for query in allowed_subscriptions {
535 command.args(["--allow-subscription", query]);
536 }
537 for (name, secs) in subscription_ttls {
538 command.args(["--subscription-ttl-secs", &format!("{name}={secs}")]);
539 }
540 let child = command
541 .args(["--port".to_string(), port.to_string()])
542 .spawn_into()?;
543 let client = reqwest_client();
544 for i in 0..10 {
545 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
546 let request = client
547 .get(format!("http://localhost:{}/", port))
548 .send()
549 .await;
550 if request.is_ok() {
551 tracing::info!("Node service has started");
552 return Ok(NodeService::new(port, child));
553 } else {
554 tracing::warn!("Waiting for node service to start");
555 }
556 }
557 bail!("Failed to start node service");
558 }
559
560 pub async fn run_node_service_with_controller(
562 &self,
563 port: impl Into<Option<u16>>,
564 process_inbox: ProcessInbox,
565 controller_id: &ApplicationId,
566 operators: &[(String, PathBuf)],
567 ) -> Result<NodeService> {
568 let port = port.into().unwrap_or(8080);
569 let mut command = self.command().await?;
570 command.arg("service");
571 if let ProcessInbox::Skip = process_inbox {
572 command.arg("--listener-skip-process-inbox");
573 }
574 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
575 command.args(var.split_whitespace());
576 }
577 command.args(["--controller-id", &controller_id.to_string()]);
578 for (name, path) in operators {
579 command.args(["--operators", &format!("{}={}", name, path.display())]);
580 }
581 let child = command
582 .args(["--port".to_string(), port.to_string()])
583 .spawn_into()?;
584 let client = reqwest_client();
585 for i in 0..10 {
586 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
587 let request = client
588 .get(format!("http://localhost:{}/", port))
589 .send()
590 .await;
591 if request.is_ok() {
592 tracing::info!("Node service has started");
593 return Ok(NodeService::new(port, child));
594 } else {
595 tracing::warn!("Waiting for node service to start");
596 }
597 }
598 bail!("Failed to start node service");
599 }
600
601 pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
603 let mut command = self.command().await?;
604 command.arg("validator").arg("query").arg(address);
605 let stdout = command.spawn_and_wait_for_stdout().await?;
606
607 let hash = stdout
610 .lines()
611 .find_map(|line| {
612 line.strip_prefix("Genesis config hash: ")
613 .and_then(|hash_str| hash_str.trim().parse().ok())
614 })
615 .context("error while parsing the result of `linera validator query`")?;
616 Ok(hash)
617 }
618
619 pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
621 let mut command = self.command().await?;
622 command.arg("validator").arg("list");
623 if let Some(chain_id) = chain_id {
624 command.args(["--chain-id", &chain_id.to_string()]);
625 }
626 command.spawn_and_wait_for_stdout().await?;
627 Ok(())
628 }
629
630 pub async fn sync_validator(
632 &self,
633 chain_ids: impl IntoIterator<Item = &ChainId>,
634 validator_address: impl Into<String>,
635 ) -> Result<()> {
636 let mut command = self.command().await?;
637 command
638 .arg("validator")
639 .arg("sync")
640 .arg(validator_address.into());
641 let mut chain_ids = chain_ids.into_iter().peekable();
642 if chain_ids.peek().is_some() {
643 command
644 .arg("--chains")
645 .args(chain_ids.map(ChainId::to_string));
646 }
647 command.spawn_and_wait_for_stdout().await?;
648 Ok(())
649 }
650
651 pub async fn run_faucet(
653 &self,
654 port: impl Into<Option<u16>>,
655 chain_id: Option<ChainId>,
656 amount: Amount,
657 ) -> Result<FaucetService> {
658 let port = port.into().unwrap_or(8080);
659 let temp_dir = tempfile::tempdir()
660 .context("Failed to create temporary directory for faucet storage")?;
661 let storage_path = temp_dir.path().join("faucet_storage.sqlite");
662 let mut command = self.command().await?;
663 let command = command
664 .arg("faucet")
665 .args(["--port".to_string(), port.to_string()])
666 .args(["--amount".to_string(), amount.to_string()])
667 .args([
668 "--storage-path".to_string(),
669 storage_path.to_string_lossy().to_string(),
670 ]);
671 if let Some(chain_id) = chain_id {
672 command.arg(chain_id.to_string());
673 }
674 let child = command.spawn_into()?;
675 let client = reqwest_client();
676 for i in 0..10 {
677 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
678 let request = client
679 .get(format!("http://localhost:{}/", port))
680 .send()
681 .await;
682 if request.is_ok() {
683 tracing::info!("Faucet has started");
684 return Ok(FaucetService::new(port, child, temp_dir));
685 } else {
686 tracing::debug!("Waiting for faucet to start");
687 }
688 }
689 bail!("Failed to start faucet");
690 }
691
692 pub async fn local_balance(&self, account: Account) -> Result<Amount> {
694 let stdout = self
695 .command()
696 .await?
697 .arg("local-balance")
698 .arg(account.to_string())
699 .spawn_and_wait_for_stdout()
700 .await?;
701 let amount = stdout
702 .trim()
703 .parse()
704 .context("error while parsing the result of `linera local-balance`")?;
705 Ok(amount)
706 }
707
708 pub async fn query_balance(&self, account: Account) -> Result<Amount> {
710 let stdout = self
711 .command()
712 .await?
713 .arg("query-balance")
714 .arg(account.to_string())
715 .spawn_and_wait_for_stdout()
716 .await?;
717 let amount = stdout
718 .trim()
719 .parse()
720 .context("error while parsing the result of `linera query-balance`")?;
721 Ok(amount)
722 }
723
724 pub async fn query_application_json<T: DeserializeOwned>(
726 &self,
727 chain_id: ChainId,
728 application_id: ApplicationId,
729 query: impl AsRef<str>,
730 ) -> Result<T> {
731 let query = query.as_ref().trim();
732 let name = query
733 .split_once(|ch: char| !ch.is_alphanumeric())
734 .map_or(query, |(name, _)| name);
735 let stdout = self
736 .command()
737 .await?
738 .arg("query-application")
739 .arg("--chain-id")
740 .arg(chain_id.to_string())
741 .arg("--application-id")
742 .arg(application_id.to_string())
743 .arg(query)
744 .spawn_and_wait_for_stdout()
745 .await?;
746 let data: serde_json::Value =
747 serde_json::from_str(stdout.trim()).context("invalid JSON from query-application")?;
748 serde_json::from_value(data[name].clone())
749 .with_context(|| format!("{name} field missing in query-application response"))
750 }
751
752 pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
754 self.command()
755 .await?
756 .arg("sync")
757 .arg(chain_id.to_string())
758 .spawn_and_wait_for_stdout()
759 .await?;
760 Ok(())
761 }
762
763 pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
765 self.command()
766 .await?
767 .arg("process-inbox")
768 .arg(chain_id.to_string())
769 .spawn_and_wait_for_stdout()
770 .await?;
771 Ok(())
772 }
773
774 pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
776 self.command()
777 .await?
778 .arg("transfer")
779 .arg(amount.to_string())
780 .args(["--from", &from.to_string()])
781 .args(["--to", &to.to_string()])
782 .spawn_and_wait_for_stdout()
783 .await?;
784 Ok(())
785 }
786
787 pub async fn transfer_with_silent_logs(
789 &self,
790 amount: Amount,
791 from: ChainId,
792 to: ChainId,
793 ) -> Result<()> {
794 self.command()
795 .await?
796 .env("RUST_LOG", "off")
797 .arg("transfer")
798 .arg(amount.to_string())
799 .args(["--from", &from.to_string()])
800 .args(["--to", &to.to_string()])
801 .spawn_and_wait_for_stdout()
802 .await?;
803 Ok(())
804 }
805
806 pub async fn transfer_with_accounts(
808 &self,
809 amount: Amount,
810 from: Account,
811 to: Account,
812 ) -> Result<()> {
813 self.command()
814 .await?
815 .arg("transfer")
816 .arg(amount.to_string())
817 .args(["--from", &from.to_string()])
818 .args(["--to", &to.to_string()])
819 .spawn_and_wait_for_stdout()
820 .await?;
821 Ok(())
822 }
823
824 fn benchmark_command_internal(command: &mut Command, args: &BenchmarkCommand) -> Result<()> {
825 let mut formatted_args = to_args(&args)?;
826 let subcommand = formatted_args.remove(0);
827 formatted_args.remove(0);
830 let options = formatted_args
831 .chunks_exact(2)
832 .flat_map(|pair| {
833 let option = format!("--{}", pair[0]);
834 match pair[1].as_str() {
835 "true" => vec![option],
836 "false" => vec![],
837 _ => vec![option, pair[1].clone()],
838 }
839 })
840 .collect::<Vec<_>>();
841 command
842 .args([
843 "--max-pending-message-bundles",
844 &args.transactions_per_block().to_string(),
845 ])
846 .arg("benchmark")
847 .arg(subcommand)
848 .args(options);
849 Ok(())
850 }
851
852 async fn benchmark_command_with_envs(
853 &self,
854 args: BenchmarkCommand,
855 envs: &[(&str, &str)],
856 ) -> Result<Command> {
857 let mut command = self
858 .command_with_envs_and_arguments(envs, self.required_command_arguments())
859 .await?;
860 Self::benchmark_command_internal(&mut command, &args)?;
861 Ok(command)
862 }
863
864 async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
865 let mut command = self
866 .command_with_arguments(self.required_command_arguments())
867 .await?;
868 Self::benchmark_command_internal(&mut command, &args)?;
869 Ok(command)
870 }
871
872 pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
874 let mut command = self.benchmark_command(args).await?;
875 command.spawn_and_wait_for_stdout().await?;
876 Ok(())
877 }
878
879 pub async fn benchmark_detached(
882 &self,
883 args: BenchmarkCommand,
884 tx: oneshot::Sender<()>,
885 ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
886 let mut child = self
887 .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
888 .await?
889 .kill_on_drop(true)
890 .stdin(Stdio::piped())
891 .stdout(Stdio::piped())
892 .stderr(Stdio::piped())
893 .spawn()?;
894
895 let pid = child.id().expect("failed to get pid");
896 let stdout = child.stdout.take().expect("stdout not open");
897 let stdout_handle = tokio::spawn(async move {
898 let mut lines = BufReader::new(stdout).lines();
899 while let Ok(Some(line)) = lines.next_line().await {
900 println!("benchmark{{pid={pid}}} {line}");
901 }
902 });
903
904 let stderr = child.stderr.take().expect("stderr not open");
905 let stderr_handle = tokio::spawn(async move {
906 let mut lines = BufReader::new(stderr).lines();
907 let mut tx = Some(tx);
908 while let Ok(Some(line)) = lines.next_line().await {
909 if line.contains("Ready to start benchmark") {
910 tx.take()
911 .expect("Should only send signal once")
912 .send(())
913 .expect("failed to send ready signal to main thread");
914 } else {
915 println!("benchmark{{pid={pid}}} {line}");
916 }
917 }
918 });
919 Ok((child, stdout_handle, stderr_handle))
920 }
921
922 async fn open_chain_internal(
923 &self,
924 from: ChainId,
925 owner: Option<AccountOwner>,
926 initial_balance: Amount,
927 super_owner: bool,
928 ) -> Result<(ChainId, AccountOwner)> {
929 let mut command = self.command().await?;
930 command
931 .arg("open-chain")
932 .args(["--from", &from.to_string()])
933 .args(["--initial-balance", &initial_balance.to_string()]);
934
935 if let Some(owner) = owner {
936 command.args(["--owner", &owner.to_string()]);
937 }
938
939 if super_owner {
940 command.arg("--super-owner");
941 }
942
943 let stdout = command.spawn_and_wait_for_stdout().await?;
944 let mut split = stdout.split('\n');
945 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
946 let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
947 if let Some(owner) = owner {
948 assert_eq!(owner, new_owner);
949 }
950 Ok((chain_id, new_owner))
951 }
952
953 pub async fn open_chain_super_owner(
955 &self,
956 from: ChainId,
957 owner: Option<AccountOwner>,
958 initial_balance: Amount,
959 ) -> Result<(ChainId, AccountOwner)> {
960 self.open_chain_internal(from, owner, initial_balance, true)
961 .await
962 }
963
964 pub async fn open_chain(
966 &self,
967 from: ChainId,
968 owner: Option<AccountOwner>,
969 initial_balance: Amount,
970 ) -> Result<(ChainId, AccountOwner)> {
971 self.open_chain_internal(from, owner, initial_balance, false)
972 .await
973 }
974
975 pub async fn open_and_assign(
977 &self,
978 client: &ClientWrapper,
979 initial_balance: Amount,
980 ) -> Result<ChainId> {
981 let our_chain = self
982 .load_wallet()?
983 .default_chain()
984 .context("no default chain found")?;
985 let owner = client.keygen().await?;
986 let (new_chain, _) = self
987 .open_chain(our_chain, Some(owner), initial_balance)
988 .await?;
989 client.assign(owner, new_chain).await?;
990 Ok(new_chain)
991 }
992
993 pub async fn open_multi_owner_chain(
994 &self,
995 from: ChainId,
996 owners: BTreeMap<AccountOwner, u64>,
997 multi_leader_rounds: u32,
998 balance: Amount,
999 base_timeout_ms: u64,
1000 ) -> Result<ChainId> {
1001 let mut command = self.command().await?;
1002 command
1003 .arg("open-multi-owner-chain")
1004 .args(["--from", &from.to_string()])
1005 .arg("--owners")
1006 .arg(serde_json::to_string(&owners)?)
1007 .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
1008 command
1009 .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
1010 .args(["--initial-balance", &balance.to_string()]);
1011
1012 let stdout = command.spawn_and_wait_for_stdout().await?;
1013 let mut split = stdout.split('\n');
1014 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
1015
1016 Ok(chain_id)
1017 }
1018
1019 pub async fn change_ownership(
1020 &self,
1021 chain_id: ChainId,
1022 super_owners: Vec<AccountOwner>,
1023 owners: Vec<AccountOwner>,
1024 ) -> Result<()> {
1025 let mut command = self.command().await?;
1026 command
1027 .arg("change-ownership")
1028 .args(["--chain-id", &chain_id.to_string()]);
1029 command
1030 .arg("--super-owners")
1031 .arg(serde_json::to_string(&super_owners)?);
1032 command.arg("--owners").arg(serde_json::to_string(
1033 &owners
1034 .into_iter()
1035 .zip(std::iter::repeat(100u64))
1036 .collect::<BTreeMap<_, _>>(),
1037 )?);
1038 command.spawn_and_wait_for_stdout().await?;
1039 Ok(())
1040 }
1041
1042 pub async fn change_application_permissions(
1043 &self,
1044 chain_id: ChainId,
1045 application_permissions: ApplicationPermissions,
1046 ) -> Result<()> {
1047 let mut command = self.command().await?;
1048 command
1049 .arg("change-application-permissions")
1050 .args(["--chain-id", &chain_id.to_string()]);
1051 command.arg("--manage-chain").arg(serde_json::to_string(
1052 &application_permissions.manage_chain,
1053 )?);
1054 command.spawn_and_wait_for_stdout().await?;
1056 Ok(())
1057 }
1058
1059 pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
1061 let mut command = self.command().await?;
1062 command
1063 .args(["wallet", "follow-chain"])
1064 .arg(chain_id.to_string());
1065 if sync {
1066 command.arg("--sync");
1067 }
1068 command.spawn_and_wait_for_stdout().await?;
1069 Ok(())
1070 }
1071
1072 pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
1074 let mut command = self.command().await?;
1075 command
1076 .args(["wallet", "forget-chain"])
1077 .arg(chain_id.to_string());
1078 command.spawn_and_wait_for_stdout().await?;
1079 Ok(())
1080 }
1081
1082 pub async fn set_default_chain(&self, chain_id: ChainId) -> Result<()> {
1084 let mut command = self.command().await?;
1085 command
1086 .args(["wallet", "set-default"])
1087 .arg(chain_id.to_string());
1088 command.spawn_and_wait_for_stdout().await?;
1089 Ok(())
1090 }
1091
1092 pub async fn retry_pending_block(
1093 &self,
1094 chain_id: Option<ChainId>,
1095 ) -> Result<Option<CryptoHash>> {
1096 let mut command = self.command().await?;
1097 command.arg("retry-pending-block");
1098 if let Some(chain_id) = chain_id {
1099 command.arg(chain_id.to_string());
1100 }
1101 let stdout = command.spawn_and_wait_for_stdout().await?;
1102 let stdout = stdout.trim();
1103 if stdout.is_empty() {
1104 Ok(None)
1105 } else {
1106 Ok(Some(CryptoHash::from_str(stdout)?))
1107 }
1108 }
1109
1110 pub async fn publish_data_blob(
1112 &self,
1113 path: &Path,
1114 chain_id: Option<ChainId>,
1115 ) -> Result<CryptoHash> {
1116 let mut command = self.command().await?;
1117 command.arg("publish-data-blob").arg(path);
1118 if let Some(chain_id) = chain_id {
1119 command.arg(chain_id.to_string());
1120 }
1121 let stdout = command.spawn_and_wait_for_stdout().await?;
1122 let stdout = stdout.trim();
1123 Ok(CryptoHash::from_str(stdout)?)
1124 }
1125
1126 pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
1128 let mut command = self.command().await?;
1129 command.arg("read-data-blob").arg(hash.to_string());
1130 if let Some(chain_id) = chain_id {
1131 command.arg(chain_id.to_string());
1132 }
1133 command.spawn_and_wait_for_stdout().await?;
1134 Ok(())
1135 }
1136
1137 pub fn load_wallet(&self) -> Result<Wallet> {
1138 Ok(Wallet::read(&self.wallet_path())?)
1139 }
1140
1141 pub fn load_keystore(&self) -> Result<InMemorySigner> {
1142 util::read_json(self.keystore_path())
1143 }
1144
1145 pub fn wallet_path(&self) -> PathBuf {
1146 self.path_provider.path().join(&self.wallet)
1147 }
1148
1149 pub fn keystore_path(&self) -> PathBuf {
1150 self.path_provider.path().join(&self.keystore)
1151 }
1152
1153 pub fn storage_path(&self) -> &str {
1154 &self.storage
1155 }
1156
1157 pub fn get_owner(&self) -> Option<AccountOwner> {
1158 let wallet = self.load_wallet().ok()?;
1159 wallet
1160 .get(wallet.default_chain()?)
1161 .expect("default chain must be in wallet")
1162 .owner
1163 }
1164
1165 pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1166 self.load_wallet()
1167 .ok()
1168 .is_some_and(|wallet| wallet.get(chain).is_some())
1169 }
1170
1171 pub async fn set_validator(
1172 &self,
1173 validator_key: &(String, String),
1174 port: usize,
1175 votes: usize,
1176 ) -> Result<()> {
1177 let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1178 self.command()
1179 .await?
1180 .arg("validator")
1181 .arg("add")
1182 .args(["--public-key", &validator_key.0])
1183 .args(["--account-key", &validator_key.1])
1184 .args(["--address", &address])
1185 .args(["--votes", &votes.to_string()])
1186 .spawn_and_wait_for_stdout()
1187 .await?;
1188 Ok(())
1189 }
1190
1191 pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1192 self.command()
1193 .await?
1194 .arg("validator")
1195 .arg("remove")
1196 .args(["--public-key", validator_key])
1197 .spawn_and_wait_for_stdout()
1198 .await?;
1199 Ok(())
1200 }
1201
1202 pub async fn change_validators(
1203 &self,
1204 add_validators: &[(String, String, usize, usize)], modify_validators: &[(String, String, usize, usize)], remove_validators: &[String],
1207 ) -> Result<()> {
1208 use std::str::FromStr;
1209
1210 use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1211
1212 let mut changes = std::collections::HashMap::new();
1215
1216 for (public_key_str, account_key_str, port, votes) in
1218 add_validators.iter().chain(modify_validators.iter())
1219 {
1220 let public_key = ValidatorPublicKey::from_str(public_key_str)
1221 .with_context(|| format!("Invalid validator public key: {}", public_key_str))?;
1222
1223 let account_key = AccountPublicKey::from_str(account_key_str)
1224 .with_context(|| format!("Invalid account public key: {}", account_key_str))?;
1225
1226 let address = format!("{}:127.0.0.1:{}", self.network.short(), port)
1227 .parse()
1228 .unwrap();
1229
1230 let change = crate::cli::validator::Change {
1232 account_key,
1233 address,
1234 votes: crate::cli::validator::Votes(
1235 std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1236 ),
1237 };
1238
1239 changes.insert(public_key, Some(change));
1240 }
1241
1242 for validator_key_str in remove_validators {
1244 let public_key = ValidatorPublicKey::from_str(validator_key_str)
1245 .with_context(|| format!("Invalid validator public key: {}", validator_key_str))?;
1246 changes.insert(public_key, None);
1247 }
1248
1249 let temp_file = tempfile::NamedTempFile::new()
1251 .context("Failed to create temporary file for validator changes")?;
1252 serde_json::to_writer(&temp_file, &changes)
1253 .context("Failed to write validator changes to file")?;
1254 let temp_path = temp_file.path();
1255
1256 self.command()
1257 .await?
1258 .arg("validator")
1259 .arg("update")
1260 .arg(temp_path)
1261 .arg("--yes") .spawn_and_wait_for_stdout()
1263 .await?;
1264
1265 Ok(())
1266 }
1267
1268 pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1269 self.command()
1270 .await?
1271 .arg("revoke-epochs")
1272 .arg(epoch.to_string())
1273 .spawn_and_wait_for_stdout()
1274 .await?;
1275 Ok(())
1276 }
1277
1278 pub async fn keygen(&self) -> Result<AccountOwner> {
1280 let stdout = self
1281 .command()
1282 .await?
1283 .arg("keygen")
1284 .spawn_and_wait_for_stdout()
1285 .await?;
1286 AccountOwner::from_str(stdout.as_str().trim())
1287 }
1288
1289 pub fn default_chain(&self) -> Option<ChainId> {
1291 self.load_wallet().ok()?.default_chain()
1292 }
1293
1294 pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1296 let _stdout = self
1297 .command()
1298 .await?
1299 .arg("assign")
1300 .args(["--owner", &owner.to_string()])
1301 .args(["--chain-id", &chain_id.to_string()])
1302 .spawn_and_wait_for_stdout()
1303 .await?;
1304 Ok(())
1305 }
1306
1307 pub async fn set_preferred_owner(
1309 &self,
1310 chain_id: ChainId,
1311 owner: Option<AccountOwner>,
1312 ) -> Result<()> {
1313 let mut owner_arg = vec!["--owner".to_string()];
1314 if let Some(owner) = owner {
1315 owner_arg.push(owner.to_string());
1316 };
1317 self.command()
1318 .await?
1319 .arg("set-preferred-owner")
1320 .args(["--chain-id", &chain_id.to_string()])
1321 .args(owner_arg)
1322 .spawn_and_wait_for_stdout()
1323 .await?;
1324 Ok(())
1325 }
1326
1327 pub async fn build_application(
1328 &self,
1329 path: &Path,
1330 name: &str,
1331 is_workspace: bool,
1332 ) -> Result<(PathBuf, PathBuf)> {
1333 Command::new("cargo")
1334 .current_dir(self.path_provider.path())
1335 .arg("build")
1336 .arg("--release")
1337 .args(["--target", "wasm32-unknown-unknown"])
1338 .arg("--manifest-path")
1339 .arg(path.join("Cargo.toml"))
1340 .spawn_and_wait_for_stdout()
1341 .await?;
1342
1343 let release_dir = match is_workspace {
1344 true => path.join("../target/wasm32-unknown-unknown/release"),
1345 false => path.join("target/wasm32-unknown-unknown/release"),
1346 };
1347
1348 let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1349 let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1350
1351 let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1352 let service_size = fs_err::tokio::metadata(&service).await?.len();
1353 tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1354
1355 Ok((contract, service))
1356 }
1357}
1358
1359impl Drop for ClientWrapper {
1360 fn drop(&mut self) {
1361 use std::process::Command as SyncCommand;
1362
1363 if self.on_drop != OnClientDrop::CloseChains {
1364 return;
1365 }
1366
1367 let Ok(binary_path) = self.binary_path.lock() else {
1368 tracing::error!(
1369 "Failed to close chains because a thread panicked with a lock to `binary_path`"
1370 );
1371 return;
1372 };
1373
1374 let Some(binary_path) = binary_path.as_ref() else {
1375 tracing::warn!(
1376 "Assuming no chains need to be closed, because the command binary was never \
1377 resolved and therefore presumably never called"
1378 );
1379 return;
1380 };
1381
1382 let working_directory = self.path_provider.path();
1383 let mut wallet_show_command = SyncCommand::new(binary_path);
1384
1385 for argument in self.command_arguments() {
1386 wallet_show_command.arg(&*argument);
1387 }
1388
1389 let Ok(wallet_show_output) = wallet_show_command
1390 .current_dir(working_directory)
1391 .args(["wallet", "show", "--short", "--owned"])
1392 .output()
1393 else {
1394 tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1395 return;
1396 };
1397
1398 if !wallet_show_output.status.success() {
1399 tracing::warn!("Failed to list chains in the wallet to close them");
1400 return;
1401 }
1402
1403 let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1404 tracing::warn!(
1405 "Failed to close chains because `linera wallet show --short` \
1406 returned a non-UTF-8 output"
1407 );
1408 return;
1409 };
1410
1411 let chain_ids = chain_list_string
1412 .split('\n')
1413 .map(|line| line.trim())
1414 .filter(|line| !line.is_empty());
1415
1416 for chain_id in chain_ids {
1417 let mut close_chain_command = SyncCommand::new(binary_path);
1418
1419 for argument in self.command_arguments() {
1420 close_chain_command.arg(&*argument);
1421 }
1422
1423 close_chain_command.current_dir(working_directory);
1424
1425 match close_chain_command.args(["close-chain", chain_id]).status() {
1426 Ok(status) if status.success() => (),
1427 Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1428 Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1429 }
1430 }
1431 }
1432}
1433
1434#[cfg(with_testing)]
1435impl ClientWrapper {
1436 pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
1437 self.build_application(Self::example_path(name)?.as_path(), name, true)
1438 .await
1439 }
1440
1441 pub fn example_path(name: &str) -> Result<PathBuf> {
1442 Ok(env::current_dir()?.join("../examples/").join(name))
1443 }
1444}
1445
1446fn truncate_query_output(input: &str) -> String {
1447 let max_len = 1000;
1448 if input.len() < max_len {
1449 input.to_string()
1450 } else {
1451 format!("{} ...", input.get(..max_len).unwrap())
1452 }
1453}
1454
1455fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1456 let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1457 let max_len = 200;
1458 if query.len() < max_len {
1459 query
1460 } else {
1461 format!("{} ...", query.get(..max_len).unwrap())
1462 }
1463}
1464
1465fn log_unexpected_exit(child: &mut Child, service_kind: &str, port: u16) {
1469 match child.try_wait() {
1470 Ok(Some(status)) => {
1471 #[cfg(unix)]
1472 {
1473 use std::os::unix::process::ExitStatusExt as _;
1474 if let Some(signal) = status.signal() {
1475 tracing::error!(
1476 port,
1477 signal,
1478 "The {service_kind} service was killed by signal {signal}",
1479 );
1480 return;
1481 }
1482 }
1483 if !status.success() {
1484 tracing::error!(
1485 port,
1486 %status,
1487 "The {service_kind} service exited unexpectedly with {status}",
1488 );
1489 }
1490 }
1491 Ok(None) => {} Err(error) => {
1493 tracing::warn!(
1494 port,
1495 %error,
1496 "Failed to check {service_kind} service status",
1497 );
1498 }
1499 }
1500}
1501
1502pub struct NodeService {
1503 port: u16,
1504 child: Child,
1505 terminated: bool,
1506}
1507
1508impl Drop for NodeService {
1509 fn drop(&mut self) {
1510 if !self.terminated {
1511 log_unexpected_exit(&mut self.child, "node", self.port);
1512 }
1513 }
1514}
1515
1516impl NodeService {
1517 fn new(port: u16, child: Child) -> Self {
1518 Self {
1519 port,
1520 child,
1521 terminated: false,
1522 }
1523 }
1524
1525 pub async fn terminate(mut self) -> Result<()> {
1526 self.terminated = true;
1527 self.child.kill().await.context("terminating node service")
1528 }
1529
1530 pub fn port(&self) -> u16 {
1531 self.port
1532 }
1533
1534 pub fn ensure_is_running(&mut self) -> Result<()> {
1535 self.child.ensure_is_running()
1536 }
1537
1538 pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1539 let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1540 let mut data = self.query_node(query).await?;
1541 Ok(serde_json::from_value(data["processInbox"].take())?)
1542 }
1543
1544 pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1545 let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1546 let mut data = self.query_node(query).await?;
1547 Ok(serde_json::from_value(data["sync"].take())?)
1548 }
1549
1550 pub async fn transfer(
1551 &self,
1552 chain_id: ChainId,
1553 owner: AccountOwner,
1554 recipient: Account,
1555 amount: Amount,
1556 ) -> Result<CryptoHash> {
1557 let json_owner = owner.to_value();
1558 let json_recipient = recipient.to_value();
1559 let query = format!(
1560 "mutation {{ transfer(\
1561 chainId: \"{chain_id}\", \
1562 owner: {json_owner}, \
1563 recipient: {json_recipient}, \
1564 amount: \"{amount}\") \
1565 }}"
1566 );
1567 let data = self.query_node(query).await?;
1568 serde_json::from_value(data["transfer"].clone())
1569 .context("missing transfer field in response")
1570 }
1571
1572 pub async fn balance(&self, account: &Account) -> Result<Amount> {
1573 let chain = account.chain_id;
1574 let owner = account.owner;
1575 if matches!(owner, AccountOwner::CHAIN) {
1576 let query = format!(
1577 "query {{ chain(chainId:\"{chain}\") {{
1578 executionState {{ system {{ balance }} }}
1579 }} }}"
1580 );
1581 let response = self.query_node(query).await?;
1582 let balance = &response["chain"]["executionState"]["system"]["balance"]
1583 .as_str()
1584 .unwrap();
1585 return Ok(Amount::from_str(balance)?);
1586 }
1587 let query = format!(
1588 "query {{ chain(chainId:\"{chain}\") {{
1589 executionState {{ system {{ balances {{
1590 entry(key:\"{owner}\") {{ value }}
1591 }} }} }}
1592 }} }}"
1593 );
1594 let response = self.query_node(query).await?;
1595 let balances = &response["chain"]["executionState"]["system"]["balances"];
1596 let balance = balances["entry"]["value"].as_str();
1597 match balance {
1598 None => Ok(Amount::ZERO),
1599 Some(amount) => Ok(Amount::from_str(amount)?),
1600 }
1601 }
1602
1603 pub fn make_application<A: ContractAbi>(
1604 &self,
1605 chain_id: &ChainId,
1606 application_id: &ApplicationId<A>,
1607 ) -> Result<ApplicationWrapper<A>> {
1608 let application_id = application_id.forget_abi().to_string();
1609 let link = format!(
1610 "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1611 self.port
1612 );
1613 Ok(ApplicationWrapper::from(link))
1614 }
1615
1616 pub async fn publish_data_blob(
1617 &self,
1618 chain_id: &ChainId,
1619 bytes: Vec<u8>,
1620 ) -> Result<CryptoHash> {
1621 let query = format!(
1622 "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1623 chain_id.to_value(),
1624 bytes.to_value(),
1625 );
1626 let data = self.query_node(query).await?;
1627 serde_json::from_value(data["publishDataBlob"].clone())
1628 .context("missing publishDataBlob field in response")
1629 }
1630
1631 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1632 &self,
1633 chain_id: &ChainId,
1634 contract: PathBuf,
1635 service: PathBuf,
1636 vm_runtime: VmRuntime,
1637 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1638 let contract_code = Bytecode::load_from_file(&contract).await?;
1639 let service_code = Bytecode::load_from_file(&service).await?;
1640 let query = format!(
1641 "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1642 chain_id.to_value(),
1643 contract_code.to_value(),
1644 service_code.to_value(),
1645 vm_runtime.to_value(),
1646 );
1647 let data = self.query_node(query).await?;
1648 let module_str = data["publishModule"]
1649 .as_str()
1650 .context("module ID not found")?;
1651 let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1652 Ok(module_id.with_abi())
1653 }
1654
1655 pub async fn query_committees(&self, chain_id: &ChainId) -> Result<BTreeMap<Epoch, Committee>> {
1656 let query = format!(
1657 "query {{ chain(chainId:\"{chain_id}\") {{
1658 executionState {{ system {{ committees }} }}
1659 }} }}"
1660 );
1661 let mut response = self.query_node(query).await?;
1662 let committees = response["chain"]["executionState"]["system"]["committees"].take();
1663 Ok(serde_json::from_value(committees)?)
1664 }
1665
1666 pub async fn events_from_index(
1667 &self,
1668 chain_id: &ChainId,
1669 stream_id: &StreamId,
1670 start_index: u32,
1671 ) -> Result<Vec<IndexAndEvent>> {
1672 let query = format!(
1673 "query {{
1674 eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1675 {{ index event }}
1676 }}",
1677 stream_id.to_value()
1678 );
1679 let mut response = self.query_node(query).await?;
1680 let response = response["eventsFromIndex"].take();
1681 Ok(serde_json::from_value(response)?)
1682 }
1683
1684 pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1685 let n_try = 5;
1686 let query = query.as_ref();
1687 for i in 0..n_try {
1688 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1689 let url = format!("http://localhost:{}/", self.port);
1690 let client = reqwest_client();
1691 let result = client
1692 .post(url)
1693 .json(&json!({ "query": query }))
1694 .send()
1695 .await;
1696 if matches!(result, Err(ref error) if error.is_timeout()) {
1697 tracing::warn!(
1698 "Timeout when sending query {} to the node service",
1699 truncate_query_output(query)
1700 );
1701 continue;
1702 }
1703 let response = result.with_context(|| {
1704 format!(
1705 "query_node: failed to post query={}",
1706 truncate_query_output(query)
1707 )
1708 })?;
1709 ensure!(
1710 response.status().is_success(),
1711 "Query \"{}\" failed: {}",
1712 truncate_query_output(query),
1713 response
1714 .text()
1715 .await
1716 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1717 );
1718 let value: Value = response.json().await.context("invalid JSON")?;
1719 if let Some(errors) = value.get("errors") {
1720 tracing::warn!(
1721 "Query \"{}\" failed: {}",
1722 truncate_query_output(query),
1723 errors
1724 );
1725 } else {
1726 return Ok(value["data"].clone());
1727 }
1728 }
1729 bail!(
1730 "Query \"{}\" failed after {} retries.",
1731 truncate_query_output(query),
1732 n_try
1733 );
1734 }
1735
1736 pub async fn create_application<
1737 Abi: ContractAbi,
1738 Parameters: Serialize,
1739 InstantiationArgument: Serialize,
1740 >(
1741 &self,
1742 chain_id: &ChainId,
1743 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1744 parameters: &Parameters,
1745 argument: &InstantiationArgument,
1746 required_application_ids: &[ApplicationId],
1747 ) -> Result<ApplicationId<Abi>> {
1748 let module_id = module_id.forget_abi();
1749 let json_required_applications_ids = required_application_ids
1750 .iter()
1751 .map(ApplicationId::to_string)
1752 .collect::<Vec<_>>()
1753 .to_value();
1754 let new_parameters = serde_json::to_value(parameters)
1756 .context("could not create parameters JSON")?
1757 .to_value();
1758 let new_argument = serde_json::to_value(argument)
1759 .context("could not create argument JSON")?
1760 .to_value();
1761 let query = format!(
1762 "mutation {{ createApplication(\
1763 chainId: \"{chain_id}\",
1764 moduleId: \"{module_id}\", \
1765 parameters: {new_parameters}, \
1766 instantiationArgument: {new_argument}, \
1767 requiredApplicationIds: {json_required_applications_ids}) \
1768 }}"
1769 );
1770 let data = self.query_node(query).await?;
1771 let app_id_str = data["createApplication"]
1772 .as_str()
1773 .context("missing createApplication string in response")?
1774 .trim();
1775 Ok(app_id_str
1776 .parse::<ApplicationId>()
1777 .context("invalid application ID")?
1778 .with_abi())
1779 }
1780
1781 pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1783 let query = format!(
1784 r#"query {{ block(chainId: "{chain}") {{
1785 hash
1786 block {{ header {{ height }} }}
1787 }} }}"#
1788 );
1789
1790 let mut response = self.query_node(&query).await?;
1791
1792 match (
1793 mem::take(&mut response["block"]["hash"]),
1794 mem::take(&mut response["block"]["block"]["header"]["height"]),
1795 ) {
1796 (Value::Null, Value::Null) => Ok(None),
1797 (Value::String(hash), Value::Number(height)) => Ok(Some((
1798 hash.parse()
1799 .context("Received an invalid hash {hash:?} for chain tip")?,
1800 BlockHeight(height.as_u64().unwrap()),
1801 ))),
1802 invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1803 }
1804 }
1805
1806 pub async fn notifications(
1808 &self,
1809 chain_id: ChainId,
1810 ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
1811 let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1812 let url = format!("ws://localhost:{}/ws", self.port);
1813 let mut request = url.into_client_request()?;
1814 request.headers_mut().insert(
1815 "Sec-WebSocket-Protocol",
1816 HeaderValue::from_str("graphql-transport-ws")?,
1817 );
1818 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1819 let init_json = json!({
1820 "type": "connection_init",
1821 "payload": {}
1822 });
1823 websocket.send(init_json.to_string().into()).await?;
1824 let text = websocket
1825 .next()
1826 .await
1827 .context("Failed to establish connection")??
1828 .into_text()?;
1829 ensure!(
1830 text == "{\"type\":\"connection_ack\"}",
1831 "Unexpected response: {text}"
1832 );
1833 let query_json = json!({
1834 "id": "1",
1835 "type": "start",
1836 "payload": {
1837 "query": query,
1838 "variables": {},
1839 "operationName": null
1840 }
1841 });
1842 websocket.send(query_json.to_string().into()).await?;
1843 Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1844 |message| async {
1845 let text = message.into_text()?;
1846 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1847 if let Some(errors) = value["payload"].get("errors") {
1848 bail!("Notification subscription failed: {errors:?}");
1849 }
1850 serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1851 .context("Failed to deserialize notification")
1852 },
1853 )))
1854 }
1855
1856 pub async fn query_result(
1858 &self,
1859 name: &str,
1860 chain_id: ChainId,
1861 application_id: &ApplicationId,
1862 ) -> Result<Pin<Box<impl Stream<Item = Result<Value>>>>> {
1863 let query = format!(
1864 r#"subscription {{ queryResult(name: "{name}", chainId: "{chain_id}", applicationId: "{application_id}") }}"#,
1865 );
1866 let url = format!("ws://localhost:{}/ws", self.port);
1867 let mut request = url.into_client_request()?;
1868 request.headers_mut().insert(
1869 "Sec-WebSocket-Protocol",
1870 HeaderValue::from_str("graphql-transport-ws")?,
1871 );
1872 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1873 let init_json = json!({
1874 "type": "connection_init",
1875 "payload": {}
1876 });
1877 websocket.send(init_json.to_string().into()).await?;
1878 let text = websocket
1879 .next()
1880 .await
1881 .context("Failed to establish connection")??
1882 .into_text()?;
1883 ensure!(
1884 text == "{\"type\":\"connection_ack\"}",
1885 "Unexpected response: {text}"
1886 );
1887 let query_json = json!({
1888 "id": "1",
1889 "type": "start",
1890 "payload": {
1891 "query": query,
1892 "variables": {},
1893 "operationName": null
1894 }
1895 });
1896 websocket.send(query_json.to_string().into()).await?;
1897 Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1898 |message| async {
1899 let text = message.into_text()?;
1900 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1901 if let Some(errors) = value["payload"].get("errors") {
1902 bail!("Query result subscription failed: {errors:?}");
1903 }
1904 Ok(value["payload"]["data"]["queryResult"].clone())
1905 },
1906 )))
1907 }
1908}
1909
1910pub struct FaucetService {
1912 port: u16,
1913 child: Child,
1914 _temp_dir: tempfile::TempDir,
1915 terminated: bool,
1916}
1917
1918impl Drop for FaucetService {
1919 fn drop(&mut self) {
1920 if !self.terminated {
1921 log_unexpected_exit(&mut self.child, "faucet", self.port);
1922 }
1923 }
1924}
1925
1926impl FaucetService {
1927 fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
1928 Self {
1929 port,
1930 child,
1931 _temp_dir: temp_dir,
1932 terminated: false,
1933 }
1934 }
1935
1936 pub async fn terminate(mut self) -> Result<()> {
1937 self.terminated = true;
1938 self.child
1939 .kill()
1940 .await
1941 .context("terminating faucet service")
1942 }
1943
1944 pub fn ensure_is_running(&mut self) -> Result<()> {
1945 self.child.ensure_is_running()
1946 }
1947
1948 pub fn instance(&self) -> Faucet {
1949 Faucet::new(format!("http://localhost:{}/", self.port))
1950 }
1951}
1952
1953pub struct ApplicationWrapper<A> {
1955 uri: String,
1956 _phantom: PhantomData<A>,
1957}
1958
1959impl<A> ApplicationWrapper<A> {
1960 pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1961 let query = query.as_ref();
1962 let value = self.run_json_query(json!({ "query": query })).await?;
1963 Ok(value["data"].clone())
1964 }
1965
1966 pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1967 const MAX_RETRIES: usize = 5;
1968
1969 for i in 0.. {
1970 let client = reqwest_client();
1971 let result = client.post(&self.uri).json(&query).send().await;
1972 let response = match result {
1973 Ok(response) => response,
1974 Err(error) if i < MAX_RETRIES => {
1975 tracing::warn!(
1976 "Failed to post query \"{}\": {error}; retrying",
1977 truncate_query_output_serialize(&query),
1978 );
1979 continue;
1980 }
1981 Err(error) => {
1982 let query = truncate_query_output_serialize(&query);
1983 return Err(error)
1984 .with_context(|| format!("run_json_query: failed to post query={query}"));
1985 }
1986 };
1987 ensure!(
1988 response.status().is_success(),
1989 "Query \"{}\" failed: {}",
1990 truncate_query_output_serialize(&query),
1991 response
1992 .text()
1993 .await
1994 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1995 );
1996 let value: Value = response.json().await.context("invalid JSON")?;
1997 if let Some(errors) = value.get("errors") {
1998 bail!(
1999 "Query \"{}\" failed: {}",
2000 truncate_query_output_serialize(&query),
2001 errors
2002 );
2003 }
2004 return Ok(value);
2005 }
2006 unreachable!()
2007 }
2008
2009 pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
2010 let query = query.as_ref();
2011 self.run_graphql_query(&format!("query {{ {query} }}"))
2012 .await
2013 }
2014
2015 pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
2016 let query = query.as_ref().trim();
2017 let name = query
2018 .split_once(|ch: char| !ch.is_alphanumeric())
2019 .map_or(query, |(name, _)| name);
2020 let data = self.query(query).await?;
2021 serde_json::from_value(data[name].clone())
2022 .with_context(|| format!("{name} field missing in response"))
2023 }
2024
2025 pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
2026 let mutation = mutation.as_ref();
2027 self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
2028 .await
2029 }
2030
2031 pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
2032 let mut out = String::from("mutation {\n");
2033 for (index, mutation) in mutations.iter().enumerate() {
2034 out = format!("{} u{}: {}\n", out, index, mutation);
2035 }
2036 out.push_str("}\n");
2037 self.run_graphql_query(&out).await
2038 }
2039}
2040
2041impl<A> From<String> for ApplicationWrapper<A> {
2042 fn from(uri: String) -> ApplicationWrapper<A> {
2043 ApplicationWrapper {
2044 uri,
2045 _phantom: PhantomData,
2046 }
2047 }
2048}
2049
2050#[cfg(with_testing)]
2053fn notification_timeout() -> Duration {
2054 const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
2055 const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;
2056
2057 match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
2058 Ok(var) => Duration::from_millis(var.parse().unwrap_or_else(|error| {
2059 panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
2060 })),
2061 Err(env::VarError::NotPresent) => Duration::from_millis(NOTIFICATION_TIMEOUT_MS_DEFAULT),
2062 Err(env::VarError::NotUnicode(_)) => {
2063 panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
2064 }
2065 }
2066}
2067
2068#[cfg(with_testing)]
2069pub trait NotificationsExt {
2070 fn wait_for<T>(
2072 &mut self,
2073 f: impl FnMut(Notification) -> Option<T>,
2074 ) -> impl Future<Output = Result<T>>;
2075
2076 fn wait_for_events(
2079 &mut self,
2080 expected_height: impl Into<Option<BlockHeight>>,
2081 ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
2082 let expected_height = expected_height.into();
2083 self.wait_for(move |notification| {
2084 if let Reason::NewEvents {
2085 height,
2086 event_streams,
2087 ..
2088 } = notification.reason
2089 {
2090 if expected_height.is_none_or(|h| h == height) {
2091 return Some(event_streams);
2092 }
2093 }
2094 None
2095 })
2096 }
2097
2098 fn wait_for_block(
2101 &mut self,
2102 expected_height: impl Into<Option<BlockHeight>>,
2103 ) -> impl Future<Output = Result<CryptoHash>> {
2104 let expected_height = expected_height.into();
2105 self.wait_for(move |notification| {
2106 if let Reason::NewBlock {
2107 height, block_hash, ..
2108 } = notification.reason
2109 {
2110 if expected_height.is_none_or(|h| h == height) {
2111 return Some(block_hash);
2112 }
2113 }
2114 None
2115 })
2116 }
2117
2118 fn wait_for_bundle(
2121 &mut self,
2122 expected_origin: ChainId,
2123 expected_height: impl Into<Option<BlockHeight>>,
2124 ) -> impl Future<Output = Result<()>> {
2125 let expected_height = expected_height.into();
2126 self.wait_for(move |notification| {
2127 if let Reason::NewIncomingBundle { height, origin } = notification.reason {
2128 if expected_height.is_none_or(|h| h == height) && origin == expected_origin {
2129 return Some(());
2130 }
2131 }
2132 None
2133 })
2134 }
2135}
2136
2137#[cfg(with_testing)]
2138impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
2139 async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
2140 let mut timeout = Box::pin(linera_base::time::timer::sleep(notification_timeout())).fuse();
2141 loop {
2142 let notification = futures::select! {
2143 () = timeout => bail!("Timeout waiting for notification"),
2144 notification = self.next().fuse() => notification.context("Stream closed")??,
2145 };
2146 if let Some(t) = f(notification) {
2147 return Ok(t);
2148 }
2149 }
2150 }
2151}