1use std::{
5 borrow::Cow,
6 collections::BTreeMap,
7 env,
8 marker::PhantomData,
9 mem,
10 path::{Path, PathBuf},
11 pin::Pin,
12 process::Stdio,
13 str::FromStr,
14 sync,
15 time::Duration,
16};
17
18use anyhow::{bail, ensure, Context, Result};
19use async_graphql::InputType;
20use async_tungstenite::tungstenite::{client::IntoClientRequest as _, http::HeaderValue};
21use futures::{SinkExt as _, Stream, StreamExt as _, TryStreamExt as _};
22use heck::ToKebabCase;
23use linera_base::{
24 abi::ContractAbi,
25 command::{resolve_binary, CommandExt},
26 crypto::{CryptoHash, InMemorySigner},
27 data_types::{Amount, ApplicationPermissions, BlockHeight, Bytecode, Epoch},
28 identifiers::{
29 Account, AccountOwner, ApplicationId, ChainId, IndexAndEvent, ModuleId, StreamId,
30 },
31 vm::VmRuntime,
32};
33use linera_client::client_options::ResourceControlPolicyConfig;
34use linera_core::worker::Notification;
35use linera_faucet_client::Faucet;
36use serde::{de::DeserializeOwned, ser::Serialize};
37use serde_command_opts::to_args;
38use serde_json::{json, Value};
39use tempfile::TempDir;
40use tokio::{
41 io::{AsyncBufReadExt, BufReader},
42 process::{Child, Command},
43 sync::oneshot,
44 task::JoinHandle,
45};
46#[cfg(with_testing)]
47use {
48 futures::FutureExt as _,
49 linera_core::worker::Reason,
50 std::{collections::BTreeSet, future::Future},
51};
52
53use crate::{
54 cli::command::BenchmarkCommand,
55 cli_wrappers::{
56 local_net::{PathProvider, ProcessInbox},
57 Network,
58 },
59 util::{self, ChildExt},
60 Wallet,
61};
62
/// Name of the environment variable whose whitespace-separated contents are appended as
/// extra arguments when this wrapper spawns the `service` subcommand.
const CLIENT_SERVICE_ENV: &str = "LINERA_CLIENT_SERVICE_PARAMS";
66
67fn reqwest_client() -> reqwest::Client {
68 reqwest::ClientBuilder::new()
69 .timeout(Duration::from_secs(60))
70 .build()
71 .unwrap()
72}
73
74pub struct ClientWrapper {
76 binary_path: sync::Mutex<Option<PathBuf>>,
77 testing_prng_seed: Option<u64>,
78 storage: String,
79 wallet: String,
80 keystore: String,
81 max_pending_message_bundles: usize,
82 network: Network,
83 pub path_provider: PathProvider,
84 on_drop: OnClientDrop,
85 extra_args: Vec<String>,
86}
87
/// Selects what a [`ClientWrapper`] does with its chains when it is dropped.
// NOTE(review): the drop logic is not visible in this part of the file; the variant
// descriptions below follow the variant names and should be confirmed against it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OnClientDrop {
    /// Presumably closes the chains known to the client on drop.
    CloseChains,
    /// Presumably leaves the chains open on drop.
    LeakChains,
}
96
97impl ClientWrapper {
98 pub fn new(
99 path_provider: PathProvider,
100 network: Network,
101 testing_prng_seed: Option<u64>,
102 id: usize,
103 on_drop: OnClientDrop,
104 ) -> Self {
105 Self::new_with_extra_args(
106 path_provider,
107 network,
108 testing_prng_seed,
109 id,
110 on_drop,
111 vec!["--wait-for-outgoing-messages".to_string()],
112 None,
113 )
114 }
115
116 pub fn new_with_extra_args(
117 path_provider: PathProvider,
118 network: Network,
119 testing_prng_seed: Option<u64>,
120 id: usize,
121 on_drop: OnClientDrop,
122 extra_args: Vec<String>,
123 binary_dir: Option<PathBuf>,
124 ) -> Self {
125 let storage = format!(
126 "rocksdb:{}/client_{}.db",
127 path_provider.path().display(),
128 id
129 );
130 let wallet = format!("wallet_{}.json", id);
131 let keystore = format!("keystore_{}.json", id);
132 let binary_path = binary_dir.map(|dir| dir.join("linera"));
133 Self {
134 binary_path: sync::Mutex::new(binary_path),
135 testing_prng_seed,
136 storage,
137 wallet,
138 keystore,
139 max_pending_message_bundles: 10_000,
140 network,
141 path_provider,
142 on_drop,
143 extra_args,
144 }
145 }
146
147 pub async fn project_new(&self, project_name: &str, linera_root: &Path) -> Result<TempDir> {
149 let tmp = TempDir::new()?;
150 let mut command = self.command().await?;
151 command
152 .current_dir(tmp.path())
153 .arg("project")
154 .arg("new")
155 .arg(project_name)
156 .arg("--linera-root")
157 .arg(linera_root)
158 .spawn_and_wait_for_stdout()
159 .await?;
160 Ok(tmp)
161 }
162
163 pub async fn project_publish<T: Serialize>(
165 &self,
166 path: PathBuf,
167 required_application_ids: Vec<String>,
168 publisher: impl Into<Option<ChainId>>,
169 argument: &T,
170 ) -> Result<String> {
171 let json_parameters = serde_json::to_string(&())?;
172 let json_argument = serde_json::to_string(argument)?;
173 let mut command = self.command().await?;
174 command
175 .arg("project")
176 .arg("publish-and-create")
177 .arg(path)
178 .args(publisher.into().iter().map(ChainId::to_string))
179 .args(["--json-parameters", &json_parameters])
180 .args(["--json-argument", &json_argument]);
181 if !required_application_ids.is_empty() {
182 command.arg("--required-application-ids");
183 command.args(required_application_ids);
184 }
185 let stdout = command.spawn_and_wait_for_stdout().await?;
186 Ok(stdout.trim().to_string())
187 }
188
189 pub async fn project_test(&self, path: &Path) -> Result<()> {
191 self.command()
192 .await
193 .context("failed to create project test command")?
194 .current_dir(path)
195 .arg("project")
196 .arg("test")
197 .spawn_and_wait_for_stdout()
198 .await?;
199 Ok(())
200 }
201
202 async fn command_with_envs_and_arguments(
203 &self,
204 envs: &[(&str, &str)],
205 arguments: impl IntoIterator<Item = Cow<'_, str>>,
206 ) -> Result<Command> {
207 let mut command = self.command_binary().await?;
208 command.current_dir(self.path_provider.path());
209 for (key, value) in envs {
210 command.env(key, value);
211 }
212 for argument in arguments {
213 command.arg(&*argument);
214 }
215 Ok(command)
216 }
217
218 async fn command_with_envs(&self, envs: &[(&str, &str)]) -> Result<Command> {
219 self.command_with_envs_and_arguments(envs, self.command_arguments())
220 .await
221 }
222
223 async fn command_with_arguments(
224 &self,
225 arguments: impl IntoIterator<Item = Cow<'_, str>>,
226 ) -> Result<Command> {
227 self.command_with_envs_and_arguments(
228 &[(
229 "RUST_LOG",
230 &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
231 )],
232 arguments,
233 )
234 .await
235 }
236
237 async fn command(&self) -> Result<Command> {
238 self.command_with_envs(&[(
239 "RUST_LOG",
240 &std::env::var("RUST_LOG").unwrap_or_else(|_| String::from("linera=debug")),
241 )])
242 .await
243 }
244
245 fn required_command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
246 [
247 "--wallet".into(),
248 self.wallet.as_str().into(),
249 "--keystore".into(),
250 self.keystore.as_str().into(),
251 "--storage".into(),
252 self.storage.as_str().into(),
253 "--send-timeout-ms".into(),
254 "500000".into(),
255 "--recv-timeout-ms".into(),
256 "500000".into(),
257 ]
258 .into_iter()
259 .chain(self.extra_args.iter().map(|s| s.as_str().into()))
260 }
261
262 fn command_arguments(&self) -> impl Iterator<Item = Cow<'_, str>> + '_ {
264 self.required_command_arguments().chain([
265 "--max-pending-message-bundles".into(),
266 self.max_pending_message_bundles.to_string().into(),
267 ])
268 }
269
270 async fn command_binary(&self) -> Result<Command> {
274 match self.command_with_cached_binary_path() {
275 Some(command) => Ok(command),
276 None => {
277 let resolved_path = resolve_binary("linera", env!("CARGO_PKG_NAME")).await?;
278 let command = Command::new(&resolved_path);
279
280 self.set_cached_binary_path(resolved_path);
281
282 Ok(command)
283 }
284 }
285 }
286
287 fn command_with_cached_binary_path(&self) -> Option<Command> {
289 let binary_path = self.binary_path.lock().unwrap();
290
291 binary_path.as_ref().map(Command::new)
292 }
293
294 fn set_cached_binary_path(&self, new_binary_path: PathBuf) {
302 let mut binary_path = self.binary_path.lock().unwrap();
303
304 if binary_path.is_none() {
305 *binary_path = Some(new_binary_path);
306 } else {
307 assert_eq!(*binary_path, Some(new_binary_path));
308 }
309 }
310
311 pub async fn create_genesis_config(
313 &self,
314 num_other_initial_chains: u32,
315 initial_funding: Amount,
316 policy_config: ResourceControlPolicyConfig,
317 http_allow_list: Option<Vec<String>>,
318 ) -> Result<()> {
319 let mut command = self.command().await?;
320 command
321 .args([
322 "create-genesis-config",
323 &num_other_initial_chains.to_string(),
324 ])
325 .args(["--initial-funding", &initial_funding.to_string()])
326 .args(["--committee", "committee.json"])
327 .args(["--genesis", "genesis.json"])
328 .args([
329 "--policy-config",
330 &policy_config.to_string().to_kebab_case(),
331 ]);
332 if let Some(allow_list) = http_allow_list {
333 command
334 .arg("--http-request-allow-list")
335 .arg(allow_list.join(","));
336 }
337 if let Some(seed) = self.testing_prng_seed {
338 command.arg("--testing-prng-seed").arg(seed.to_string());
339 }
340 command.spawn_and_wait_for_stdout().await?;
341 Ok(())
342 }
343
344 pub async fn wallet_init(&self, faucet: Option<&'_ Faucet>) -> Result<()> {
347 let mut command = self.command().await?;
348 command.args(["wallet", "init"]);
349 match faucet {
350 None => command.args(["--genesis", "genesis.json"]),
351 Some(faucet) => command.args(["--faucet", faucet.url()]),
352 };
353 if let Some(seed) = self.testing_prng_seed {
354 command.arg("--testing-prng-seed").arg(seed.to_string());
355 }
356 command.spawn_and_wait_for_stdout().await?;
357 Ok(())
358 }
359
360 pub async fn request_chain(
362 &self,
363 faucet: &Faucet,
364 set_default: bool,
365 ) -> Result<(ChainId, AccountOwner)> {
366 let mut command = self.command().await?;
367 command.args(["wallet", "request-chain", "--faucet", faucet.url()]);
368 if set_default {
369 command.arg("--set-default");
370 }
371 let stdout = command.spawn_and_wait_for_stdout().await?;
372 let mut lines = stdout.split_whitespace();
373 let chain_id: ChainId = lines.next().context("missing chain ID")?.parse()?;
374 let owner = lines.next().context("missing chain owner")?.parse()?;
375 Ok((chain_id, owner))
376 }
377
378 #[expect(clippy::too_many_arguments)]
380 pub async fn publish_and_create<
381 A: ContractAbi,
382 Parameters: Serialize,
383 InstantiationArgument: Serialize,
384 >(
385 &self,
386 contract: PathBuf,
387 service: PathBuf,
388 vm_runtime: VmRuntime,
389 parameters: &Parameters,
390 argument: &InstantiationArgument,
391 required_application_ids: &[ApplicationId],
392 publisher: impl Into<Option<ChainId>>,
393 ) -> Result<ApplicationId<A>> {
394 let json_parameters = serde_json::to_string(parameters)?;
395 let json_argument = serde_json::to_string(argument)?;
396 let mut command = self.command().await?;
397 let vm_runtime = format!("{}", vm_runtime);
398 command
399 .arg("publish-and-create")
400 .args([contract, service])
401 .args(["--vm-runtime", &vm_runtime.to_lowercase()])
402 .args(publisher.into().iter().map(ChainId::to_string))
403 .args(["--json-parameters", &json_parameters])
404 .args(["--json-argument", &json_argument]);
405 if !required_application_ids.is_empty() {
406 command.arg("--required-application-ids");
407 command.args(
408 required_application_ids
409 .iter()
410 .map(ApplicationId::to_string),
411 );
412 }
413 let stdout = command.spawn_and_wait_for_stdout().await?;
414 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
415 }
416
417 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
419 &self,
420 contract: PathBuf,
421 service: PathBuf,
422 vm_runtime: VmRuntime,
423 publisher: impl Into<Option<ChainId>>,
424 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
425 let stdout = self
426 .command()
427 .await?
428 .arg("publish-module")
429 .args([contract, service])
430 .args(["--vm-runtime", &format!("{}", vm_runtime).to_lowercase()])
431 .args(publisher.into().iter().map(ChainId::to_string))
432 .spawn_and_wait_for_stdout()
433 .await?;
434 let module_id: ModuleId = stdout.trim().parse()?;
435 Ok(module_id.with_abi())
436 }
437
438 pub async fn create_application<
440 Abi: ContractAbi,
441 Parameters: Serialize,
442 InstantiationArgument: Serialize,
443 >(
444 &self,
445 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
446 parameters: &Parameters,
447 argument: &InstantiationArgument,
448 required_application_ids: &[ApplicationId],
449 creator: impl Into<Option<ChainId>>,
450 ) -> Result<ApplicationId<Abi>> {
451 let json_parameters = serde_json::to_string(parameters)?;
452 let json_argument = serde_json::to_string(argument)?;
453 let mut command = self.command().await?;
454 command
455 .arg("create-application")
456 .arg(module_id.forget_abi().to_string())
457 .args(["--json-parameters", &json_parameters])
458 .args(["--json-argument", &json_argument])
459 .args(creator.into().iter().map(ChainId::to_string));
460 if !required_application_ids.is_empty() {
461 command.arg("--required-application-ids");
462 command.args(
463 required_application_ids
464 .iter()
465 .map(ApplicationId::to_string),
466 );
467 }
468 let stdout = command.spawn_and_wait_for_stdout().await?;
469 Ok(stdout.trim().parse::<ApplicationId>()?.with_abi())
470 }
471
472 pub async fn run_node_service(
474 &self,
475 port: impl Into<Option<u16>>,
476 process_inbox: ProcessInbox,
477 ) -> Result<NodeService> {
478 self.run_node_service_with_options(port, process_inbox, &[], &[], false)
479 .await
480 }
481
482 pub async fn run_node_service_with_options(
484 &self,
485 port: impl Into<Option<u16>>,
486 process_inbox: ProcessInbox,
487 operator_application_ids: &[ApplicationId],
488 operators: &[(String, PathBuf)],
489 read_only: bool,
490 ) -> Result<NodeService> {
491 self.run_node_service_with_all_options(
492 port,
493 process_inbox,
494 operator_application_ids,
495 operators,
496 read_only,
497 &[],
498 &[],
499 )
500 .await
501 }
502
503 #[allow(clippy::too_many_arguments)]
505 pub async fn run_node_service_with_all_options(
506 &self,
507 port: impl Into<Option<u16>>,
508 process_inbox: ProcessInbox,
509 operator_application_ids: &[ApplicationId],
510 operators: &[(String, PathBuf)],
511 read_only: bool,
512 allowed_subscriptions: &[String],
513 subscription_ttls: &[(String, u64)],
514 ) -> Result<NodeService> {
515 let port = port.into().unwrap_or(8080);
516 let mut command = self.command().await?;
517 command.arg("service");
518 if let ProcessInbox::Skip = process_inbox {
519 command.arg("--listener-skip-process-inbox");
520 }
521 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
522 command.args(var.split_whitespace());
523 }
524 for app_id in operator_application_ids {
525 command.args(["--operator-application-ids", &app_id.to_string()]);
526 }
527 for (name, path) in operators {
528 command.args(["--operators", &format!("{}={}", name, path.display())]);
529 }
530 if read_only {
531 command.arg("--read-only");
532 }
533 for query in allowed_subscriptions {
534 command.args(["--allow-subscription", query]);
535 }
536 for (name, secs) in subscription_ttls {
537 command.args(["--subscription-ttl-secs", &format!("{name}={secs}")]);
538 }
539 let child = command
540 .args(["--port".to_string(), port.to_string()])
541 .spawn_into()?;
542 let client = reqwest_client();
543 for i in 0..10 {
544 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
545 let request = client
546 .get(format!("http://localhost:{}/", port))
547 .send()
548 .await;
549 if request.is_ok() {
550 tracing::info!("Node service has started");
551 return Ok(NodeService::new(port, child));
552 } else {
553 tracing::warn!("Waiting for node service to start");
554 }
555 }
556 bail!("Failed to start node service");
557 }
558
559 pub async fn run_node_service_with_controller(
561 &self,
562 port: impl Into<Option<u16>>,
563 process_inbox: ProcessInbox,
564 controller_id: &ApplicationId,
565 operators: &[(String, PathBuf)],
566 ) -> Result<NodeService> {
567 let port = port.into().unwrap_or(8080);
568 let mut command = self.command().await?;
569 command.arg("service");
570 if let ProcessInbox::Skip = process_inbox {
571 command.arg("--listener-skip-process-inbox");
572 }
573 if let Ok(var) = env::var(CLIENT_SERVICE_ENV) {
574 command.args(var.split_whitespace());
575 }
576 command.args(["--controller-id", &controller_id.to_string()]);
577 for (name, path) in operators {
578 command.args(["--operators", &format!("{}={}", name, path.display())]);
579 }
580 let child = command
581 .args(["--port".to_string(), port.to_string()])
582 .spawn_into()?;
583 let client = reqwest_client();
584 for i in 0..10 {
585 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
586 let request = client
587 .get(format!("http://localhost:{}/", port))
588 .send()
589 .await;
590 if request.is_ok() {
591 tracing::info!("Node service has started");
592 return Ok(NodeService::new(port, child));
593 } else {
594 tracing::warn!("Waiting for node service to start");
595 }
596 }
597 bail!("Failed to start node service");
598 }
599
600 pub async fn query_validator(&self, address: &str) -> Result<CryptoHash> {
602 let mut command = self.command().await?;
603 command.arg("validator").arg("query").arg(address);
604 let stdout = command.spawn_and_wait_for_stdout().await?;
605
606 let hash = stdout
609 .lines()
610 .find_map(|line| {
611 line.strip_prefix("Genesis config hash: ")
612 .and_then(|hash_str| hash_str.trim().parse().ok())
613 })
614 .context("error while parsing the result of `linera validator query`")?;
615 Ok(hash)
616 }
617
618 pub async fn query_validators(&self, chain_id: Option<ChainId>) -> Result<()> {
620 let mut command = self.command().await?;
621 command.arg("validator").arg("list");
622 if let Some(chain_id) = chain_id {
623 command.args(["--chain-id", &chain_id.to_string()]);
624 }
625 command.spawn_and_wait_for_stdout().await?;
626 Ok(())
627 }
628
629 pub async fn sync_validator(
631 &self,
632 chain_ids: impl IntoIterator<Item = &ChainId>,
633 validator_address: impl Into<String>,
634 ) -> Result<()> {
635 let mut command = self.command().await?;
636 command
637 .arg("validator")
638 .arg("sync")
639 .arg(validator_address.into());
640 let mut chain_ids = chain_ids.into_iter().peekable();
641 if chain_ids.peek().is_some() {
642 command
643 .arg("--chains")
644 .args(chain_ids.map(ChainId::to_string));
645 }
646 command.spawn_and_wait_for_stdout().await?;
647 Ok(())
648 }
649
650 pub async fn run_faucet(
652 &self,
653 port: impl Into<Option<u16>>,
654 chain_id: Option<ChainId>,
655 amount: Amount,
656 ) -> Result<FaucetService> {
657 let port = port.into().unwrap_or(8080);
658 let temp_dir = tempfile::tempdir()
659 .context("Failed to create temporary directory for faucet storage")?;
660 let storage_path = temp_dir.path().join("faucet_storage.sqlite");
661 let mut command = self.command().await?;
662 let command = command
663 .arg("faucet")
664 .args(["--port".to_string(), port.to_string()])
665 .args(["--amount".to_string(), amount.to_string()])
666 .args([
667 "--storage-path".to_string(),
668 storage_path.to_string_lossy().to_string(),
669 ]);
670 if let Some(chain_id) = chain_id {
671 command.arg(chain_id.to_string());
672 }
673 let child = command.spawn_into()?;
674 let client = reqwest_client();
675 for i in 0..10 {
676 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
677 let request = client
678 .get(format!("http://localhost:{}/", port))
679 .send()
680 .await;
681 if request.is_ok() {
682 tracing::info!("Faucet has started");
683 return Ok(FaucetService::new(port, child, temp_dir));
684 } else {
685 tracing::debug!("Waiting for faucet to start");
686 }
687 }
688 bail!("Failed to start faucet");
689 }
690
691 pub async fn local_balance(&self, account: Account) -> Result<Amount> {
693 let stdout = self
694 .command()
695 .await?
696 .arg("local-balance")
697 .arg(account.to_string())
698 .spawn_and_wait_for_stdout()
699 .await?;
700 let amount = stdout
701 .trim()
702 .parse()
703 .context("error while parsing the result of `linera local-balance`")?;
704 Ok(amount)
705 }
706
707 pub async fn query_balance(&self, account: Account) -> Result<Amount> {
709 let stdout = self
710 .command()
711 .await?
712 .arg("query-balance")
713 .arg(account.to_string())
714 .spawn_and_wait_for_stdout()
715 .await?;
716 let amount = stdout
717 .trim()
718 .parse()
719 .context("error while parsing the result of `linera query-balance`")?;
720 Ok(amount)
721 }
722
723 pub async fn query_application_json<T: DeserializeOwned>(
725 &self,
726 chain_id: ChainId,
727 application_id: ApplicationId,
728 query: impl AsRef<str>,
729 ) -> Result<T> {
730 let query = query.as_ref().trim();
731 let name = query
732 .split_once(|ch: char| !ch.is_alphanumeric())
733 .map_or(query, |(name, _)| name);
734 let stdout = self
735 .command()
736 .await?
737 .arg("query-application")
738 .arg("--chain-id")
739 .arg(chain_id.to_string())
740 .arg("--application-id")
741 .arg(application_id.to_string())
742 .arg(query)
743 .spawn_and_wait_for_stdout()
744 .await?;
745 let data: serde_json::Value =
746 serde_json::from_str(stdout.trim()).context("invalid JSON from query-application")?;
747 serde_json::from_value(data[name].clone())
748 .with_context(|| format!("{name} field missing in query-application response"))
749 }
750
751 pub async fn sync(&self, chain_id: ChainId) -> Result<()> {
753 self.command()
754 .await?
755 .arg("sync")
756 .arg(chain_id.to_string())
757 .spawn_and_wait_for_stdout()
758 .await?;
759 Ok(())
760 }
761
762 pub async fn process_inbox(&self, chain_id: ChainId) -> Result<()> {
764 self.command()
765 .await?
766 .arg("process-inbox")
767 .arg(chain_id.to_string())
768 .spawn_and_wait_for_stdout()
769 .await?;
770 Ok(())
771 }
772
773 pub async fn transfer(&self, amount: Amount, from: ChainId, to: ChainId) -> Result<()> {
775 self.command()
776 .await?
777 .arg("transfer")
778 .arg(amount.to_string())
779 .args(["--from", &from.to_string()])
780 .args(["--to", &to.to_string()])
781 .spawn_and_wait_for_stdout()
782 .await?;
783 Ok(())
784 }
785
786 pub async fn transfer_with_silent_logs(
788 &self,
789 amount: Amount,
790 from: ChainId,
791 to: ChainId,
792 ) -> Result<()> {
793 self.command()
794 .await?
795 .env("RUST_LOG", "off")
796 .arg("transfer")
797 .arg(amount.to_string())
798 .args(["--from", &from.to_string()])
799 .args(["--to", &to.to_string()])
800 .spawn_and_wait_for_stdout()
801 .await?;
802 Ok(())
803 }
804
805 pub async fn transfer_with_accounts(
807 &self,
808 amount: Amount,
809 from: Account,
810 to: Account,
811 ) -> Result<()> {
812 self.command()
813 .await?
814 .arg("transfer")
815 .arg(amount.to_string())
816 .args(["--from", &from.to_string()])
817 .args(["--to", &to.to_string()])
818 .spawn_and_wait_for_stdout()
819 .await?;
820 Ok(())
821 }
822
823 fn benchmark_command_internal(command: &mut Command, args: &BenchmarkCommand) -> Result<()> {
824 let mut formatted_args = to_args(&args)?;
825 let subcommand = formatted_args.remove(0);
826 formatted_args.remove(0);
829 let options = formatted_args
830 .chunks_exact(2)
831 .flat_map(|pair| {
832 let option = format!("--{}", pair[0]);
833 match pair[1].as_str() {
834 "true" => vec![option],
835 "false" => vec![],
836 _ => vec![option, pair[1].clone()],
837 }
838 })
839 .collect::<Vec<_>>();
840 command
841 .args([
842 "--max-pending-message-bundles",
843 &args.transactions_per_block().to_string(),
844 ])
845 .arg("benchmark")
846 .arg(subcommand)
847 .args(options);
848 Ok(())
849 }
850
851 async fn benchmark_command_with_envs(
852 &self,
853 args: BenchmarkCommand,
854 envs: &[(&str, &str)],
855 ) -> Result<Command> {
856 let mut command = self
857 .command_with_envs_and_arguments(envs, self.required_command_arguments())
858 .await?;
859 Self::benchmark_command_internal(&mut command, &args)?;
860 Ok(command)
861 }
862
863 async fn benchmark_command(&self, args: BenchmarkCommand) -> Result<Command> {
864 let mut command = self
865 .command_with_arguments(self.required_command_arguments())
866 .await?;
867 Self::benchmark_command_internal(&mut command, &args)?;
868 Ok(command)
869 }
870
871 pub async fn benchmark(&self, args: BenchmarkCommand) -> Result<()> {
873 let mut command = self.benchmark_command(args).await?;
874 command.spawn_and_wait_for_stdout().await?;
875 Ok(())
876 }
877
878 pub async fn benchmark_detached(
881 &self,
882 args: BenchmarkCommand,
883 tx: oneshot::Sender<()>,
884 ) -> Result<(Child, JoinHandle<()>, JoinHandle<()>)> {
885 let mut child = self
886 .benchmark_command_with_envs(args, &[("RUST_LOG", "linera=info")])
887 .await?
888 .kill_on_drop(true)
889 .stdin(Stdio::piped())
890 .stdout(Stdio::piped())
891 .stderr(Stdio::piped())
892 .spawn()?;
893
894 let pid = child.id().expect("failed to get pid");
895 let stdout = child.stdout.take().expect("stdout not open");
896 let stdout_handle = tokio::spawn(async move {
897 let mut lines = BufReader::new(stdout).lines();
898 while let Ok(Some(line)) = lines.next_line().await {
899 println!("benchmark{{pid={pid}}} {line}");
900 }
901 });
902
903 let stderr = child.stderr.take().expect("stderr not open");
904 let stderr_handle = tokio::spawn(async move {
905 let mut lines = BufReader::new(stderr).lines();
906 let mut tx = Some(tx);
907 while let Ok(Some(line)) = lines.next_line().await {
908 if line.contains("Ready to start benchmark") {
909 tx.take()
910 .expect("Should only send signal once")
911 .send(())
912 .expect("failed to send ready signal to main thread");
913 } else {
914 println!("benchmark{{pid={pid}}} {line}");
915 }
916 }
917 });
918 Ok((child, stdout_handle, stderr_handle))
919 }
920
921 async fn open_chain_internal(
922 &self,
923 from: ChainId,
924 owner: Option<AccountOwner>,
925 initial_balance: Amount,
926 super_owner: bool,
927 ) -> Result<(ChainId, AccountOwner)> {
928 let mut command = self.command().await?;
929 command
930 .arg("open-chain")
931 .args(["--from", &from.to_string()])
932 .args(["--initial-balance", &initial_balance.to_string()]);
933
934 if let Some(owner) = owner {
935 command.args(["--owner", &owner.to_string()]);
936 }
937
938 if super_owner {
939 command.arg("--super-owner");
940 }
941
942 let stdout = command.spawn_and_wait_for_stdout().await?;
943 let mut split = stdout.split('\n');
944 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
945 let new_owner = AccountOwner::from_str(split.next().context("no owner in output")?)?;
946 if let Some(owner) = owner {
947 assert_eq!(owner, new_owner);
948 }
949 Ok((chain_id, new_owner))
950 }
951
952 pub async fn open_chain_super_owner(
954 &self,
955 from: ChainId,
956 owner: Option<AccountOwner>,
957 initial_balance: Amount,
958 ) -> Result<(ChainId, AccountOwner)> {
959 self.open_chain_internal(from, owner, initial_balance, true)
960 .await
961 }
962
963 pub async fn open_chain(
965 &self,
966 from: ChainId,
967 owner: Option<AccountOwner>,
968 initial_balance: Amount,
969 ) -> Result<(ChainId, AccountOwner)> {
970 self.open_chain_internal(from, owner, initial_balance, false)
971 .await
972 }
973
974 pub async fn open_and_assign(
976 &self,
977 client: &ClientWrapper,
978 initial_balance: Amount,
979 ) -> Result<ChainId> {
980 let our_chain = self
981 .load_wallet()?
982 .default_chain()
983 .context("no default chain found")?;
984 let owner = client.keygen().await?;
985 let (new_chain, _) = self
986 .open_chain(our_chain, Some(owner), initial_balance)
987 .await?;
988 client.assign(owner, new_chain).await?;
989 Ok(new_chain)
990 }
991
992 pub async fn open_multi_owner_chain(
993 &self,
994 from: ChainId,
995 owners: BTreeMap<AccountOwner, u64>,
996 multi_leader_rounds: u32,
997 balance: Amount,
998 base_timeout_ms: u64,
999 ) -> Result<ChainId> {
1000 let mut command = self.command().await?;
1001 command
1002 .arg("open-multi-owner-chain")
1003 .args(["--from", &from.to_string()])
1004 .arg("--owners")
1005 .arg(serde_json::to_string(&owners)?)
1006 .args(["--base-timeout-ms", &base_timeout_ms.to_string()]);
1007 command
1008 .args(["--multi-leader-rounds", &multi_leader_rounds.to_string()])
1009 .args(["--initial-balance", &balance.to_string()]);
1010
1011 let stdout = command.spawn_and_wait_for_stdout().await?;
1012 let mut split = stdout.split('\n');
1013 let chain_id = ChainId::from_str(split.next().context("no chain ID in output")?)?;
1014
1015 Ok(chain_id)
1016 }
1017
1018 pub async fn change_ownership(
1019 &self,
1020 chain_id: ChainId,
1021 super_owners: Vec<AccountOwner>,
1022 owners: Vec<AccountOwner>,
1023 ) -> Result<()> {
1024 let mut command = self.command().await?;
1025 command
1026 .arg("change-ownership")
1027 .args(["--chain-id", &chain_id.to_string()]);
1028 command
1029 .arg("--super-owners")
1030 .arg(serde_json::to_string(&super_owners)?);
1031 command.arg("--owners").arg(serde_json::to_string(
1032 &owners
1033 .into_iter()
1034 .zip(std::iter::repeat(100u64))
1035 .collect::<BTreeMap<_, _>>(),
1036 )?);
1037 command.spawn_and_wait_for_stdout().await?;
1038 Ok(())
1039 }
1040
1041 pub async fn change_application_permissions(
1042 &self,
1043 chain_id: ChainId,
1044 application_permissions: ApplicationPermissions,
1045 ) -> Result<()> {
1046 let mut command = self.command().await?;
1047 command
1048 .arg("change-application-permissions")
1049 .args(["--chain-id", &chain_id.to_string()]);
1050 command.arg("--manage-chain").arg(serde_json::to_string(
1051 &application_permissions.manage_chain,
1052 )?);
1053 command.spawn_and_wait_for_stdout().await?;
1055 Ok(())
1056 }
1057
1058 pub async fn follow_chain(&self, chain_id: ChainId, sync: bool) -> Result<()> {
1060 let mut command = self.command().await?;
1061 command
1062 .args(["wallet", "follow-chain"])
1063 .arg(chain_id.to_string());
1064 if sync {
1065 command.arg("--sync");
1066 }
1067 command.spawn_and_wait_for_stdout().await?;
1068 Ok(())
1069 }
1070
1071 pub async fn forget_chain(&self, chain_id: ChainId) -> Result<()> {
1073 let mut command = self.command().await?;
1074 command
1075 .args(["wallet", "forget-chain"])
1076 .arg(chain_id.to_string());
1077 command.spawn_and_wait_for_stdout().await?;
1078 Ok(())
1079 }
1080
1081 pub async fn set_default_chain(&self, chain_id: ChainId) -> Result<()> {
1083 let mut command = self.command().await?;
1084 command
1085 .args(["wallet", "set-default"])
1086 .arg(chain_id.to_string());
1087 command.spawn_and_wait_for_stdout().await?;
1088 Ok(())
1089 }
1090
1091 pub async fn retry_pending_block(
1092 &self,
1093 chain_id: Option<ChainId>,
1094 ) -> Result<Option<CryptoHash>> {
1095 let mut command = self.command().await?;
1096 command.arg("retry-pending-block");
1097 if let Some(chain_id) = chain_id {
1098 command.arg(chain_id.to_string());
1099 }
1100 let stdout = command.spawn_and_wait_for_stdout().await?;
1101 let stdout = stdout.trim();
1102 if stdout.is_empty() {
1103 Ok(None)
1104 } else {
1105 Ok(Some(CryptoHash::from_str(stdout)?))
1106 }
1107 }
1108
1109 pub async fn publish_data_blob(
1111 &self,
1112 path: &Path,
1113 chain_id: Option<ChainId>,
1114 ) -> Result<CryptoHash> {
1115 let mut command = self.command().await?;
1116 command.arg("publish-data-blob").arg(path);
1117 if let Some(chain_id) = chain_id {
1118 command.arg(chain_id.to_string());
1119 }
1120 let stdout = command.spawn_and_wait_for_stdout().await?;
1121 let stdout = stdout.trim();
1122 Ok(CryptoHash::from_str(stdout)?)
1123 }
1124
1125 pub async fn read_data_blob(&self, hash: CryptoHash, chain_id: Option<ChainId>) -> Result<()> {
1127 let mut command = self.command().await?;
1128 command.arg("read-data-blob").arg(hash.to_string());
1129 if let Some(chain_id) = chain_id {
1130 command.arg(chain_id.to_string());
1131 }
1132 command.spawn_and_wait_for_stdout().await?;
1133 Ok(())
1134 }
1135
1136 pub fn load_wallet(&self) -> Result<Wallet> {
1137 Ok(Wallet::read(&self.wallet_path())?)
1138 }
1139
1140 pub fn load_keystore(&self) -> Result<InMemorySigner> {
1141 util::read_json(self.keystore_path())
1142 }
1143
1144 pub fn wallet_path(&self) -> PathBuf {
1145 self.path_provider.path().join(&self.wallet)
1146 }
1147
1148 pub fn keystore_path(&self) -> PathBuf {
1149 self.path_provider.path().join(&self.keystore)
1150 }
1151
1152 pub fn storage_path(&self) -> &str {
1153 &self.storage
1154 }
1155
1156 pub fn get_owner(&self) -> Option<AccountOwner> {
1157 let wallet = self.load_wallet().ok()?;
1158 wallet
1159 .get(wallet.default_chain()?)
1160 .expect("default chain must be in wallet")
1161 .owner
1162 }
1163
1164 pub fn is_chain_present_in_wallet(&self, chain: ChainId) -> bool {
1165 self.load_wallet()
1166 .ok()
1167 .is_some_and(|wallet| wallet.get(chain).is_some())
1168 }
1169
1170 pub async fn set_validator(
1171 &self,
1172 validator_key: &(String, String),
1173 port: usize,
1174 votes: usize,
1175 ) -> Result<()> {
1176 let address = format!("{}:127.0.0.1:{}", self.network.short(), port);
1177 self.command()
1178 .await?
1179 .arg("validator")
1180 .arg("add")
1181 .args(["--public-key", &validator_key.0])
1182 .args(["--account-key", &validator_key.1])
1183 .args(["--address", &address])
1184 .args(["--votes", &votes.to_string()])
1185 .spawn_and_wait_for_stdout()
1186 .await?;
1187 Ok(())
1188 }
1189
1190 pub async fn remove_validator(&self, validator_key: &str) -> Result<()> {
1191 self.command()
1192 .await?
1193 .arg("validator")
1194 .arg("remove")
1195 .args(["--public-key", validator_key])
1196 .spawn_and_wait_for_stdout()
1197 .await?;
1198 Ok(())
1199 }
1200
1201 pub async fn change_validators(
1202 &self,
1203 add_validators: &[(String, String, usize, usize)], modify_validators: &[(String, String, usize, usize)], remove_validators: &[String],
1206 ) -> Result<()> {
1207 use std::str::FromStr;
1208
1209 use linera_base::crypto::{AccountPublicKey, ValidatorPublicKey};
1210
1211 let mut changes = std::collections::HashMap::new();
1214
1215 for (public_key_str, account_key_str, port, votes) in
1217 add_validators.iter().chain(modify_validators.iter())
1218 {
1219 let public_key = ValidatorPublicKey::from_str(public_key_str)
1220 .with_context(|| format!("Invalid validator public key: {}", public_key_str))?;
1221
1222 let account_key = AccountPublicKey::from_str(account_key_str)
1223 .with_context(|| format!("Invalid account public key: {}", account_key_str))?;
1224
1225 let address = format!("{}:127.0.0.1:{}", self.network.short(), port)
1226 .parse()
1227 .unwrap();
1228
1229 let change = crate::cli::validator::Change {
1231 account_key,
1232 address,
1233 votes: crate::cli::validator::Votes(
1234 std::num::NonZero::new(*votes as u64).context("Votes must be non-zero")?,
1235 ),
1236 };
1237
1238 changes.insert(public_key, Some(change));
1239 }
1240
1241 for validator_key_str in remove_validators {
1243 let public_key = ValidatorPublicKey::from_str(validator_key_str)
1244 .with_context(|| format!("Invalid validator public key: {}", validator_key_str))?;
1245 changes.insert(public_key, None);
1246 }
1247
1248 let temp_file = tempfile::NamedTempFile::new()
1250 .context("Failed to create temporary file for validator changes")?;
1251 serde_json::to_writer(&temp_file, &changes)
1252 .context("Failed to write validator changes to file")?;
1253 let temp_path = temp_file.path();
1254
1255 self.command()
1256 .await?
1257 .arg("validator")
1258 .arg("update")
1259 .arg(temp_path)
1260 .arg("--yes") .spawn_and_wait_for_stdout()
1262 .await?;
1263
1264 Ok(())
1265 }
1266
1267 pub async fn revoke_epochs(&self, epoch: Epoch) -> Result<()> {
1268 self.command()
1269 .await?
1270 .arg("revoke-epochs")
1271 .arg(epoch.to_string())
1272 .spawn_and_wait_for_stdout()
1273 .await?;
1274 Ok(())
1275 }
1276
1277 pub async fn keygen(&self) -> Result<AccountOwner> {
1279 let stdout = self
1280 .command()
1281 .await?
1282 .arg("keygen")
1283 .spawn_and_wait_for_stdout()
1284 .await?;
1285 AccountOwner::from_str(stdout.as_str().trim())
1286 }
1287
1288 pub fn default_chain(&self) -> Option<ChainId> {
1290 self.load_wallet().ok()?.default_chain()
1291 }
1292
1293 pub async fn assign(&self, owner: AccountOwner, chain_id: ChainId) -> Result<()> {
1295 let _stdout = self
1296 .command()
1297 .await?
1298 .arg("assign")
1299 .args(["--owner", &owner.to_string()])
1300 .args(["--chain-id", &chain_id.to_string()])
1301 .spawn_and_wait_for_stdout()
1302 .await?;
1303 Ok(())
1304 }
1305
1306 pub async fn set_preferred_owner(
1308 &self,
1309 chain_id: ChainId,
1310 owner: Option<AccountOwner>,
1311 ) -> Result<()> {
1312 let mut owner_arg = vec!["--owner".to_string()];
1313 if let Some(owner) = owner {
1314 owner_arg.push(owner.to_string());
1315 };
1316 self.command()
1317 .await?
1318 .arg("set-preferred-owner")
1319 .args(["--chain-id", &chain_id.to_string()])
1320 .args(owner_arg)
1321 .spawn_and_wait_for_stdout()
1322 .await?;
1323 Ok(())
1324 }
1325
1326 pub async fn build_application(
1327 &self,
1328 path: &Path,
1329 name: &str,
1330 is_workspace: bool,
1331 ) -> Result<(PathBuf, PathBuf)> {
1332 Command::new("cargo")
1333 .current_dir(self.path_provider.path())
1334 .arg("build")
1335 .arg("--release")
1336 .args(["--target", "wasm32-unknown-unknown"])
1337 .arg("--manifest-path")
1338 .arg(path.join("Cargo.toml"))
1339 .spawn_and_wait_for_stdout()
1340 .await?;
1341
1342 let release_dir = match is_workspace {
1343 true => path.join("../target/wasm32-unknown-unknown/release"),
1344 false => path.join("target/wasm32-unknown-unknown/release"),
1345 };
1346
1347 let contract = release_dir.join(format!("{}_contract.wasm", name.replace('-', "_")));
1348 let service = release_dir.join(format!("{}_service.wasm", name.replace('-', "_")));
1349
1350 let contract_size = fs_err::tokio::metadata(&contract).await?.len();
1351 let service_size = fs_err::tokio::metadata(&service).await?.len();
1352 tracing::info!("Done building application {name}: contract_size={contract_size}, service_size={service_size}");
1353
1354 Ok((contract, service))
1355 }
1356}
1357
1358impl Drop for ClientWrapper {
1359 fn drop(&mut self) {
1360 use std::process::Command as SyncCommand;
1361
1362 if self.on_drop != OnClientDrop::CloseChains {
1363 return;
1364 }
1365
1366 let Ok(binary_path) = self.binary_path.lock() else {
1367 tracing::error!(
1368 "Failed to close chains because a thread panicked with a lock to `binary_path`"
1369 );
1370 return;
1371 };
1372
1373 let Some(binary_path) = binary_path.as_ref() else {
1374 tracing::warn!(
1375 "Assuming no chains need to be closed, because the command binary was never \
1376 resolved and therefore presumably never called"
1377 );
1378 return;
1379 };
1380
1381 let working_directory = self.path_provider.path();
1382 let mut wallet_show_command = SyncCommand::new(binary_path);
1383
1384 for argument in self.command_arguments() {
1385 wallet_show_command.arg(&*argument);
1386 }
1387
1388 let Ok(wallet_show_output) = wallet_show_command
1389 .current_dir(working_directory)
1390 .args(["wallet", "show", "--short", "--owned"])
1391 .output()
1392 else {
1393 tracing::warn!("Failed to execute `wallet show --short` to list chains to close");
1394 return;
1395 };
1396
1397 if !wallet_show_output.status.success() {
1398 tracing::warn!("Failed to list chains in the wallet to close them");
1399 return;
1400 }
1401
1402 let Ok(chain_list_string) = String::from_utf8(wallet_show_output.stdout) else {
1403 tracing::warn!(
1404 "Failed to close chains because `linera wallet show --short` \
1405 returned a non-UTF-8 output"
1406 );
1407 return;
1408 };
1409
1410 let chain_ids = chain_list_string
1411 .split('\n')
1412 .map(|line| line.trim())
1413 .filter(|line| !line.is_empty());
1414
1415 for chain_id in chain_ids {
1416 let mut close_chain_command = SyncCommand::new(binary_path);
1417
1418 for argument in self.command_arguments() {
1419 close_chain_command.arg(&*argument);
1420 }
1421
1422 close_chain_command.current_dir(working_directory);
1423
1424 match close_chain_command.args(["close-chain", chain_id]).status() {
1425 Ok(status) if status.success() => (),
1426 Ok(failure) => tracing::warn!("Failed to close chain {chain_id}: {failure}"),
1427 Err(error) => tracing::warn!("Failed to close chain {chain_id}: {error}"),
1428 }
1429 }
1430 }
1431}
1432
#[cfg(with_testing)]
impl ClientWrapper {
    /// Builds the example application `name` as a workspace member located at
    /// [`Self::example_path`].
    pub async fn build_example(&self, name: &str) -> Result<(PathBuf, PathBuf)> {
        let example_dir = Self::example_path(name)?;
        self.build_application(&example_dir, name, true).await
    }

    /// Returns `<current dir>/../examples/<name>`.
    pub fn example_path(name: &str) -> Result<PathBuf> {
        let current_dir = env::current_dir()?;
        Ok(current_dir.join("../examples/").join(name))
    }
}
1444
/// Shortens `input` so it can be embedded in log and error messages.
///
/// Inputs shorter than 1000 bytes are returned unchanged. Longer inputs are cut at
/// the last UTF-8 character boundary at or before byte 1000 and suffixed with
/// `" ..."`.
fn truncate_query_output(input: &str) -> String {
    let max_len = 1000;
    if input.len() < max_len {
        return input.to_string();
    }
    // Byte `max_len` may fall inside a multi-byte character, where slicing would
    // panic. Back up to the nearest boundary; offset 0 is always one, so the loop
    // terminates.
    let mut end = max_len;
    while !input.is_char_boundary(end) {
        end -= 1;
    }
    format!("{} ...", &input[..end])
}
1453
1454fn truncate_query_output_serialize<T: Serialize>(query: T) -> String {
1455 let query = serde_json::to_string(&query).expect("Failed to serialize the failed query");
1456 let max_len = 200;
1457 if query.len() < max_len {
1458 query
1459 } else {
1460 format!("{} ...", query.get(..max_len).unwrap())
1461 }
1462}
1463
1464fn log_unexpected_exit(child: &mut Child, service_kind: &str, port: u16) {
1468 match child.try_wait() {
1469 Ok(Some(status)) => {
1470 #[cfg(unix)]
1471 {
1472 use std::os::unix::process::ExitStatusExt as _;
1473 if let Some(signal) = status.signal() {
1474 tracing::error!(
1475 port,
1476 signal,
1477 "The {service_kind} service was killed by signal {signal}",
1478 );
1479 return;
1480 }
1481 }
1482 if !status.success() {
1483 tracing::error!(
1484 port,
1485 %status,
1486 "The {service_kind} service exited unexpectedly with {status}",
1487 );
1488 }
1489 }
1490 Ok(None) => {} Err(error) => {
1492 tracing::warn!(
1493 port,
1494 %error,
1495 "Failed to check {service_kind} service status",
1496 );
1497 }
1498 }
1499}
1500
/// Handle to a node service child process that is queried over HTTP and WebSocket
/// on `localhost`.
pub struct NodeService {
    /// Local port used to build the service's `http://localhost:<port>/` and
    /// `ws://localhost:<port>/ws` URLs.
    port: u16,
    /// The child process running the service.
    child: Child,
    /// Set by `terminate`; when `true`, `Drop` skips the unexpected-exit logging.
    terminated: bool,
}
1506
1507impl Drop for NodeService {
1508 fn drop(&mut self) {
1509 if !self.terminated {
1510 log_unexpected_exit(&mut self.child, "node", self.port);
1511 }
1512 }
1513}
1514
1515impl NodeService {
1516 fn new(port: u16, child: Child) -> Self {
1517 Self {
1518 port,
1519 child,
1520 terminated: false,
1521 }
1522 }
1523
1524 pub async fn terminate(mut self) -> Result<()> {
1525 self.terminated = true;
1526 self.child.kill().await.context("terminating node service")
1527 }
1528
1529 pub fn port(&self) -> u16 {
1530 self.port
1531 }
1532
1533 pub fn ensure_is_running(&mut self) -> Result<()> {
1534 self.child.ensure_is_running()
1535 }
1536
1537 pub async fn process_inbox(&self, chain_id: &ChainId) -> Result<Vec<CryptoHash>> {
1538 let query = format!("mutation {{ processInbox(chainId: \"{chain_id}\") }}");
1539 let mut data = self.query_node(query).await?;
1540 Ok(serde_json::from_value(data["processInbox"].take())?)
1541 }
1542
1543 pub async fn sync(&self, chain_id: &ChainId) -> Result<u64> {
1544 let query = format!("mutation {{ sync(chainId: \"{chain_id}\") }}");
1545 let mut data = self.query_node(query).await?;
1546 Ok(serde_json::from_value(data["sync"].take())?)
1547 }
1548
1549 pub async fn transfer(
1550 &self,
1551 chain_id: ChainId,
1552 owner: AccountOwner,
1553 recipient: Account,
1554 amount: Amount,
1555 ) -> Result<CryptoHash> {
1556 let json_owner = owner.to_value();
1557 let json_recipient = recipient.to_value();
1558 let query = format!(
1559 "mutation {{ transfer(\
1560 chainId: \"{chain_id}\", \
1561 owner: {json_owner}, \
1562 recipient: {json_recipient}, \
1563 amount: \"{amount}\") \
1564 }}"
1565 );
1566 let data = self.query_node(query).await?;
1567 serde_json::from_value(data["transfer"].clone())
1568 .context("missing transfer field in response")
1569 }
1570
1571 pub async fn balance(&self, account: &Account) -> Result<Amount> {
1572 let chain = account.chain_id;
1573 let owner = account.owner;
1574 if matches!(owner, AccountOwner::CHAIN) {
1575 let query = format!(
1576 "query {{ chain(chainId:\"{chain}\") {{
1577 executionState {{ system {{ balance }} }}
1578 }} }}"
1579 );
1580 let response = self.query_node(query).await?;
1581 let balance = &response["chain"]["executionState"]["system"]["balance"]
1582 .as_str()
1583 .unwrap();
1584 return Ok(Amount::from_str(balance)?);
1585 }
1586 let query = format!(
1587 "query {{ chain(chainId:\"{chain}\") {{
1588 executionState {{ system {{ balances {{
1589 entry(key:\"{owner}\") {{ value }}
1590 }} }} }}
1591 }} }}"
1592 );
1593 let response = self.query_node(query).await?;
1594 let balances = &response["chain"]["executionState"]["system"]["balances"];
1595 let balance = balances["entry"]["value"].as_str();
1596 match balance {
1597 None => Ok(Amount::ZERO),
1598 Some(amount) => Ok(Amount::from_str(amount)?),
1599 }
1600 }
1601
1602 pub fn make_application<A: ContractAbi>(
1603 &self,
1604 chain_id: &ChainId,
1605 application_id: &ApplicationId<A>,
1606 ) -> Result<ApplicationWrapper<A>> {
1607 let application_id = application_id.forget_abi().to_string();
1608 let link = format!(
1609 "http://localhost:{}/chains/{chain_id}/applications/{application_id}",
1610 self.port
1611 );
1612 Ok(ApplicationWrapper::from(link))
1613 }
1614
1615 pub async fn publish_data_blob(
1616 &self,
1617 chain_id: &ChainId,
1618 bytes: Vec<u8>,
1619 ) -> Result<CryptoHash> {
1620 let query = format!(
1621 "mutation {{ publishDataBlob(chainId: {}, bytes: {}) }}",
1622 chain_id.to_value(),
1623 bytes.to_value(),
1624 );
1625 let data = self.query_node(query).await?;
1626 serde_json::from_value(data["publishDataBlob"].clone())
1627 .context("missing publishDataBlob field in response")
1628 }
1629
1630 pub async fn publish_module<Abi, Parameters, InstantiationArgument>(
1631 &self,
1632 chain_id: &ChainId,
1633 contract: PathBuf,
1634 service: PathBuf,
1635 vm_runtime: VmRuntime,
1636 ) -> Result<ModuleId<Abi, Parameters, InstantiationArgument>> {
1637 let contract_code = Bytecode::load_from_file(&contract).await?;
1638 let service_code = Bytecode::load_from_file(&service).await?;
1639 let query = format!(
1640 "mutation {{ publishModule(chainId: {}, contract: {}, service: {}, vmRuntime: {}) }}",
1641 chain_id.to_value(),
1642 contract_code.to_value(),
1643 service_code.to_value(),
1644 vm_runtime.to_value(),
1645 );
1646 let data = self.query_node(query).await?;
1647 let module_str = data["publishModule"]
1648 .as_str()
1649 .context("module ID not found")?;
1650 let module_id: ModuleId = module_str.parse().context("could not parse module ID")?;
1651 Ok(module_id.with_abi())
1652 }
1653
1654 pub async fn query_committees(
1655 &self,
1656 chain_id: &ChainId,
1657 ) -> Result<BTreeMap<Epoch, CryptoHash>> {
1658 let query = format!(
1659 "query {{ chain(chainId:\"{chain_id}\") {{
1660 executionState {{ system {{ committees }} }}
1661 }} }}"
1662 );
1663 let mut response = self.query_node(query).await?;
1664 let committees = response["chain"]["executionState"]["system"]["committees"].take();
1665 Ok(serde_json::from_value(committees)?)
1666 }
1667
1668 pub async fn events_from_index(
1669 &self,
1670 chain_id: &ChainId,
1671 stream_id: &StreamId,
1672 start_index: u32,
1673 ) -> Result<Vec<IndexAndEvent>> {
1674 let query = format!(
1675 "query {{
1676 eventsFromIndex(chainId: \"{chain_id}\", streamId: {}, startIndex: {start_index})
1677 {{ index event }}
1678 }}",
1679 stream_id.to_value()
1680 );
1681 let mut response = self.query_node(query).await?;
1682 let response = response["eventsFromIndex"].take();
1683 Ok(serde_json::from_value(response)?)
1684 }
1685
1686 pub async fn query_node(&self, query: impl AsRef<str>) -> Result<Value> {
1687 let n_try = 5;
1688 let query = query.as_ref();
1689 for i in 0..n_try {
1690 linera_base::time::timer::sleep(Duration::from_secs(i)).await;
1691 let url = format!("http://localhost:{}/", self.port);
1692 let client = reqwest_client();
1693 let result = client
1694 .post(url)
1695 .json(&json!({ "query": query }))
1696 .send()
1697 .await;
1698 if matches!(result, Err(ref error) if error.is_timeout()) {
1699 tracing::warn!(
1700 "Timeout when sending query {} to the node service",
1701 truncate_query_output(query)
1702 );
1703 continue;
1704 }
1705 let response = result.with_context(|| {
1706 format!(
1707 "query_node: failed to post query={}",
1708 truncate_query_output(query)
1709 )
1710 })?;
1711 ensure!(
1712 response.status().is_success(),
1713 "Query \"{}\" failed: {}",
1714 truncate_query_output(query),
1715 response
1716 .text()
1717 .await
1718 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1719 );
1720 let value: Value = response.json().await.context("invalid JSON")?;
1721 if let Some(errors) = value.get("errors") {
1722 tracing::warn!(
1723 "Query \"{}\" failed: {}",
1724 truncate_query_output(query),
1725 errors
1726 );
1727 } else {
1728 return Ok(value["data"].clone());
1729 }
1730 }
1731 bail!(
1732 "Query \"{}\" failed after {} retries.",
1733 truncate_query_output(query),
1734 n_try
1735 );
1736 }
1737
1738 pub async fn create_application<
1739 Abi: ContractAbi,
1740 Parameters: Serialize,
1741 InstantiationArgument: Serialize,
1742 >(
1743 &self,
1744 chain_id: &ChainId,
1745 module_id: &ModuleId<Abi, Parameters, InstantiationArgument>,
1746 parameters: &Parameters,
1747 argument: &InstantiationArgument,
1748 required_application_ids: &[ApplicationId],
1749 ) -> Result<ApplicationId<Abi>> {
1750 let module_id = module_id.forget_abi();
1751 let json_required_applications_ids = required_application_ids
1752 .iter()
1753 .map(ApplicationId::to_string)
1754 .collect::<Vec<_>>()
1755 .to_value();
1756 let new_parameters = serde_json::to_value(parameters)
1758 .context("could not create parameters JSON")?
1759 .to_value();
1760 let new_argument = serde_json::to_value(argument)
1761 .context("could not create argument JSON")?
1762 .to_value();
1763 let query = format!(
1764 "mutation {{ createApplication(\
1765 chainId: \"{chain_id}\",
1766 moduleId: \"{module_id}\", \
1767 parameters: {new_parameters}, \
1768 instantiationArgument: {new_argument}, \
1769 requiredApplicationIds: {json_required_applications_ids}) \
1770 }}"
1771 );
1772 let data = self.query_node(query).await?;
1773 let app_id_str = data["createApplication"]
1774 .as_str()
1775 .context("missing createApplication string in response")?
1776 .trim();
1777 Ok(app_id_str
1778 .parse::<ApplicationId>()
1779 .context("invalid application ID")?
1780 .with_abi())
1781 }
1782
1783 pub async fn chain_tip(&self, chain: ChainId) -> Result<Option<(CryptoHash, BlockHeight)>> {
1785 let query = format!(
1786 r#"query {{ block(chainId: "{chain}") {{
1787 hash
1788 block {{ header {{ height }} }}
1789 }} }}"#
1790 );
1791
1792 let mut response = self.query_node(&query).await?;
1793
1794 match (
1795 mem::take(&mut response["block"]["hash"]),
1796 mem::take(&mut response["block"]["block"]["header"]["height"]),
1797 ) {
1798 (Value::Null, Value::Null) => Ok(None),
1799 (Value::String(hash), Value::Number(height)) => Ok(Some((
1800 hash.parse()
1801 .context("Received an invalid hash {hash:?} for chain tip")?,
1802 BlockHeight(height.as_u64().unwrap()),
1803 ))),
1804 invalid_data => bail!("Expected a tip hash string, but got {invalid_data:?} instead"),
1805 }
1806 }
1807
1808 pub async fn notifications(
1810 &self,
1811 chain_id: ChainId,
1812 ) -> Result<Pin<Box<impl Stream<Item = Result<Notification>>>>> {
1813 let query = format!("subscription {{ notifications(chainId: \"{chain_id}\") }}",);
1814 let url = format!("ws://localhost:{}/ws", self.port);
1815 let mut request = url.into_client_request()?;
1816 request.headers_mut().insert(
1817 "Sec-WebSocket-Protocol",
1818 HeaderValue::from_str("graphql-transport-ws")?,
1819 );
1820 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1821 let init_json = json!({
1822 "type": "connection_init",
1823 "payload": {}
1824 });
1825 websocket.send(init_json.to_string().into()).await?;
1826 let text = websocket
1827 .next()
1828 .await
1829 .context("Failed to establish connection")??
1830 .into_text()?;
1831 ensure!(
1832 text == "{\"type\":\"connection_ack\"}",
1833 "Unexpected response: {text}"
1834 );
1835 let query_json = json!({
1836 "id": "1",
1837 "type": "start",
1838 "payload": {
1839 "query": query,
1840 "variables": {},
1841 "operationName": null
1842 }
1843 });
1844 websocket.send(query_json.to_string().into()).await?;
1845 Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1846 |message| async {
1847 let text = message.into_text()?;
1848 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1849 if let Some(errors) = value["payload"].get("errors") {
1850 bail!("Notification subscription failed: {errors:?}");
1851 }
1852 serde_json::from_value(value["payload"]["data"]["notifications"].clone())
1853 .context("Failed to deserialize notification")
1854 },
1855 )))
1856 }
1857
1858 pub async fn query_result(
1860 &self,
1861 name: &str,
1862 chain_id: ChainId,
1863 application_id: &ApplicationId,
1864 ) -> Result<Pin<Box<impl Stream<Item = Result<Value>>>>> {
1865 let query = format!(
1866 r#"subscription {{ queryResult(name: "{name}", chainId: "{chain_id}", applicationId: "{application_id}") }}"#,
1867 );
1868 let url = format!("ws://localhost:{}/ws", self.port);
1869 let mut request = url.into_client_request()?;
1870 request.headers_mut().insert(
1871 "Sec-WebSocket-Protocol",
1872 HeaderValue::from_str("graphql-transport-ws")?,
1873 );
1874 let (mut websocket, _) = async_tungstenite::tokio::connect_async(request).await?;
1875 let init_json = json!({
1876 "type": "connection_init",
1877 "payload": {}
1878 });
1879 websocket.send(init_json.to_string().into()).await?;
1880 let text = websocket
1881 .next()
1882 .await
1883 .context("Failed to establish connection")??
1884 .into_text()?;
1885 ensure!(
1886 text == "{\"type\":\"connection_ack\"}",
1887 "Unexpected response: {text}"
1888 );
1889 let query_json = json!({
1890 "id": "1",
1891 "type": "start",
1892 "payload": {
1893 "query": query,
1894 "variables": {},
1895 "operationName": null
1896 }
1897 });
1898 websocket.send(query_json.to_string().into()).await?;
1899 Ok(Box::pin(websocket.map_err(anyhow::Error::from).and_then(
1900 |message| async {
1901 let text = message.into_text()?;
1902 let value: Value = serde_json::from_str(&text).context("invalid JSON")?;
1903 if let Some(errors) = value["payload"].get("errors") {
1904 bail!("Query result subscription failed: {errors:?}");
1905 }
1906 Ok(value["payload"]["data"]["queryResult"].clone())
1907 },
1908 )))
1909 }
1910}
1911
/// Handle to a faucet service child process reachable on `localhost`.
pub struct FaucetService {
    /// Local port used to build the faucet's `http://localhost:<port>/` URL.
    port: u16,
    /// The child process running the faucet.
    child: Child,
    /// Never read in this file; presumably held only so the temporary directory
    /// outlives the faucet process — TODO confirm against the code that spawns it.
    _temp_dir: tempfile::TempDir,
    /// Set by `terminate`; when `true`, `Drop` skips the unexpected-exit logging.
    terminated: bool,
}
1919
1920impl Drop for FaucetService {
1921 fn drop(&mut self) {
1922 if !self.terminated {
1923 log_unexpected_exit(&mut self.child, "faucet", self.port);
1924 }
1925 }
1926}
1927
1928impl FaucetService {
1929 fn new(port: u16, child: Child, temp_dir: tempfile::TempDir) -> Self {
1930 Self {
1931 port,
1932 child,
1933 _temp_dir: temp_dir,
1934 terminated: false,
1935 }
1936 }
1937
1938 pub async fn terminate(mut self) -> Result<()> {
1939 self.terminated = true;
1940 self.child
1941 .kill()
1942 .await
1943 .context("terminating faucet service")
1944 }
1945
1946 pub fn ensure_is_running(&mut self) -> Result<()> {
1947 self.child.ensure_is_running()
1948 }
1949
1950 pub fn instance(&self) -> Faucet {
1951 Faucet::new(format!("http://localhost:{}/", self.port))
1952 }
1953}
1954
/// Client for one application's GraphQL endpoint, tagged at the type level with
/// the marker type `A`.
pub struct ApplicationWrapper<A> {
    /// URL that queries are posted to.
    uri: String,
    /// Carries the type parameter `A` without storing a value of it.
    _phantom: PhantomData<A>,
}
1960
1961impl<A> ApplicationWrapper<A> {
1962 pub async fn run_graphql_query(&self, query: impl AsRef<str>) -> Result<Value> {
1963 let query = query.as_ref();
1964 let value = self.run_json_query(json!({ "query": query })).await?;
1965 Ok(value["data"].clone())
1966 }
1967
1968 pub async fn run_json_query<T: Serialize>(&self, query: T) -> Result<Value> {
1969 const MAX_RETRIES: usize = 5;
1970
1971 for i in 0.. {
1972 let client = reqwest_client();
1973 let result = client.post(&self.uri).json(&query).send().await;
1974 let response = match result {
1975 Ok(response) => response,
1976 Err(error) if i < MAX_RETRIES => {
1977 tracing::warn!(
1978 "Failed to post query \"{}\": {error}; retrying",
1979 truncate_query_output_serialize(&query),
1980 );
1981 continue;
1982 }
1983 Err(error) => {
1984 let query = truncate_query_output_serialize(&query);
1985 return Err(error)
1986 .with_context(|| format!("run_json_query: failed to post query={query}"));
1987 }
1988 };
1989 ensure!(
1990 response.status().is_success(),
1991 "Query \"{}\" failed: {}",
1992 truncate_query_output_serialize(&query),
1993 response
1994 .text()
1995 .await
1996 .unwrap_or_else(|error| format!("Could not get response text: {error}"))
1997 );
1998 let value: Value = response.json().await.context("invalid JSON")?;
1999 if let Some(errors) = value.get("errors") {
2000 bail!(
2001 "Query \"{}\" failed: {}",
2002 truncate_query_output_serialize(&query),
2003 errors
2004 );
2005 }
2006 return Ok(value);
2007 }
2008 unreachable!()
2009 }
2010
2011 pub async fn query(&self, query: impl AsRef<str>) -> Result<Value> {
2012 let query = query.as_ref();
2013 self.run_graphql_query(&format!("query {{ {query} }}"))
2014 .await
2015 }
2016
2017 pub async fn query_json<T: DeserializeOwned>(&self, query: impl AsRef<str>) -> Result<T> {
2018 let query = query.as_ref().trim();
2019 let name = query
2020 .split_once(|ch: char| !ch.is_alphanumeric())
2021 .map_or(query, |(name, _)| name);
2022 let data = self.query(query).await?;
2023 serde_json::from_value(data[name].clone())
2024 .with_context(|| format!("{name} field missing in response"))
2025 }
2026
2027 pub async fn mutate(&self, mutation: impl AsRef<str>) -> Result<Value> {
2028 let mutation = mutation.as_ref();
2029 self.run_graphql_query(&format!("mutation {{ {mutation} }}"))
2030 .await
2031 }
2032
2033 pub async fn multiple_mutate(&self, mutations: &[String]) -> Result<Value> {
2034 let mut out = String::from("mutation {\n");
2035 for (index, mutation) in mutations.iter().enumerate() {
2036 out = format!("{} u{}: {}\n", out, index, mutation);
2037 }
2038 out.push_str("}\n");
2039 self.run_graphql_query(&out).await
2040 }
2041}
2042
2043impl<A> From<String> for ApplicationWrapper<A> {
2044 fn from(uri: String) -> ApplicationWrapper<A> {
2045 ApplicationWrapper {
2046 uri,
2047 _phantom: PhantomData,
2048 }
2049 }
2050}
2051
2052#[cfg(with_testing)]
/// Returns how long to wait for a notification.
///
/// The value is read in milliseconds from the `LINERA_TEST_NOTIFICATION_TIMEOUT_MS`
/// environment variable and defaults to 10 000 ms when the variable is not set.
///
/// # Panics
///
/// Panics if the variable is set but is not valid Unicode or is not a valid number.
fn notification_timeout() -> Duration {
    const NOTIFICATION_TIMEOUT_MS_ENV: &str = "LINERA_TEST_NOTIFICATION_TIMEOUT_MS";
    const NOTIFICATION_TIMEOUT_MS_DEFAULT: u64 = 10_000;

    let millis = match env::var(NOTIFICATION_TIMEOUT_MS_ENV) {
        Err(env::VarError::NotPresent) => NOTIFICATION_TIMEOUT_MS_DEFAULT,
        Err(env::VarError::NotUnicode(_)) => {
            panic!("{NOTIFICATION_TIMEOUT_MS_ENV} must be valid Unicode")
        }
        Ok(raw) => match raw.parse::<u64>() {
            Ok(millis) => millis,
            Err(error) => {
                panic!("{NOTIFICATION_TIMEOUT_MS_ENV} is not a valid number: {error}")
            }
        },
    };
    Duration::from_millis(millis)
}
2069
#[cfg(with_testing)]
/// Helpers for waiting on specific notifications.
pub trait NotificationsExt {
    /// Feeds notifications to `f` and resolves to the first `Some` value it returns.
    fn wait_for<T>(
        &mut self,
        f: impl FnMut(Notification) -> Option<T>,
    ) -> impl Future<Output = Result<T>>;

    /// Waits for a `NewEvents` notification and returns its event streams.
    ///
    /// When `expected_height` is `Some`, only a notification at that height matches.
    fn wait_for_events(
        &mut self,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<BTreeSet<StreamId>>> {
        let expected_height = expected_height.into();
        self.wait_for(move |notification| match notification.reason {
            Reason::NewEvents {
                height,
                event_streams,
                ..
            } if expected_height.is_none_or(|h| h == height) => Some(event_streams),
            _ => None,
        })
    }

    /// Waits for a `NewBlock` notification and returns the block's hash.
    ///
    /// When `expected_height` is `Some`, only a notification at that height matches.
    fn wait_for_block(
        &mut self,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<CryptoHash>> {
        let expected_height = expected_height.into();
        self.wait_for(move |notification| match notification.reason {
            Reason::NewBlock { height, hash, .. }
                if expected_height.is_none_or(|h| h == height) =>
            {
                Some(hash)
            }
            _ => None,
        })
    }

    /// Waits for a `NewIncomingBundle` notification whose origin is `expected_origin`.
    ///
    /// When `expected_height` is `Some`, only a notification at that height matches.
    fn wait_for_bundle(
        &mut self,
        expected_origin: ChainId,
        expected_height: impl Into<Option<BlockHeight>>,
    ) -> impl Future<Output = Result<()>> {
        let expected_height = expected_height.into();
        self.wait_for(move |notification| match notification.reason {
            Reason::NewIncomingBundle { height, origin }
                if expected_height.is_none_or(|h| h == height) && origin == expected_origin =>
            {
                Some(())
            }
            _ => None,
        })
    }
}
2135
#[cfg(with_testing)]
impl<S: Stream<Item = Result<Notification>>> NotificationsExt for Pin<Box<S>> {
    /// Pulls notifications from the stream until `f` returns `Some`.
    ///
    /// A single timer of `notification_timeout()` covers the whole wait. The call
    /// fails when that timer fires, when the stream ends, or when the stream
    /// yields an error.
    async fn wait_for<T>(&mut self, mut f: impl FnMut(Notification) -> Option<T>) -> Result<T> {
        let deadline = linera_base::time::timer::sleep(notification_timeout());
        let mut deadline = Box::pin(deadline).fuse();
        loop {
            let notification = futures::select! {
                () = deadline => bail!("Timeout waiting for notification"),
                next = self.next().fuse() => next.context("Stream closed")??,
            };
            match f(notification) {
                Some(found) => return Ok(found),
                None => continue,
            }
        }
    }
}