1use std::{collections::HashMap, num::NonZero, str::FromStr};
7
8use anyhow::Context as _;
9use futures::stream::TryStreamExt as _;
10use linera_base::{
11 crypto::{AccountPublicKey, ValidatorPublicKey},
12 identifiers::ChainId,
13};
14use linera_client::{chain_listener::ClientContext as _, client_context::ClientContext};
15use linera_core::{data_types::ClientOutcome, node::ValidatorNodeProvider, Wallet as _};
16use linera_execution::committee::{Committee, ValidatorState};
17use serde::{Deserialize, Serialize};
18
19#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
23pub struct Votes(pub NonZero<u64>);
24
25impl Default for Votes {
26 fn default() -> Self {
27 Self(nonzero_lit::u64!(1))
28 }
29}
30
31impl FromStr for Votes {
32 type Err = <NonZero<u64> as FromStr>::Err;
33 fn from_str(s: &str) -> Result<Self, Self::Err> {
34 Ok(Votes(s.parse()?))
35 }
36}
37
38#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
40#[serde(rename_all = "camelCase")]
41pub struct Spec {
42 pub public_key: ValidatorPublicKey,
43 pub account_key: AccountPublicKey,
44 pub network_address: url::Url,
45 #[serde(default)]
46 pub votes: Votes,
47}
48
49#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
51#[serde(rename_all = "camelCase")]
52pub struct Change {
53 pub account_key: AccountPublicKey,
54 pub address: url::Url,
55 #[serde(default)]
56 pub votes: Votes,
57}
58
59pub type BatchFile = HashMap<ValidatorPublicKey, Option<Change>>;
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct QueryBatch {
69 pub validators: Vec<Spec>,
70}
71
72#[derive(Debug, Clone, clap::Subcommand)]
74pub enum Command {
75 Add(Add),
76 BatchQuery(BatchQuery),
77 Update(Update),
78 List(List),
79 Query(Query),
80 Remove(Remove),
81 Sync(Sync),
82}
83
84#[derive(Debug, Clone, clap::Parser)]
89pub struct Add {
90 #[arg(long)]
92 public_key: ValidatorPublicKey,
93 #[arg(long)]
95 account_key: AccountPublicKey,
96 #[arg(long)]
98 address: url::Url,
99 #[arg(long, required = false)]
101 votes: Votes,
102 #[arg(long)]
104 skip_online_check: bool,
105}
106
107#[derive(Debug, Clone, clap::Parser)]
112pub struct BatchQuery {
113 file: clio::Input,
115 #[arg(long)]
117 chain_id: Option<ChainId>,
118}
119
120#[derive(Debug, Clone, clap::Parser)]
129pub struct Update {
130 #[arg(required = false)]
132 file: clio::Input,
133 #[arg(long)]
135 dry_run: bool,
136 #[arg(long, short = 'y')]
138 yes: bool,
139 #[arg(long)]
141 skip_online_check: bool,
142}
143
144#[derive(Debug, Clone, clap::Parser)]
149pub struct List {
150 #[arg(long)]
152 chain_id: Option<ChainId>,
153 #[arg(long)]
155 min_votes: Option<u64>,
156}
157
158#[derive(Debug, Clone, clap::Parser)]
163pub struct Query {
164 address: String,
166 #[arg(long)]
168 chain_id: Option<ChainId>,
169 #[arg(long)]
171 public_key: Option<ValidatorPublicKey>,
172}
173
174#[derive(Debug, Clone, clap::Parser)]
179pub struct Remove {
180 #[arg(long)]
182 public_key: ValidatorPublicKey,
183}
184
185#[derive(Debug, Clone, clap::Parser)]
190pub struct Sync {
191 address: String,
193 #[arg(long)]
195 chains: Vec<ChainId>,
196 #[arg(long)]
198 check_online: bool,
199}
200
201fn parse_batch_file(input: clio::Input) -> anyhow::Result<BatchFile> {
204 Ok(serde_json::from_reader(input)?)
205}
206
207fn parse_query_batch_file(input: clio::Input) -> anyhow::Result<QueryBatch> {
209 Ok(serde_json::from_reader(input)?)
210}
211
212impl Command {
213 pub async fn run(
215 &self,
216 context: &mut ClientContext<
217 impl linera_core::Environment<ValidatorNode = linera_rpc::Client>,
218 >,
219 ) -> anyhow::Result<()> {
220 use Command::*;
221
222 match self {
223 Add(command) => command.run(context).await,
224 BatchQuery(command) => command.run(context).await,
225 Update(command) => command.run(context).await,
226 List(command) => command.run(context).await,
227 Query(command) => command.run(context).await,
228 Remove(command) => command.run(context).await,
229 Sync(command) => Box::pin(command.run(context)).await,
230 }
231 }
232}
233
234impl Add {
235 async fn run(
236 &self,
237 context: &mut ClientContext<impl linera_core::Environment>,
238 ) -> anyhow::Result<()> {
239 tracing::info!("Starting operation to add validator");
240 let time_start = std::time::Instant::now();
241
242 if !self.skip_online_check {
244 let node = context
245 .make_node_provider()
246 .make_node(self.address.as_str())?;
247 context
248 .check_compatible_version_info(self.address.as_str(), &node)
249 .await?;
250 context
251 .check_matching_network_description(self.address.as_str(), &node)
252 .await?;
253 }
254
255 let admin_id = context.admin_chain();
256 let chain_client = context.make_chain_client(admin_id).await?;
257
258 chain_client.synchronize_chain_state(admin_id).await?;
260
261 let maybe_certificate = context
262 .apply_client_command(&chain_client, |chain_client| {
263 let me = self.clone();
264 let chain_client = chain_client.clone();
265 async move {
266 let mut committee = chain_client.local_committee().await?;
268 let policy = committee.policy().clone();
269 let mut validators = committee.validators().clone();
270
271 validators.insert(
272 me.public_key,
273 ValidatorState {
274 network_address: me.address.to_string(),
275 votes: me.votes.0.get(),
276 account_public_key: me.account_key,
277 },
278 );
279
280 committee = Committee::new(validators, policy);
281 chain_client
282 .stage_new_committee(committee)
283 .await
284 .map(|outcome| outcome.map(Some))
285 }
286 })
287 .await
288 .context("Failed to stage committee")?;
289
290 let Some(certificate) = maybe_certificate else {
291 return Ok(());
292 };
293 tracing::info!("Created new committee:\n{:?}", certificate);
294
295 let time_total = time_start.elapsed();
296 tracing::info!("Operation confirmed after {} ms", time_total.as_millis());
297
298 Ok(())
299 }
300}
301
302impl BatchQuery {
303 async fn run(
304 &self,
305 context: &mut ClientContext<impl linera_core::Environment>,
306 ) -> anyhow::Result<()> {
307 let batch = parse_query_batch_file(self.file.clone())
308 .context("parsing query batch file `{file}`")?;
309 let chain_id = self.chain_id.unwrap_or_else(|| context.default_chain());
310 println!(
311 "Querying {} validators about chain {chain_id}.\n",
312 batch.validators.len()
313 );
314
315 let node_provider = context.make_node_provider();
316 let mut has_errors = false;
317
318 for spec in batch.validators {
319 let node = node_provider.make_node(spec.network_address.as_str())?;
320 let results = context
321 .query_validator(
322 spec.network_address.as_str(),
323 &node,
324 chain_id,
325 Some(&spec.public_key),
326 )
327 .await;
328
329 if !results.errors().is_empty() {
330 has_errors = true;
331 for error in results.errors() {
332 tracing::error!("Validator {}: {}", spec.public_key, error);
333 }
334 }
335
336 results.print(
337 Some(&spec.public_key),
338 Some(spec.network_address.as_str()),
339 None,
340 None,
341 );
342 }
343
344 if has_errors {
345 anyhow::bail!("Found issues while querying validators");
346 }
347
348 Ok(())
349 }
350}
351
352impl Update {
353 async fn run(
354 &self,
355 context: &mut ClientContext<impl linera_core::Environment>,
356 ) -> anyhow::Result<()> {
357 tracing::info!("Starting batch update operation");
358 let time_start = std::time::Instant::now();
359
360 let batch = parse_batch_file(self.file.clone())
362 .with_context(|| format!("parsing batch file `{}`", self.file))?;
363
364 if batch.is_empty() {
365 tracing::warn!("No validator changes specified in input.");
366 return Ok(());
367 }
368
369 let mut adds = Vec::new();
371 let mut modifies = Vec::new();
372 let mut removes = Vec::new();
373
374 let admin_id = context.client().admin_chain();
376 let chain_client = context.make_chain_client(admin_id).await?;
377 let current_committee = chain_client.local_committee().await?;
378 let current_validators = current_committee.validators();
379
380 for (public_key, change_opt) in &batch {
381 match change_opt {
382 None => {
383 removes.push(*public_key);
385 }
386 Some(spec) => {
387 if current_validators.contains_key(public_key) {
388 modifies.push((public_key, spec));
389 } else {
390 adds.push((public_key, spec));
391 }
392 }
393 }
394 }
395
396 println!(
398 "\n╔══════════════════════════════════════════════════════════════════════════════╗"
399 );
400 println!(
401 "║ VALIDATOR BATCH UPDATE RECAP ║"
402 );
403 println!(
404 "╚══════════════════════════════════════════════════════════════════════════════╝\n"
405 );
406
407 println!("Summary:");
408 println!(" • {} validator(s) to add", adds.len());
409 println!(" • {} validator(s) to modify", modifies.len());
410 println!(" • {} validator(s) to remove", removes.len());
411 println!();
412
413 if !adds.is_empty() {
414 println!("Validators to ADD:");
415 for (pk, spec) in &adds {
416 println!(" + {}", pk);
417 println!(" Address: {}", spec.address);
418 println!(" Account Key: {}", spec.account_key);
419 println!(" Votes: {}", spec.votes.0.get());
420 }
421 println!();
422 }
423
424 if !modifies.is_empty() {
425 println!("Validators to MODIFY:");
426 for (pk, spec) in &modifies {
427 println!(" * {}", pk);
428 println!(" New Address: {}", spec.address);
429 println!(" New Account Key: {}", spec.account_key);
430 println!(" New Votes: {}", spec.votes.0.get());
431 }
432 println!();
433 }
434
435 if !removes.is_empty() {
436 println!("Validators to REMOVE:");
437 for pk in &removes {
438 println!(" - {}", pk);
439 }
440 println!();
441 }
442
443 if self.dry_run {
444 println!(
445 "═════════════════════════════════════════════════════════════════════════════"
446 );
447 println!("DRY RUN MODE: No changes will be applied");
448 println!(
449 "═════════════════════════════════════════════════════════════════════════════\n"
450 );
451 return Ok(());
452 }
453
454 if !self.yes {
456 println!(
457 "═════════════════════════════════════════════════════════════════════════════"
458 );
459 println!("⚠️ WARNING: This operation will modify the validator committee.");
460 println!(" Changes are permanent and will be broadcast to the network.");
461 println!(
462 "═════════════════════════════════════════════════════════════════════════════\n"
463 );
464 println!("Do you want to proceed? Type 'YES' (uppercase) to confirm: ");
465
466 use std::io::{self, Write};
467 io::stdout().flush()?;
468
469 let mut input = String::new();
470 io::stdin()
471 .read_line(&mut input)
472 .context("Failed to read confirmation input")?;
473
474 let input = input.trim();
475 if input != "YES" {
476 println!("\nOperation cancelled. (Expected 'YES', got '{}')", input);
477 return Ok(());
478 }
479 println!("\nConfirmed. Proceeding with batch update...\n");
480 }
481
482 if !self.skip_online_check {
484 let node_provider = context.make_node_provider();
485
486 tracing::info!("Checking validators are online...");
487 for (_, spec) in adds.iter().chain(modifies.iter()) {
488 let address = &spec.address;
489 let node = node_provider.make_node(address.as_str())?;
490 context
491 .check_compatible_version_info(address.as_str(), &node)
492 .await?;
493 context
494 .check_matching_network_description(address.as_str(), &node)
495 .await?;
496 }
497 }
498
499 let admin_id = context.admin_chain();
500 let chain_client = context.make_chain_client(admin_id).await?;
501
502 chain_client.synchronize_chain_state(admin_id).await?;
504
505 let batch_clone = batch.clone();
506 let maybe_certificate = context
507 .apply_client_command(&chain_client, |chain_client| {
508 let chain_client = chain_client.clone();
509 let batch = batch_clone.clone();
510 async move {
511 let mut committee = chain_client.local_committee().await?;
513 let policy = committee.policy().clone();
514 let mut validators = committee.validators().clone();
515
516 for (public_key, change_opt) in &batch {
518 if let Some(spec) = change_opt {
519 let address = &spec.address;
521 let votes = spec.votes.0.get();
522 let account_key = spec.account_key;
523
524 let exists = validators.contains_key(public_key);
525 validators.insert(
526 *public_key,
527 ValidatorState {
528 network_address: address.to_string(),
529 votes,
530 account_public_key: account_key,
531 },
532 );
533
534 if exists {
535 tracing::info!(
536 "Modified validator {} @ {} ({} votes)",
537 public_key,
538 address,
539 votes
540 );
541 } else {
542 tracing::info!(
543 "Added validator {} @ {} ({} votes)",
544 public_key,
545 address,
546 votes
547 );
548 }
549 } else {
550 if validators.remove(public_key).is_none() {
552 tracing::warn!(
553 "Validator {} does not exist; skipping remove",
554 public_key
555 );
556 } else {
557 tracing::info!("Removed validator {}", public_key);
558 }
559 }
560 }
561
562 committee = Committee::new(validators, policy);
564 chain_client
565 .stage_new_committee(committee)
566 .await
567 .map(|outcome| outcome.map(Some))
568 }
569 })
570 .await
571 .context("Failed to stage committee")?;
572
573 let Some(certificate) = maybe_certificate else {
574 tracing::info!("No changes applied");
575 return Ok(());
576 };
577
578 tracing::info!("Created new committee:\n{:?}", certificate);
579 let time_total = time_start.elapsed();
580 tracing::info!("Batch update confirmed after {} ms", time_total.as_millis());
581
582 Ok(())
583 }
584}
585
586impl List {
587 async fn run(
588 &self,
589 context: &mut ClientContext<impl linera_core::Environment>,
590 ) -> anyhow::Result<()> {
591 let chain_id = self.chain_id.unwrap_or_else(|| context.default_chain());
592 println!("Querying validators about chain {chain_id}.\n");
593
594 let local_results = context.query_local_node(chain_id).await?;
595 let chain_client = context.make_chain_client(chain_id).await?;
596 tracing::info!("Querying validators about chain {}", chain_id);
597 let result = chain_client.local_committee().await;
598 context.update_wallet_from_client(&chain_client).await?;
599 let committee = result.context("Failed to get local committee")?;
600
601 tracing::info!(
602 "Using the local set of validators: {:?}",
603 committee.validators()
604 );
605
606 let node_provider = context.make_node_provider();
607 let mut validator_results = Vec::new();
608
609 for (name, state) in committee.validators() {
610 if self.min_votes.is_some_and(|votes| state.votes < votes) {
611 continue; }
613 let address = &state.network_address;
614 let node = node_provider.make_node(address)?;
615 let results = context
616 .query_validator(address, &node, chain_id, Some(name))
617 .await;
618 validator_results.push((name, address, state.votes, results));
619 }
620
621 let mut faulty_validators = std::collections::BTreeMap::<_, Vec<_>>::new();
622 for (name, address, _votes, results) in &validator_results {
623 for error in results.errors() {
624 tracing::error!("{}", error);
625 faulty_validators
626 .entry((*name, *address))
627 .or_default()
628 .push(error);
629 }
630 }
631
632 println!("Local Node:");
634 local_results.print(None, None, None, None);
635 println!();
636
637 for (name, address, votes, results) in &validator_results {
639 results.print(
640 Some(name),
641 Some(address),
642 Some(*votes),
643 Some(&local_results),
644 );
645 }
646
647 if !faulty_validators.is_empty() {
648 println!("\nFaulty validators:");
649 for ((name, address), errors) in faulty_validators {
650 println!(" {} at {}: {} error(s)", name, address, errors.len());
651 }
652 anyhow::bail!("Found faulty validators");
653 }
654
655 Ok(())
656 }
657}
658
659impl Query {
660 async fn run(
661 &self,
662 context: &mut ClientContext<impl linera_core::Environment>,
663 ) -> anyhow::Result<()> {
664 let node = context.make_node_provider().make_node(&self.address)?;
665 let chain_id = self.chain_id.unwrap_or_else(|| context.default_chain());
666 println!("Querying validator about chain {chain_id}.\n");
667
668 let results = context
669 .query_validator(&self.address, &node, chain_id, self.public_key.as_ref())
670 .await;
671
672 for error in results.errors() {
673 tracing::error!("{}", error);
674 }
675
676 results.print(self.public_key.as_ref(), Some(&self.address), None, None);
677
678 if !results.errors().is_empty() {
679 anyhow::bail!(
680 "Found one or several issue(s) while querying validator {}",
681 self.address
682 );
683 }
684
685 Ok(())
686 }
687}
688
689impl Remove {
690 async fn run(
691 &self,
692 context: &mut ClientContext<impl linera_core::Environment>,
693 ) -> anyhow::Result<()> {
694 tracing::info!("Starting operation to remove validator");
695 let time_start = std::time::Instant::now();
696
697 let admin_id = context.admin_chain();
698 let chain_client = context.make_chain_client(admin_id).await?;
699
700 chain_client.synchronize_chain_state(admin_id).await?;
702
703 let maybe_certificate = context
704 .apply_client_command(&chain_client, |chain_client| {
705 let chain_client = chain_client.clone();
706 async move {
707 let mut committee = chain_client.local_committee().await?;
709 let policy = committee.policy().clone();
710 let mut validators = committee.validators().clone();
711
712 if validators.remove(&self.public_key).is_none() {
713 tracing::error!("Validator {} does not exist; aborting.", self.public_key);
714 return Ok(ClientOutcome::Committed(None));
715 }
716
717 committee = Committee::new(validators, policy);
718 chain_client
719 .stage_new_committee(committee)
720 .await
721 .map(|outcome| outcome.map(Some))
722 }
723 })
724 .await
725 .context("Failed to stage committee")?;
726
727 let Some(certificate) = maybe_certificate else {
728 return Ok(());
729 };
730 tracing::info!("Created new committee:\n{:?}", certificate);
731
732 let time_total = time_start.elapsed();
733 tracing::info!("Operation confirmed after {} ms", time_total.as_millis());
734
735 Ok(())
736 }
737}
738
739impl Sync {
740 async fn run(
741 &self,
742 context: &mut ClientContext<
743 impl linera_core::Environment<ValidatorNode = linera_rpc::Client>,
744 >,
745 ) -> anyhow::Result<()> {
746 tracing::info!("Starting sync operation for validator at {}", self.address);
747
748 if self.check_online {
750 let node_provider = context.make_node_provider();
751 let node = node_provider.make_node(&self.address)?;
752 context
753 .check_compatible_version_info(&self.address, &node)
754 .await?;
755 context
756 .check_matching_network_description(&self.address, &node)
757 .await?;
758 }
759
760 let chains_to_sync = if self.chains.is_empty() {
762 context.wallet().chain_ids().try_collect().await?
763 } else {
764 self.chains.clone()
765 };
766
767 tracing::info!(
768 "Syncing {} chains to validator {}",
769 chains_to_sync.len(),
770 self.address
771 );
772
773 let node_provider = context.make_node_provider();
775 let validator = node_provider.make_node(&self.address)?;
776
777 for chain_id in chains_to_sync {
779 tracing::info!("Syncing chain {} to {}", chain_id, self.address);
780 let chain = context.make_chain_client(chain_id).await?;
781
782 Box::pin(chain.sync_validator(validator.clone())).await?;
783 tracing::info!("Chain {} synced successfully", chain_id);
784 }
785
786 tracing::info!("Sync operation completed successfully");
787 Ok(())
788 }
789}
790
791#[cfg(test)]
792mod tests {
793 use std::io::Write;
794
795 use tempfile::NamedTempFile;
796
797 use super::*;
798
799 #[test]
800 fn test_parse_batch_file_valid() {
801 let pk0 = ValidatorPublicKey::test_key(0);
803 let pk1 = ValidatorPublicKey::test_key(1);
804 let pk2 = ValidatorPublicKey::test_key(2);
805
806 let mut batch = BatchFile::new();
807
808 batch.insert(
810 pk0,
811 Some(Change {
812 account_key: AccountPublicKey::test_key(0),
813 address: "grpcs://validator1.example.com:443".parse().unwrap(),
814 votes: Votes(NonZero::new(100).unwrap()),
815 }),
816 );
817
818 batch.insert(
820 pk1,
821 Some(Change {
822 account_key: AccountPublicKey::test_key(1),
823 address: "grpcs://validator2.example.com:443".parse().unwrap(),
824 votes: Votes(NonZero::new(150).unwrap()),
825 }),
826 );
827
828 batch.insert(pk2, None);
830
831 let json = serde_json::to_string(&batch).unwrap();
832
833 let mut temp_file = NamedTempFile::new().unwrap();
834 temp_file.write_all(json.as_bytes()).unwrap();
835 temp_file.flush().unwrap();
836
837 let input = clio::Input::new(temp_file.path().to_str().unwrap()).unwrap();
838 let result = parse_batch_file(input);
839 assert!(
840 result.is_ok(),
841 "Failed to parse batch file: {:?}",
842 result.err()
843 );
844
845 let parsed_batch = result.unwrap();
846 assert_eq!(parsed_batch.len(), 3);
847
848 assert!(parsed_batch.contains_key(&pk0));
850 let spec0 = parsed_batch.get(&pk0).unwrap().as_ref().unwrap();
851 assert_eq!(spec0.votes.0.get(), 100);
852
853 assert!(parsed_batch.contains_key(&pk1));
855 let spec1 = parsed_batch.get(&pk1).unwrap().as_ref().unwrap();
856 assert_eq!(spec1.votes.0.get(), 150);
857
858 assert!(parsed_batch.contains_key(&pk2));
860 assert!(parsed_batch.get(&pk2).unwrap().is_none());
861 }
862
863 #[test]
864 fn test_parse_batch_file_empty() {
865 let json = r#"{}"#;
866
867 let mut temp_file = NamedTempFile::new().unwrap();
868 temp_file.write_all(json.as_bytes()).unwrap();
869 temp_file.flush().unwrap();
870
871 let input = clio::Input::new(temp_file.path().to_str().unwrap()).unwrap();
872 let result = parse_batch_file(input);
873 assert!(result.is_ok());
874
875 let batch = result.unwrap();
876 assert_eq!(batch.len(), 0);
877 }
878
879 #[test]
880 fn test_parse_query_batch_file_valid() {
881 let spec1 = Spec {
883 public_key: ValidatorPublicKey::test_key(0),
884 account_key: AccountPublicKey::test_key(0),
885 network_address: "grpcs://validator1.example.com:443".parse().unwrap(),
886 votes: Votes(NonZero::new(100).unwrap()),
887 };
888 let spec2 = Spec {
889 public_key: ValidatorPublicKey::test_key(1),
890 account_key: AccountPublicKey::test_key(1),
891 network_address: "grpcs://validator2.example.com:443".parse().unwrap(),
892 votes: Votes(NonZero::new(150).unwrap()),
893 };
894
895 let batch = QueryBatch {
896 validators: vec![spec1, spec2],
897 };
898
899 let json = serde_json::to_string(&batch).unwrap();
900
901 let mut temp_file = NamedTempFile::new().unwrap();
902 temp_file.write_all(json.as_bytes()).unwrap();
903 temp_file.flush().unwrap();
904
905 let result = parse_query_batch_file(temp_file.path().try_into().unwrap());
906 assert!(
907 result.is_ok(),
908 "Failed to parse query batch file: {:?}",
909 result.err()
910 );
911
912 let parsed_batch = result.unwrap();
913 assert_eq!(parsed_batch.validators.len(), 2);
914 assert_eq!(parsed_batch.validators[0].votes.0.get(), 100);
915 assert_eq!(parsed_batch.validators[1].votes.0.get(), 150);
916 }
917}