1#[cfg(with_testing)]
5use std::sync::LazyLock;
6use std::{
7 collections::BTreeMap,
8 env,
9 path::{Path, PathBuf},
10 sync::Arc,
11 time::Duration,
12};
13
14use anyhow::{anyhow, bail, ensure, Context, Result};
15#[cfg(with_testing)]
16use async_lock::RwLock;
17use async_trait::async_trait;
18use linera_base::{
19 command::{resolve_binary, CommandExt},
20 data_types::Amount,
21};
22use linera_client::client_options::ResourceControlPolicyConfig;
23use linera_core::node::ValidatorNodeProvider;
24use linera_rpc::config::CrossChainConfig;
25#[cfg(all(feature = "storage-service", with_testing))]
26use linera_storage_service::common::storage_service_test_endpoint;
27#[cfg(all(feature = "rocksdb", feature = "scylladb", with_testing))]
28use linera_views::rocks_db::{RocksDbSpawnMode, RocksDbStore};
29#[cfg(all(feature = "scylladb", with_testing))]
30use linera_views::{scylla_db::ScyllaDbStore, store::TestKeyValueStore as _};
31use tempfile::{tempdir, TempDir};
32use tokio::process::{Child, Command};
33use tonic::transport::{channel::ClientTlsConfig, Endpoint};
34use tonic_health::pb::{
35 health_check_response::ServingStatus, health_client::HealthClient, HealthCheckRequest,
36};
37use tracing::{error, info, warn};
38
39use crate::{
40 cli_wrappers::{
41 ClientWrapper, LineraNet, LineraNetConfig, Network, NetworkConfig, OnClientDrop,
42 },
43 storage::{StorageConfig, StorageConfigNamespace},
44 util::ChildExt,
45};
46
/// Inbox-processing mode selector.
///
/// NOTE(review): this enum is not referenced anywhere else in this file; presumably it
/// tells callers whether incoming messages should be processed automatically — confirm
/// against its users.
pub enum ProcessInbox {
    /// Presumably: leave the inbox untouched.
    Skip,
    /// Presumably: process the inbox automatically.
    Automatic,
}
51
/// Next port number handed out by `get_node_port`; starts at 7080 and is incremented on
/// every call.
#[cfg(with_testing)]
static PORT_PROVIDER: LazyLock<RwLock<u16>> = LazyLock::new(|| RwLock::new(7080));
54
/// Hands out the next port from `PORT_PROVIDER` and advances the counter by one.
///
/// # Panics
///
/// Panics if `port_selector::is_free` reports that the returned port is not free.
#[cfg(with_testing)]
pub async fn get_node_port() -> u16 {
    // The write guard is held until the function returns, so concurrent callers are
    // served one at a time.
    let mut next_port = PORT_PROVIDER.write().await;
    let port_ret = *next_port;
    *next_port = port_ret + 1;
    info!("get_node_port returning port_ret={}", port_ret);
    assert!(port_selector::is_free(port_ret));
    port_ret
}
65
/// Builds the [`StorageConfig`] used in tests for the requested `database`.
///
/// Every arm is compiled to the real configuration only when the matching Cargo
/// feature(s) are enabled; otherwise the arm compiles to a `panic!` naming the missing
/// feature.
///
/// # Errors
///
/// Propagates failures of the `new_test_config` calls of the ScyllaDB / RocksDB stores.
///
/// # Panics
///
/// Panics when the selected database's feature is disabled, or when
/// `storage_service_test_endpoint` fails for `Database::Service`.
#[cfg(with_testing)]
async fn make_testing_config(database: Database) -> Result<StorageConfig> {
    match database {
        Database::Service => {
            #[cfg(feature = "storage-service")]
            {
                // The endpoint lookup is expected to succeed in a test environment;
                // a failure aborts the test with the message below.
                let endpoint = storage_service_test_endpoint()
                    .expect("Reading LINERA_STORAGE_SERVICE environment variable");
                Ok(StorageConfig::Service { endpoint })
            }
            #[cfg(not(feature = "storage-service"))]
            panic!("Database::Service is selected without the feature storage_service");
        }
        Database::DynamoDb => {
            #[cfg(feature = "dynamodb")]
            {
                // Tests always set the `use_dynamodb_local` flag.
                let use_dynamodb_local = true;
                Ok(StorageConfig::DynamoDb { use_dynamodb_local })
            }
            #[cfg(not(feature = "dynamodb"))]
            panic!("Database::DynamoDb is selected without the feature dynamodb");
        }
        Database::ScyllaDb => {
            #[cfg(feature = "scylladb")]
            {
                // Only the URI of the store's test configuration is kept.
                let config = ScyllaDbStore::new_test_config().await?;
                Ok(StorageConfig::ScyllaDb {
                    uri: config.inner_config.uri,
                })
            }
            #[cfg(not(feature = "scylladb"))]
            panic!("Database::ScyllaDb is selected without the feature scylladb");
        }
        Database::DualRocksDbScyllaDb => {
            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
            {
                // Combine the RocksDB path (with its guard) and the ScyllaDB URI from the
                // two stores' test configurations.
                let rocksdb_config = RocksDbStore::new_test_config().await?;
                let scylla_config = ScyllaDbStore::new_test_config().await?;
                let spawn_mode = RocksDbSpawnMode::get_spawn_mode_from_runtime();
                Ok(StorageConfig::DualRocksDbScyllaDb {
                    path_with_guard: rocksdb_config.inner_config.path_with_guard,
                    spawn_mode,
                    uri: scylla_config.inner_config.uri,
                })
            }
            #[cfg(not(all(feature = "rocksdb", feature = "scylladb")))]
            panic!("Database::DualRocksDbScyllaDb is selected without the features rocksdb and scylladb");
        }
    }
}
116
/// Describes how the [`StorageConfig`] of a local network is obtained.
pub enum StorageConfigBuilder {
    /// Derive a test configuration from the selected [`Database`] (test builds only).
    #[cfg(with_testing)]
    TestConfig,
    /// Use the given, already constructed configuration as is.
    ExistingConfig {
        /// The configuration returned unchanged by `build`.
        storage_config: StorageConfig,
    },
}
124
125impl StorageConfigBuilder {
126 #[allow(unused_variables)]
127 pub async fn build(self, database: Database) -> Result<StorageConfig> {
128 match self {
129 #[cfg(with_testing)]
130 StorageConfigBuilder::TestConfig => make_testing_config(database).await,
131 StorageConfigBuilder::ExistingConfig { storage_config } => Ok(storage_config),
132 }
133 }
134}
135
/// Source of the directory in which the local network keeps its files.
#[derive(Clone)]
pub enum PathProvider {
    /// A caller-supplied directory.
    ExternalPath { path_buf: PathBuf },
    /// A temporary directory; the `TempDir` handle is shared between clones through the
    /// `Arc`.
    TemporaryDirectory { tmp_dir: Arc<TempDir> },
}
143
144impl PathProvider {
145 pub fn path(&self) -> &Path {
146 match self {
147 PathProvider::ExternalPath { path_buf } => path_buf.as_path(),
148 PathProvider::TemporaryDirectory { tmp_dir } => tmp_dir.path(),
149 }
150 }
151
152 pub fn create_temporary_directory() -> Result<Self> {
153 let tmp_dir = Arc::new(tempdir()?);
154 Ok(PathProvider::TemporaryDirectory { tmp_dir })
155 }
156
157 pub fn new(path: &Option<String>) -> anyhow::Result<Self> {
158 Ok(match path {
159 None => {
160 let tmp_dir = Arc::new(tempfile::tempdir()?);
161 PathProvider::TemporaryDirectory { tmp_dir }
162 }
163 Some(path) => {
164 let path = Path::new(path);
165 let path_buf = path.to_path_buf();
166 PathProvider::ExternalPath { path_buf }
167 }
168 })
169 }
170}
171
/// Parameters from which a [`LocalNet`] is instantiated.
pub struct LocalNetConfig {
    /// Database kind handed to `storage_config_builder` when the storage is resolved.
    pub database: Database,
    /// Internal and external network protocols of the validators.
    pub network: NetworkConfig,
    /// Optional seed forwarded to the network (clients and key generation).
    pub testing_prng_seed: Option<u64>,
    /// Common prefix of the per-validator storage namespaces.
    pub namespace: String,
    /// Passed to `create_genesis_config` as the number of additional initial chains.
    pub num_other_initial_chains: u32,
    /// Passed to `create_genesis_config` as the initial amount.
    pub initial_amount: Amount,
    /// Number of validators configured and started; must be greater than zero.
    pub num_initial_validators: usize,
    /// Number of shards (server processes) per validator.
    pub num_shards: usize,
    /// Number of proxy entries per validator configuration.
    pub num_proxies: usize,
    /// Resource-control policy passed to `create_genesis_config`.
    pub policy_config: ResourceControlPolicyConfig,
    /// Cross-chain settings forwarded to every validator server.
    pub cross_chain_config: CrossChainConfig,
    /// How the storage configuration is obtained.
    pub storage_config_builder: StorageConfigBuilder,
    /// Directory in which the network files live.
    pub path_provider: PathProvider,
    /// Number of block exporters per validator.
    pub num_block_exporters: u32,
}
189
/// A set of validator processes running on the local machine.
pub struct LocalNet {
    /// Internal and external network protocols of the validators.
    network: NetworkConfig,
    /// Seed given to the next client or key generation; incremented after each use.
    testing_prng_seed: Option<u64>,
    /// Id assigned to the next client created by `make_client`.
    next_client_id: usize,
    /// Number of validators configured and started by `run`.
    num_initial_validators: usize,
    /// Number of `[[proxies]]` entries written to each validator configuration.
    num_proxies: usize,
    /// Number of `[[shards]]` entries / server processes per validator.
    num_shards: usize,
    /// Per validator index: the two keys parsed from the `linera-server generate` output
    /// (validator key first, account key second).
    validator_keys: BTreeMap<usize, (String, String)>,
    /// Child processes of the validators that are currently running, by index.
    running_validators: BTreeMap<usize, Validator>,
    /// Storage of every validator for which `linera-server initialize` has been run.
    initialized_validator_storages: BTreeMap<usize, StorageConfigNamespace>,
    /// Prefix of each validator's storage namespace.
    common_namespace: String,
    /// Storage configuration cloned for each validator.
    common_storage_config: StorageConfig,
    /// Cross-chain settings turned into arguments of `linera-server run`.
    cross_chain_config: CrossChainConfig,
    /// Working directory of the spawned binaries and location of generated files.
    path_provider: PathProvider,
    /// Number of block exporters started per validator.
    num_block_exporters: u32,
}
207
/// Name of the environment variable whose whitespace-separated value is prepended to the
/// arguments of the `linera-server` `initialize` and `run` invocations.
const SERVER_ENV: &str = "LINERA_SERVER_PARAMS";
211
/// Database backends that can be selected for a local network.
#[derive(Copy, Clone, Eq, PartialEq)]
pub enum Database {
    /// The storage service (requires the `storage-service` feature in test configs).
    Service,
    /// DynamoDB (requires the `dynamodb` feature in test configs).
    DynamoDb,
    /// ScyllaDB (requires the `scylladb` feature in test configs).
    ScyllaDb,
    /// RocksDB combined with ScyllaDB (requires both features in test configs).
    DualRocksDbScyllaDb,
}
220
/// The child processes that make up one running validator.
struct Validator {
    /// The proxy process.
    proxy: Child,
    /// The server processes, in the order they were added.
    servers: Vec<Child>,
    /// The block exporter processes, in the order they were added.
    exporters: Vec<Child>,
}
227
228impl Validator {
229 fn new(proxy: Child) -> Self {
230 Self {
231 proxy,
232 servers: vec![],
233 exporters: vec![],
234 }
235 }
236
237 async fn terminate(&mut self) -> Result<()> {
238 self.proxy
239 .kill()
240 .await
241 .context("terminating validator proxy")?;
242 for server in &mut self.servers {
243 server
244 .kill()
245 .await
246 .context("terminating validator server")?;
247 }
248 Ok(())
249 }
250
251 fn add_server(&mut self, server: Child) {
252 self.servers.push(server)
253 }
254
255 fn add_block_exporter(&mut self, exporter: Child) {
256 self.exporters.push(exporter);
257 }
258
259 #[cfg(with_testing)]
260 async fn terminate_server(&mut self, index: usize) -> Result<()> {
261 let mut server = self.servers.remove(index);
262 server
263 .kill()
264 .await
265 .context("terminating validator server")?;
266 Ok(())
267 }
268
269 fn ensure_is_running(&mut self) -> Result<()> {
270 self.proxy.ensure_is_running()?;
271 for child in &mut self.servers {
272 child.ensure_is_running()?;
273 }
274
275 for exporter in &mut self.exporters {
276 exporter.ensure_is_running()?;
277 }
278
279 Ok(())
280 }
281}
282
#[cfg(with_testing)]
impl LocalNetConfig {
    /// Returns the default test configuration for the given database and network:
    /// four validators with four shards and one proxy each, no block exporters, a
    /// temporary working directory and a freshly generated test namespace.
    ///
    /// The internal network is `network` with TLS dropped; the external one is
    /// `network` itself.
    ///
    /// # Panics
    ///
    /// Panics if the temporary directory cannot be created.
    pub fn new_test(database: Database, network: Network) -> Self {
        let path_provider = PathProvider::create_temporary_directory().unwrap();
        Self {
            database,
            network: NetworkConfig {
                internal: network.drop_tls(),
                external: network,
            },
            testing_prng_seed: Some(37),
            namespace: linera_views::random::generate_test_namespace(),
            num_other_initial_chains: 2,
            initial_amount: Amount::from_tokens(1_000_000),
            num_initial_validators: 4,
            num_shards: 4,
            num_proxies: 1,
            policy_config: ResourceControlPolicyConfig::Testnet,
            cross_chain_config: CrossChainConfig::default(),
            storage_config_builder: StorageConfigBuilder::TestConfig,
            path_provider,
            num_block_exporters: 0,
        }
    }
}
312
313#[async_trait]
314impl LineraNetConfig for LocalNetConfig {
315 type Net = LocalNet;
316
317 async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
318 let storage_config = self.storage_config_builder.build(self.database).await?;
319 let mut net = LocalNet::new(
320 self.network,
321 self.testing_prng_seed,
322 self.namespace,
323 self.num_initial_validators,
324 self.num_shards,
325 self.num_proxies,
326 storage_config,
327 self.cross_chain_config,
328 self.path_provider,
329 self.num_block_exporters,
330 )?;
331 let client = net.make_client().await;
332 ensure!(
333 self.num_initial_validators > 0,
334 "There should be at least one initial validator"
335 );
336 net.generate_initial_validator_config().await?;
337 client
338 .create_genesis_config(
339 self.num_other_initial_chains,
340 self.initial_amount,
341 self.policy_config,
342 Some(vec!["localhost".to_owned()]),
343 )
344 .await?;
345 net.run().await?;
346 Ok((net, client))
347 }
348}
349
350#[async_trait]
351impl LineraNet for LocalNet {
352 async fn ensure_is_running(&mut self) -> Result<()> {
353 for validator in self.running_validators.values_mut() {
354 validator.ensure_is_running().context("in local network")?;
355 }
356 Ok(())
357 }
358
359 async fn make_client(&mut self) -> ClientWrapper {
360 let client = ClientWrapper::new(
361 self.path_provider.clone(),
362 self.network.external,
363 self.testing_prng_seed,
364 self.next_client_id,
365 OnClientDrop::LeakChains,
366 );
367 if let Some(seed) = self.testing_prng_seed {
368 self.testing_prng_seed = Some(seed + 1);
369 }
370 self.next_client_id += 1;
371 client
372 }
373
374 async fn terminate(&mut self) -> Result<()> {
375 for validator in self.running_validators.values_mut() {
376 validator.terminate().await.context("in local network")?
377 }
378 Ok(())
379 }
380}
381
382impl LocalNet {
383 #[expect(clippy::too_many_arguments)]
384 fn new(
385 network: NetworkConfig,
386 testing_prng_seed: Option<u64>,
387 common_namespace: String,
388 num_initial_validators: usize,
389 num_proxies: usize,
390 num_shards: usize,
391 common_storage_config: StorageConfig,
392 cross_chain_config: CrossChainConfig,
393 path_provider: PathProvider,
394 num_block_exporters: u32,
395 ) -> Result<Self> {
396 Ok(Self {
397 network,
398 testing_prng_seed,
399 next_client_id: 0,
400 num_initial_validators,
401 num_proxies,
402 num_shards,
403 validator_keys: BTreeMap::new(),
404 running_validators: BTreeMap::new(),
405 initialized_validator_storages: BTreeMap::new(),
406 common_namespace,
407 common_storage_config,
408 cross_chain_config,
409 path_provider,
410 num_block_exporters,
411 })
412 }
413
414 async fn command_for_binary(&self, name: &'static str) -> Result<Command> {
415 let path = resolve_binary(name, env!("CARGO_PKG_NAME")).await?;
416 let mut command = Command::new(path);
417 command.current_dir(self.path_provider.path());
418 Ok(command)
419 }
420
421 #[cfg(with_testing)]
422 pub fn genesis_config(&self) -> Result<linera_client::config::GenesisConfig> {
423 let path = self.path_provider.path();
424 crate::util::read_json(path.join("genesis.json"))
425 }
426
427 pub fn proxy_public_port(validator: usize, proxy_id: usize) -> usize {
428 13000 + validator * 100 + proxy_id + 1
429 }
430
431 pub fn proxy_internal_port(validator: usize, proxy_id: usize) -> usize {
432 10000 + validator * 100 + proxy_id + 1
433 }
434
435 fn shard_port(validator: usize, shard: usize) -> usize {
436 9000 + validator * 100 + shard + 1
437 }
438
439 fn proxy_metrics_port(validator: usize, proxy_id: usize) -> usize {
440 12000 + validator * 100 + proxy_id + 1
441 }
442
443 fn shard_metrics_port(validator: usize, shard: usize) -> usize {
444 11000 + validator * 100 + shard + 1
445 }
446
447 fn block_exporter_port(validator: usize, exporter_id: usize) -> usize {
448 12000 + validator * 100 + exporter_id + 1
449 }
450
    /// Writes the TOML configuration of validator `server_number` to
    /// `validator_{n}.toml` in the network directory and returns that path as a string.
    ///
    /// As a side effect, one `exporter_config_{n}:{j}.toml` file is written for every
    /// block exporter `j` of the validator.
    ///
    /// # Errors
    ///
    /// Fails if a file cannot be written or if the resulting path is not valid Unicode.
    fn configuration_string(&self, server_number: usize) -> Result<String> {
        let n = server_number;
        let path = self
            .path_provider
            .path()
            .join(format!("validator_{n}.toml"));
        // The validator itself is reachable on the public port of its proxy 0.
        let port = Self::proxy_public_port(n, 0);
        let external_protocol = self.network.external.toml();
        let internal_protocol = self.network.internal.toml();
        let external_host = self.network.external.localhost();
        let internal_host = self.network.internal.localhost();
        let mut content = format!(
            r#"
 server_config_path = "server_{n}.json"
 host = "{external_host}"
 port = {port}
 external_protocol = {external_protocol}
 internal_protocol = {internal_protocol}
 "#
        );

        // One `[[proxies]]` table per proxy. Every entry reuses the validator's public
        // port; only the private and metrics ports depend on the proxy index.
        for k in 0..self.num_proxies {
            let internal_port = Self::proxy_internal_port(n, k);
            let metrics_port = Self::proxy_metrics_port(n, k);
            content.push_str(&format!(
                r#"
 [[proxies]]
 host = "{internal_host}"
 public_port = {port}
 private_port = {internal_port}
 metrics_host = "{external_host}"
 metrics_port = {metrics_port}
 "#
            ));
        }

        // One `[[shards]]` table per shard.
        for k in 0..self.num_shards {
            let shard_port = Self::shard_port(n, k);
            let shard_metrics_port = Self::shard_metrics_port(n, k);
            content.push_str(&format!(
                r#"

 [[shards]]
 host = "{internal_host}"
 port = {shard_port}
 metrics_port = {shard_metrics_port}
 "#
            ));
        }

        // One `[[block_exporters]]` table per exporter, plus the exporter's own
        // configuration file. The exporter host always comes from `Network::Grpc`.
        for j in 0..self.num_block_exporters {
            let host = Network::Grpc.localhost();
            let port = Self::block_exporter_port(n, j as usize);
            let config_content = format!(
                r#"

 [[block_exporters]]
 host = "{host}"
 port = {port}
 "#
            );

            content.push_str(&config_content);
            let exporter_config = self.generate_block_exporter_config(n, j);
            let config_path = self
                .path_provider
                .path()
                .join(format!("exporter_config_{n}:{j}.toml"));

            fs_err::write(&config_path, &exporter_config)?;
        }

        fs_err::write(&path, content)?;
        // `into_string` hands back the original `OsString` when it is not valid Unicode.
        path.into_os_string().into_string().map_err(|error| {
            anyhow!(
                "could not parse OS string into string: {}",
                error.to_string_lossy()
            )
        })
    }
534
535 fn generate_block_exporter_config(&self, validator: usize, exporter_id: u32) -> String {
536 let n = validator;
537 let host = Network::Grpc.localhost();
538 let port = Self::block_exporter_port(n, exporter_id as usize);
539 let config = format!(
540 r#"
541 id = {exporter_id}
542
543 [service_config]
544 host = "{host}"
545 port = {port}
546 "#
547 );
548
549 config
550 }
551
552 async fn generate_initial_validator_config(&mut self) -> Result<()> {
553 let mut command = self.command_for_binary("linera-server").await?;
554 command.arg("generate");
555 if let Some(seed) = self.testing_prng_seed {
556 command.arg("--testing-prng-seed").arg(seed.to_string());
557 self.testing_prng_seed = Some(seed + 1);
558 }
559 command.arg("--validators");
560 for i in 0..self.num_initial_validators {
561 command.arg(&self.configuration_string(i)?);
562 }
563 let output = command
564 .args(["--committee", "committee.json"])
565 .spawn_and_wait_for_stdout()
566 .await?;
567 self.validator_keys = output
568 .split_whitespace()
569 .map(str::to_string)
570 .map(|keys| keys.split(',').map(str::to_string).collect::<Vec<_>>())
571 .enumerate()
572 .map(|(i, keys)| {
573 let validator_key = keys[0].to_string();
574 let account_key = keys[1].to_string();
575 (i, (validator_key, account_key))
576 })
577 .collect();
578 Ok(())
579 }
580
581 async fn run_proxy(&mut self, validator: usize) -> Result<Child> {
582 let storage = self
583 .initialized_validator_storages
584 .get(&validator)
585 .expect("initialized storage");
586 let child = self
587 .command_for_binary("linera-proxy")
588 .await?
589 .arg(format!("server_{}.json", validator))
590 .args(["--storage", &storage.to_string()])
591 .args(["--genesis", "genesis.json"])
592 .spawn_into()?;
593
594 let port = Self::proxy_public_port(validator, 0);
595 let nickname = format!("validator proxy {validator}");
596 match self.network.external {
597 Network::Grpc => {
598 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
599 let nickname = format!("validator proxy {validator}");
600 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
601 }
602 Network::Grpcs => {
603 let nickname = format!("validator proxy {validator}");
604 Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
605 }
606 Network::Tcp => {
607 Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
608 }
609 Network::Udp => {
610 Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
611 }
612 }
613 Ok(child)
614 }
615
616 async fn run_exporter(&mut self, validator: usize, exporter_id: u32) -> Result<Child> {
617 let config_path = format!("exporter_config_{validator}:{exporter_id}.toml");
618 let storage = self
619 .initialized_validator_storages
620 .get(&validator)
621 .expect("initialized storage");
622
623 let child = self
624 .command_for_binary("linera-exporter")
625 .await?
626 .arg(config_path)
627 .args(["--storage", &storage.to_string()])
628 .args(["--genesis", "genesis.json"])
629 .spawn_into()?;
630
631 match self.network.internal {
632 Network::Grpc => {
633 let port = Self::block_exporter_port(validator, exporter_id as usize);
634 let nickname = format!("block exporter {validator}:{exporter_id}");
635 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
636 }
637 Network::Grpcs => {
638 let port = Self::block_exporter_port(validator, exporter_id as usize);
639 let nickname = format!("block exporter {validator}:{exporter_id}");
640 Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
641 }
642 Network::Tcp | Network::Udp => {
643 unreachable!("Only allowed options are grpc and grpcs")
644 }
645 }
646
647 Ok(child)
648 }
649
650 pub async fn ensure_grpc_server_has_started(
651 nickname: &str,
652 port: usize,
653 scheme: &str,
654 ) -> Result<()> {
655 let endpoint = match scheme {
656 "http" => Endpoint::new(format!("http://localhost:{port}"))
657 .context("endpoint should always parse")?,
658 "https" => {
659 use linera_rpc::CERT_PEM;
660 let certificate = tonic::transport::Certificate::from_pem(CERT_PEM);
661 let tls_config = ClientTlsConfig::new().ca_certificate(certificate);
662 Endpoint::new(format!("https://localhost:{port}"))
663 .context("endpoint should always parse")?
664 .tls_config(tls_config)?
665 }
666 _ => bail!("Only supported scheme are http and https"),
667 };
668 let connection = endpoint.connect_lazy();
669 let mut client = HealthClient::new(connection);
670 linera_base::time::timer::sleep(Duration::from_millis(100)).await;
671 for i in 0..10 {
672 linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
673 let result = client.check(HealthCheckRequest::default()).await;
674 if result.is_ok() && result.unwrap().get_ref().status() == ServingStatus::Serving {
675 info!("Successfully started {nickname}");
676 return Ok(());
677 } else {
678 warn!("Waiting for {nickname} to start");
679 }
680 }
681 bail!("Failed to start {nickname}");
682 }
683
684 pub async fn ensure_simple_server_has_started(
685 nickname: &str,
686 port: usize,
687 protocol: &str,
688 ) -> Result<()> {
689 use linera_core::node::ValidatorNode as _;
690
691 let options = linera_rpc::NodeOptions {
692 send_timeout: Duration::from_secs(5),
693 recv_timeout: Duration::from_secs(5),
694 retry_delay: Duration::from_secs(1),
695 max_retries: 1,
696 };
697 let provider = linera_rpc::simple::SimpleNodeProvider::new(options);
698 let address = format!("{protocol}:127.0.0.1:{port}");
699 let node = provider.make_node(&address)?;
702 linera_base::time::timer::sleep(Duration::from_millis(100)).await;
703 for i in 0..10 {
704 linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
705 let result = node.get_version_info().await;
706 if result.is_ok() {
707 info!("Successfully started {nickname}");
708 return Ok(());
709 } else {
710 warn!("Waiting for {nickname} to start");
711 }
712 }
713 bail!("Failed to start {nickname}");
714 }
715
716 async fn initialize_storage(&mut self, validator: usize) -> Result<()> {
717 let namespace = format!("{}_server_{}_db", self.common_namespace, validator);
718 let storage_config = self.common_storage_config.clone();
719 let storage = StorageConfigNamespace {
720 storage_config,
721 namespace,
722 };
723
724 let mut command = self.command_for_binary("linera-server").await?;
725 if let Ok(var) = env::var(SERVER_ENV) {
726 command.args(var.split_whitespace());
727 }
728 command.arg("initialize");
729 command
730 .args(["--storage", &storage.to_string()])
731 .args(["--genesis", "genesis.json"])
732 .spawn_and_wait_for_stdout()
733 .await?;
734
735 self.initialized_validator_storages
736 .insert(validator, storage);
737 Ok(())
738 }
739
740 async fn run_server(&mut self, validator: usize, shard: usize) -> Result<Child> {
741 let mut storage = self
742 .initialized_validator_storages
743 .get(&validator)
744 .expect("initialized storage")
745 .clone();
746
747 storage.storage_config.maybe_append_shard_path(shard)?;
750
751 let mut command = self.command_for_binary("linera-server").await?;
752 if let Ok(var) = env::var(SERVER_ENV) {
753 command.args(var.split_whitespace());
754 }
755 command
756 .arg("run")
757 .args(["--storage", &storage.to_string()])
758 .args(["--server", &format!("server_{}.json", validator)])
759 .args(["--shard", &shard.to_string()])
760 .args(["--genesis", "genesis.json"])
761 .args(self.cross_chain_config.to_args());
762 let child = command.spawn_into()?;
763
764 let port = Self::shard_port(validator, shard);
765 let nickname = format!("validator server {validator}:{shard}");
766 match self.network.internal {
767 Network::Grpc => {
768 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
769 }
770 Network::Grpcs => {
771 Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
772 }
773 Network::Tcp => {
774 Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
775 }
776 Network::Udp => {
777 Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
778 }
779 }
780 Ok(child)
781 }
782
783 async fn run(&mut self) -> Result<()> {
784 for validator in 0..self.num_initial_validators {
785 self.start_validator(validator).await?;
786 }
787 Ok(())
788 }
789
790 pub async fn start_validator(&mut self, index: usize) -> Result<()> {
792 self.initialize_storage(index).await?;
793 self.restart_validator(index).await
794 }
795
796 pub async fn restart_validator(&mut self, index: usize) -> Result<()> {
799 let proxy = self.run_proxy(index).await?;
800 let mut validator = Validator::new(proxy);
801 for shard in 0..self.num_shards {
802 let server = self.run_server(index, shard).await?;
803 validator.add_server(server);
804 }
805 for block_exporter in 0..self.num_block_exporters {
806 let exporter = self.run_exporter(index, block_exporter).await?;
807 validator.add_block_exporter(exporter);
808 }
809 self.running_validators.insert(index, validator);
810 Ok(())
811 }
812
813 pub async fn stop_validator(&mut self, index: usize) -> Result<()> {
815 if let Some(mut validator) = self.running_validators.remove(&index) {
816 if let Err(error) = validator.terminate().await {
817 error!("Failed to stop validator {index}: {error}");
818 return Err(error);
819 }
820 }
821 Ok(())
822 }
823
824 pub async fn validator_client(&mut self, validator: usize) -> Result<linera_rpc::Client> {
826 let node_provider = linera_rpc::NodeProvider::new(linera_rpc::NodeOptions {
827 send_timeout: Duration::from_secs(1),
828 recv_timeout: Duration::from_secs(1),
829 retry_delay: Duration::ZERO,
830 max_retries: 0,
831 });
832
833 Ok(node_provider.make_node(&self.validator_address(validator))?)
834 }
835
836 pub fn validator_address(&self, validator: usize) -> String {
839 let port = Self::proxy_public_port(validator, 0);
840 let schema = self.network.external.schema();
841
842 format!("{schema}:localhost:{port}")
843 }
844}
845
#[cfg(with_testing)]
impl LocalNet {
    /// Returns the keys recorded for `validator` (validator key, account key), if any.
    pub fn validator_keys(&self, validator: usize) -> Option<&(String, String)> {
        self.validator_keys.get(&validator)
    }

    /// Runs `linera-server generate` for a single validator and records its keys.
    ///
    /// The command's stdout is expected to be `<validator key>,<account key>`.
    ///
    /// # Errors
    ///
    /// Fails if the configuration cannot be written, if the command fails, or if the
    /// output does not contain two comma-separated keys.
    pub async fn generate_validator_config(&mut self, validator: usize) -> Result<()> {
        let stdout = self
            .command_for_binary("linera-server")
            .await?
            .arg("generate")
            .arg("--validators")
            .arg(&self.configuration_string(validator)?)
            .spawn_and_wait_for_stdout()
            .await?;
        // The output comes from another process, so report a malformed result as an
        // error instead of panicking on an out-of-bounds index.
        let mut fields = stdout.trim().split(',');
        let (Some(validator_key), Some(account_key)) = (fields.next(), fields.next()) else {
            bail!(
                "unexpected output {stdout:?} of `linera-server generate` \
                 for validator {validator}"
            );
        };
        self.validator_keys.insert(
            validator,
            (validator_key.to_string(), account_key.to_string()),
        );
        Ok(())
    }

    /// Kills the server stored at position `shard` of the running validator
    /// `validator`.
    ///
    /// # Errors
    ///
    /// Fails if the validator is not running or the server cannot be terminated.
    pub async fn terminate_server(&mut self, validator: usize, shard: usize) -> Result<()> {
        self.running_validators
            .get_mut(&validator)
            .context("server not found")?
            .terminate_server(shard)
            .await?;
        Ok(())
    }

    /// Removes `validator` from the set of running validators without explicitly
    /// terminating its processes.
    ///
    /// # Errors
    ///
    /// Fails if the validator is not in the running set.
    pub fn remove_validator(&mut self, validator: usize) -> Result<()> {
        self.running_validators
            .remove(&validator)
            .context("validator not found")?;
        Ok(())
    }

    /// Starts the server for `shard` of `validator` and adds it to the running
    /// validator's servers.
    ///
    /// # Errors
    ///
    /// Fails if the server cannot be started or the validator is not running.
    pub async fn start_server(&mut self, validator: usize, shard: usize) -> Result<()> {
        let server = self.run_server(validator, shard).await?;
        self.running_validators
            .get_mut(&validator)
            .context("could not find validator")?
            .add_server(server);
        Ok(())
    }
}
896}