1#[cfg(with_testing)]
5use std::sync::LazyLock;
6use std::{
7 collections::BTreeMap,
8 env,
9 path::{Path, PathBuf},
10 sync::Arc,
11 time::Duration,
12};
13
14use anyhow::{anyhow, bail, ensure, Context, Result};
15#[cfg(with_testing)]
16use async_lock::RwLock;
17use async_trait::async_trait;
18use linera_base::{
19 command::{resolve_binary, CommandExt},
20 data_types::Amount,
21};
22use linera_client::client_options::ResourceControlPolicyConfig;
23use linera_core::node::ValidatorNodeProvider;
24use linera_rpc::config::{CrossChainConfig, ExporterServiceConfig, TlsConfig};
25#[cfg(all(feature = "storage-service", with_testing))]
26use linera_storage_service::common::storage_service_test_endpoint;
27#[cfg(all(feature = "rocksdb", feature = "scylladb", with_testing))]
28use linera_views::rocks_db::{RocksDbSpawnMode, RocksDbStore};
29#[cfg(all(feature = "scylladb", with_testing))]
30use linera_views::{scylla_db::ScyllaDbStore, store::TestKeyValueStore as _};
31use tempfile::{tempdir, TempDir};
32use tokio::process::{Child, Command};
33use tonic::transport::{channel::ClientTlsConfig, Endpoint};
34use tonic_health::pb::{
35 health_check_response::ServingStatus, health_client::HealthClient, HealthCheckRequest,
36};
37use tracing::{error, info, warn};
38
39use crate::{
40 cli_wrappers::{
41 ClientWrapper, LineraNet, LineraNetConfig, Network, NetworkConfig, OnClientDrop,
42 },
43 config::{BlockExporterConfig, Destination, DestinationConfig},
44 storage::{InnerStorageConfig, StorageConfig},
45 util::ChildExt,
46};
47
48pub enum ProcessInbox {
49 Skip,
50 Automatic,
51}
52
53#[cfg(with_testing)]
54static PORT_PROVIDER: LazyLock<RwLock<u16>> = LazyLock::new(|| RwLock::new(7080));
55
56#[cfg(with_testing)]
58pub async fn get_node_port() -> u16 {
59 let mut port = PORT_PROVIDER.write().await;
60 let port_ret = *port;
61 *port += 1;
62 info!("get_node_port returning port_ret={}", port_ret);
63 assert!(port_selector::is_free(port_ret));
64 port_ret
65}
66
67#[cfg(with_testing)]
68async fn make_testing_config(database: Database) -> Result<InnerStorageConfig> {
69 match database {
70 Database::Service => {
71 #[cfg(feature = "storage-service")]
72 {
73 let endpoint = storage_service_test_endpoint()
74 .expect("Reading LINERA_STORAGE_SERVICE environment variable");
75 Ok(InnerStorageConfig::Service { endpoint })
76 }
77 #[cfg(not(feature = "storage-service"))]
78 panic!("Database::Service is selected without the feature storage_service");
79 }
80 Database::DynamoDb => {
81 #[cfg(feature = "dynamodb")]
82 {
83 let use_dynamodb_local = true;
84 Ok(InnerStorageConfig::DynamoDb { use_dynamodb_local })
85 }
86 #[cfg(not(feature = "dynamodb"))]
87 panic!("Database::DynamoDb is selected without the feature dynamodb");
88 }
89 Database::ScyllaDb => {
90 #[cfg(feature = "scylladb")]
91 {
92 let config = ScyllaDbStore::new_test_config().await?;
93 Ok(InnerStorageConfig::ScyllaDb {
94 uri: config.inner_config.uri,
95 })
96 }
97 #[cfg(not(feature = "scylladb"))]
98 panic!("Database::ScyllaDb is selected without the feature scylladb");
99 }
100 Database::DualRocksDbScyllaDb => {
101 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
102 {
103 let rocksdb_config = RocksDbStore::new_test_config().await?;
104 let scylla_config = ScyllaDbStore::new_test_config().await?;
105 let spawn_mode = RocksDbSpawnMode::get_spawn_mode_from_runtime();
106 Ok(InnerStorageConfig::DualRocksDbScyllaDb {
107 path_with_guard: rocksdb_config.inner_config.path_with_guard,
108 spawn_mode,
109 uri: scylla_config.inner_config.uri,
110 })
111 }
112 #[cfg(not(all(feature = "rocksdb", feature = "scylladb")))]
113 panic!("Database::DualRocksDbScyllaDb is selected without the features rocksdb and scylladb");
114 }
115 }
116}
117
118pub enum InnerStorageConfigBuilder {
119 #[cfg(with_testing)]
120 TestConfig,
121 ExistingConfig {
122 storage_config: InnerStorageConfig,
123 },
124}
125
126impl InnerStorageConfigBuilder {
127 #[allow(unused_variables)]
128 pub async fn build(self, database: Database) -> Result<InnerStorageConfig> {
129 match self {
130 #[cfg(with_testing)]
131 InnerStorageConfigBuilder::TestConfig => make_testing_config(database).await,
132 InnerStorageConfigBuilder::ExistingConfig { storage_config } => Ok(storage_config),
133 }
134 }
135}
136
137#[derive(Clone)]
140pub enum PathProvider {
141 ExternalPath { path_buf: PathBuf },
142 TemporaryDirectory { tmp_dir: Arc<TempDir> },
143}
144
145impl PathProvider {
146 pub fn path(&self) -> &Path {
147 match self {
148 PathProvider::ExternalPath { path_buf } => path_buf.as_path(),
149 PathProvider::TemporaryDirectory { tmp_dir } => tmp_dir.path(),
150 }
151 }
152
153 pub fn create_temporary_directory() -> Result<Self> {
154 let tmp_dir = Arc::new(tempdir()?);
155 Ok(PathProvider::TemporaryDirectory { tmp_dir })
156 }
157
158 pub fn from_path_option(path: &Option<String>) -> anyhow::Result<Self> {
159 Ok(match path {
160 None => {
161 let tmp_dir = Arc::new(tempfile::tempdir()?);
162 PathProvider::TemporaryDirectory { tmp_dir }
163 }
164 Some(path) => {
165 let path = Path::new(path);
166 let path_buf = path.to_path_buf();
167 PathProvider::ExternalPath { path_buf }
168 }
169 })
170 }
171}
172
173pub struct LocalNetConfig {
175 pub database: Database,
176 pub network: NetworkConfig,
177 pub testing_prng_seed: Option<u64>,
178 pub namespace: String,
179 pub num_other_initial_chains: u32,
180 pub initial_amount: Amount,
181 pub num_initial_validators: usize,
182 pub num_shards: usize,
183 pub num_proxies: usize,
184 pub policy_config: ResourceControlPolicyConfig,
185 pub cross_chain_config: CrossChainConfig,
186 pub storage_config_builder: InnerStorageConfigBuilder,
187 pub path_provider: PathProvider,
188 pub block_exporters: ExportersSetup,
189}
190
191pub enum ExportersSetup {
193 Local(Vec<BlockExporterConfig>),
195 Remote(Vec<ExporterServiceConfig>),
197}
198
199pub struct LocalNet {
201 network: NetworkConfig,
202 testing_prng_seed: Option<u64>,
203 next_client_id: usize,
204 num_initial_validators: usize,
205 num_proxies: usize,
206 num_shards: usize,
207 validator_keys: BTreeMap<usize, (String, String)>,
208 running_validators: BTreeMap<usize, Validator>,
209 initialized_validator_storages: BTreeMap<usize, StorageConfig>,
210 common_namespace: String,
211 common_storage_config: InnerStorageConfig,
212 cross_chain_config: CrossChainConfig,
213 path_provider: PathProvider,
214 block_exporters: ExportersSetup,
215}
216
217const SERVER_ENV: &str = "LINERA_SERVER_PARAMS";
220
221#[derive(Copy, Clone, Eq, PartialEq)]
223pub enum Database {
224 Service,
225 DynamoDb,
226 ScyllaDb,
227 DualRocksDbScyllaDb,
228}
229
230struct Validator {
232 proxy: Child,
233 servers: Vec<Child>,
234 exporters: Vec<Child>,
235}
236
237impl Validator {
238 fn new(proxy: Child) -> Self {
239 Self {
240 proxy,
241 servers: vec![],
242 exporters: vec![],
243 }
244 }
245
246 async fn terminate(&mut self) -> Result<()> {
247 self.proxy
248 .kill()
249 .await
250 .context("terminating validator proxy")?;
251 for server in &mut self.servers {
252 server
253 .kill()
254 .await
255 .context("terminating validator server")?;
256 }
257 Ok(())
258 }
259
260 fn add_server(&mut self, server: Child) {
261 self.servers.push(server)
262 }
263
264 #[cfg(with_testing)]
265 async fn terminate_server(&mut self, index: usize) -> Result<()> {
266 let mut server = self.servers.remove(index);
267 server
268 .kill()
269 .await
270 .context("terminating validator server")?;
271 Ok(())
272 }
273
274 fn add_block_exporter(&mut self, exporter: Child) {
275 self.exporters.push(exporter);
276 }
277
278 fn ensure_is_running(&mut self) -> Result<()> {
279 self.proxy.ensure_is_running()?;
280 for child in &mut self.servers {
281 child.ensure_is_running()?;
282 }
283 for exporter in &mut self.exporters {
284 exporter.ensure_is_running()?;
285 }
286 Ok(())
287 }
288}
289
290#[cfg(with_testing)]
291impl LocalNetConfig {
292 pub fn new_test(database: Database, network: Network) -> Self {
293 let num_shards = 4;
294 let num_proxies = 1;
295 let storage_config_builder = InnerStorageConfigBuilder::TestConfig;
296 let path_provider = PathProvider::create_temporary_directory().unwrap();
297 let internal = network.drop_tls();
298 let external = network;
299 let network = NetworkConfig { internal, external };
300 let cross_chain_config = CrossChainConfig::default();
301 Self {
302 database,
303 network,
304 num_other_initial_chains: 2,
305 initial_amount: Amount::from_tokens(1_000_000),
306 policy_config: ResourceControlPolicyConfig::Testnet,
307 cross_chain_config,
308 testing_prng_seed: Some(37),
309 namespace: linera_views::random::generate_test_namespace(),
310 num_initial_validators: 4,
311 num_shards,
312 num_proxies,
313 storage_config_builder,
314 path_provider,
315 block_exporters: ExportersSetup::Local(vec![]),
316 }
317 }
318}
319
320#[async_trait]
321impl LineraNetConfig for LocalNetConfig {
322 type Net = LocalNet;
323
324 async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
325 let storage_config = self.storage_config_builder.build(self.database).await?;
326 let mut net = LocalNet::new(
327 self.network,
328 self.testing_prng_seed,
329 self.namespace,
330 self.num_initial_validators,
331 self.num_shards,
332 self.num_proxies,
333 storage_config,
334 self.cross_chain_config,
335 self.path_provider,
336 self.block_exporters,
337 )?;
338 let client = net.make_client().await;
339 ensure!(
340 self.num_initial_validators > 0,
341 "There should be at least one initial validator"
342 );
343 net.generate_initial_validator_config().await?;
344 client
345 .create_genesis_config(
346 self.num_other_initial_chains,
347 self.initial_amount,
348 self.policy_config,
349 Some(vec!["localhost".to_owned()]),
350 )
351 .await?;
352 net.run().await?;
353 Ok((net, client))
354 }
355}
356
357#[async_trait]
358impl LineraNet for LocalNet {
359 async fn ensure_is_running(&mut self) -> Result<()> {
360 for validator in self.running_validators.values_mut() {
361 validator.ensure_is_running().context("in local network")?;
362 }
363 Ok(())
364 }
365
366 async fn make_client(&mut self) -> ClientWrapper {
367 let client = ClientWrapper::new(
368 self.path_provider.clone(),
369 self.network.external,
370 self.testing_prng_seed,
371 self.next_client_id,
372 OnClientDrop::LeakChains,
373 );
374 if let Some(seed) = self.testing_prng_seed {
375 self.testing_prng_seed = Some(seed + 1);
376 }
377 self.next_client_id += 1;
378 client
379 }
380
381 async fn terminate(&mut self) -> Result<()> {
382 for validator in self.running_validators.values_mut() {
383 validator.terminate().await.context("in local network")?
384 }
385 Ok(())
386 }
387}
388
389impl LocalNet {
390 #[expect(clippy::too_many_arguments)]
391 fn new(
392 network: NetworkConfig,
393 testing_prng_seed: Option<u64>,
394 common_namespace: String,
395 num_initial_validators: usize,
396 num_proxies: usize,
397 num_shards: usize,
398 common_storage_config: InnerStorageConfig,
399 cross_chain_config: CrossChainConfig,
400 path_provider: PathProvider,
401 block_exporters: ExportersSetup,
402 ) -> Result<Self> {
403 Ok(Self {
404 network,
405 testing_prng_seed,
406 next_client_id: 0,
407 num_initial_validators,
408 num_proxies,
409 num_shards,
410 validator_keys: BTreeMap::new(),
411 running_validators: BTreeMap::new(),
412 initialized_validator_storages: BTreeMap::new(),
413 common_namespace,
414 common_storage_config,
415 cross_chain_config,
416 path_provider,
417 block_exporters,
418 })
419 }
420
421 async fn command_for_binary(&self, name: &'static str) -> Result<Command> {
422 let path = resolve_binary(name, env!("CARGO_PKG_NAME")).await?;
423 let mut command = Command::new(path);
424 command.current_dir(self.path_provider.path());
425 Ok(command)
426 }
427
428 #[cfg(with_testing)]
429 pub fn genesis_config(&self) -> Result<linera_client::config::GenesisConfig> {
430 let path = self.path_provider.path();
431 crate::util::read_json(path.join("genesis.json"))
432 }
433
434 pub fn proxy_public_port(validator: usize, proxy_id: usize) -> usize {
435 13000 + validator * 100 + proxy_id + 1
436 }
437
438 pub fn proxy_internal_port(validator: usize, proxy_id: usize) -> usize {
439 10000 + validator * 100 + proxy_id + 1
440 }
441
442 fn shard_port(validator: usize, shard: usize) -> usize {
443 9000 + validator * 100 + shard + 1
444 }
445
446 fn proxy_metrics_port(validator: usize, proxy_id: usize) -> usize {
447 12000 + validator * 100 + proxy_id + 1
448 }
449
450 fn shard_metrics_port(validator: usize, shard: usize) -> usize {
451 11000 + validator * 100 + shard + 1
452 }
453
454 fn block_exporter_port(validator: usize, exporter_id: usize) -> usize {
455 12000 + validator * 100 + exporter_id + 1
456 }
457
458 fn block_exporter_metrics_port(exporter_id: usize) -> usize {
459 13000 + exporter_id + 1
460 }
461
462 fn configuration_string(&self, server_number: usize) -> Result<String> {
463 let n = server_number;
464 let path = self
465 .path_provider
466 .path()
467 .join(format!("validator_{n}.toml"));
468 let port = Self::proxy_public_port(n, 0);
469 let external_protocol = self.network.external.toml();
470 let internal_protocol = self.network.internal.toml();
471 let external_host = self.network.external.localhost();
472 let internal_host = self.network.internal.localhost();
473 let mut content = format!(
474 r#"
475 server_config_path = "server_{n}.json"
476 host = "{external_host}"
477 port = {port}
478 external_protocol = {external_protocol}
479 internal_protocol = {internal_protocol}
480 "#
481 );
482
483 for k in 0..self.num_proxies {
484 let internal_port = Self::proxy_internal_port(n, k);
485 let metrics_port = Self::proxy_metrics_port(n, k);
486 content.push_str(&format!(
490 r#"
491 [[proxies]]
492 host = "{internal_host}"
493 public_port = {port}
494 private_port = {internal_port}
495 metrics_host = "{external_host}"
496 metrics_port = {metrics_port}
497 "#
498 ));
499 }
500
501 for k in 0..self.num_shards {
502 let shard_port = Self::shard_port(n, k);
503 let shard_metrics_port = Self::shard_metrics_port(n, k);
504 content.push_str(&format!(
505 r#"
506
507 [[shards]]
508 host = "{internal_host}"
509 port = {shard_port}
510 metrics_port = {shard_metrics_port}
511 "#
512 ));
513 }
514
515 match self.block_exporters {
516 ExportersSetup::Local(ref exporters) => {
517 for (j, exporter) in exporters.iter().enumerate() {
518 let host = Network::Grpc.localhost();
519 let port = Self::block_exporter_port(n, j);
520 let config_content = format!(
521 r#"
522
523 [[block_exporters]]
524 host = "{host}"
525 port = {port}
526 "#
527 );
528
529 content.push_str(&config_content);
530 let exporter_config = self.generate_block_exporter_config(
531 n,
532 j as u32,
533 &exporter.destination_config,
534 );
535 let config_path = self
536 .path_provider
537 .path()
538 .join(format!("exporter_config_{n}:{j}.toml"));
539
540 fs_err::write(&config_path, &exporter_config)?;
541 }
542 }
543 ExportersSetup::Remote(ref exporters) => {
544 for exporter in exporters.iter() {
545 let host = exporter.host.clone();
546 let port = exporter.port;
547 let config_content = format!(
548 r#"
549
550 [[block_exporters]]
551 host = "{host}"
552 port = {port}
553 "#
554 );
555
556 content.push_str(&config_content);
557 }
558 }
559 }
560
561 fs_err::write(&path, content)?;
562 path.into_os_string().into_string().map_err(|error| {
563 anyhow!(
564 "could not parse OS string into string: {}",
565 error.to_string_lossy()
566 )
567 })
568 }
569
570 fn generate_block_exporter_config(
571 &self,
572 validator: usize,
573 exporter_id: u32,
574 destination_config: &DestinationConfig,
575 ) -> String {
576 let n = validator;
577 let host = Network::Grpc.localhost();
578 let port = Self::block_exporter_port(n, exporter_id as usize);
579 let metrics_port = Self::block_exporter_metrics_port(exporter_id as usize);
580 let mut config = format!(
581 r#"
582 id = {exporter_id}
583
584 metrics_port = {metrics_port}
585
586 [service_config]
587 host = "{host}"
588 port = {port}
589
590 "#
591 );
592
593 let DestinationConfig {
594 destinations,
595 committee_destination,
596 } = destination_config;
597
598 if *committee_destination {
599 let destination_string_to_push = r#"
600
601 [destination_config]
602 committee_destination = true
603 "#
604 .to_string();
605
606 config.push_str(&destination_string_to_push);
607 }
608
609 for destination in destinations {
610 let destination_string_to_push = match destination {
611 Destination::Indexer {
612 tls,
613 endpoint,
614 port,
615 } => {
616 let tls = match tls {
617 TlsConfig::ClearText => "ClearText",
618 TlsConfig::Tls => "Tls",
619 };
620 format!(
621 r#"
622 [[destination_config.destinations]]
623 tls = "{tls}"
624 endpoint = "{endpoint}"
625 port = {port}
626 kind = "Indexer"
627 "#
628 )
629 }
630 Destination::Validator { endpoint, port } => {
631 format!(
632 r#"
633 [[destination_config.destinations]]
634 endpoint = "{endpoint}"
635 port = {port}
636 kind = "Validator"
637 "#
638 )
639 }
640 Destination::Logging { file_name } => {
641 format!(
642 r#"
643 [[destination_config.destinations]]
644 file_name = "{file_name}"
645 kind = "Logging"
646 "#
647 )
648 }
649 };
650
651 config.push_str(&destination_string_to_push);
652 }
653
654 config
655 }
656
657 async fn generate_initial_validator_config(&mut self) -> Result<()> {
658 let mut command = self.command_for_binary("linera-server").await?;
659 command.arg("generate");
660 if let Some(seed) = self.testing_prng_seed {
661 command.arg("--testing-prng-seed").arg(seed.to_string());
662 self.testing_prng_seed = Some(seed + 1);
663 }
664 command.arg("--validators");
665 for i in 0..self.num_initial_validators {
666 command.arg(&self.configuration_string(i)?);
667 }
668 let output = command
669 .args(["--committee", "committee.json"])
670 .spawn_and_wait_for_stdout()
671 .await?;
672 self.validator_keys = output
673 .split_whitespace()
674 .map(str::to_string)
675 .map(|keys| keys.split(',').map(str::to_string).collect::<Vec<_>>())
676 .enumerate()
677 .map(|(i, keys)| {
678 let validator_key = keys[0].to_string();
679 let account_key = keys[1].to_string();
680 (i, (validator_key, account_key))
681 })
682 .collect();
683 Ok(())
684 }
685
686 async fn run_proxy(&mut self, validator: usize) -> Result<Child> {
687 let storage = self
688 .initialized_validator_storages
689 .get(&validator)
690 .expect("initialized storage");
691 let child = self
692 .command_for_binary("linera-proxy")
693 .await?
694 .arg(format!("server_{}.json", validator))
695 .args(["--storage", &storage.to_string()])
696 .spawn_into()?;
697
698 let port = Self::proxy_public_port(validator, 0);
699 let nickname = format!("validator proxy {validator}");
700 match self.network.external {
701 Network::Grpc => {
702 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
703 let nickname = format!("validator proxy {validator}");
704 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
705 }
706 Network::Grpcs => {
707 let nickname = format!("validator proxy {validator}");
708 Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
709 }
710 Network::Tcp => {
711 Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
712 }
713 Network::Udp => {
714 Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
715 }
716 }
717 Ok(child)
718 }
719
720 async fn run_exporter(&mut self, validator: usize, exporter_id: u32) -> Result<Child> {
721 let config_path = format!("exporter_config_{validator}:{exporter_id}.toml");
722 let storage = self
723 .initialized_validator_storages
724 .get(&validator)
725 .expect("initialized storage");
726
727 tracing::debug!(config=?config_path, storage=?storage.to_string(), "starting block exporter");
728
729 let child = self
730 .command_for_binary("linera-exporter")
731 .await?
732 .args(["--config-path", &config_path])
733 .args(["--storage", &storage.to_string()])
734 .spawn_into()?;
735
736 match self.network.internal {
737 Network::Grpc => {
738 let port = Self::block_exporter_port(validator, exporter_id as usize);
739 let nickname = format!("block exporter {validator}:{exporter_id}");
740 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
741 }
742 Network::Grpcs => {
743 let port = Self::block_exporter_port(validator, exporter_id as usize);
744 let nickname = format!("block exporter {validator}:{exporter_id}");
745 Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
746 }
747 Network::Tcp | Network::Udp => {
748 unreachable!("Only allowed options are grpc and grpcs")
749 }
750 }
751
752 tracing::info!("block exporter started {validator}:{exporter_id}");
753
754 Ok(child)
755 }
756
757 pub async fn ensure_grpc_server_has_started(
758 nickname: &str,
759 port: usize,
760 scheme: &str,
761 ) -> Result<()> {
762 let endpoint = match scheme {
763 "http" => Endpoint::new(format!("http://localhost:{port}"))
764 .context("endpoint should always parse")?,
765 "https" => {
766 use linera_rpc::CERT_PEM;
767 let certificate = tonic::transport::Certificate::from_pem(CERT_PEM);
768 let tls_config = ClientTlsConfig::new().ca_certificate(certificate);
769 Endpoint::new(format!("https://localhost:{port}"))
770 .context("endpoint should always parse")?
771 .tls_config(tls_config)?
772 }
773 _ => bail!("Only supported scheme are http and https"),
774 };
775 let connection = endpoint.connect_lazy();
776 let mut client = HealthClient::new(connection);
777 linera_base::time::timer::sleep(Duration::from_millis(100)).await;
778 for i in 0..10 {
779 linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
780 let result = client.check(HealthCheckRequest::default()).await;
781 if result.is_ok() && result.unwrap().get_ref().status() == ServingStatus::Serving {
782 info!(?port, "Successfully started {nickname}");
783 return Ok(());
784 } else {
785 warn!("Waiting for {nickname} to start");
786 }
787 }
788 bail!("Failed to start {nickname}");
789 }
790
791 pub async fn ensure_simple_server_has_started(
792 nickname: &str,
793 port: usize,
794 protocol: &str,
795 ) -> Result<()> {
796 use linera_core::node::ValidatorNode as _;
797
798 let options = linera_rpc::NodeOptions {
799 send_timeout: Duration::from_secs(5),
800 recv_timeout: Duration::from_secs(5),
801 retry_delay: Duration::from_secs(1),
802 max_retries: 1,
803 };
804 let provider = linera_rpc::simple::SimpleNodeProvider::new(options);
805 let address = format!("{protocol}:127.0.0.1:{port}");
806 let node = provider.make_node(&address)?;
809 linera_base::time::timer::sleep(Duration::from_millis(100)).await;
810 for i in 0..10 {
811 linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
812 let result = node.get_version_info().await;
813 if result.is_ok() {
814 info!("Successfully started {nickname}");
815 return Ok(());
816 } else {
817 warn!("Waiting for {nickname} to start");
818 }
819 }
820 bail!("Failed to start {nickname}");
821 }
822
823 async fn initialize_storage(&mut self, validator: usize) -> Result<()> {
824 let namespace = format!("{}_server_{}_db", self.common_namespace, validator);
825 let inner_storage_config = self.common_storage_config.clone();
826 let storage = StorageConfig {
827 inner_storage_config,
828 namespace,
829 };
830 let mut command = self.command_for_binary("linera").await?;
831 if let Ok(var) = env::var(SERVER_ENV) {
832 command.args(var.split_whitespace());
833 }
834 command.args(["storage", "initialize"]);
835 command
836 .args(["--storage", &storage.to_string()])
837 .args(["--genesis", "genesis.json"])
838 .spawn_and_wait_for_stdout()
839 .await?;
840
841 self.initialized_validator_storages
842 .insert(validator, storage);
843 Ok(())
844 }
845
846 async fn run_server(&mut self, validator: usize, shard: usize) -> Result<Child> {
847 let mut storage = self
848 .initialized_validator_storages
849 .get(&validator)
850 .expect("initialized storage")
851 .clone();
852
853 storage.maybe_append_shard_path(shard)?;
856
857 let mut command = self.command_for_binary("linera-server").await?;
858 if let Ok(var) = env::var(SERVER_ENV) {
859 command.args(var.split_whitespace());
860 }
861 command
862 .arg("run")
863 .args(["--storage", &storage.to_string()])
864 .args(["--server", &format!("server_{}.json", validator)])
865 .args(["--shard", &shard.to_string()])
866 .args(self.cross_chain_config.to_args());
867 let child = command.spawn_into()?;
868
869 let port = Self::shard_port(validator, shard);
870 let nickname = format!("validator server {validator}:{shard}");
871 match self.network.internal {
872 Network::Grpc => {
873 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
874 }
875 Network::Grpcs => {
876 Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
877 }
878 Network::Tcp => {
879 Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
880 }
881 Network::Udp => {
882 Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
883 }
884 }
885 Ok(child)
886 }
887
888 async fn run(&mut self) -> Result<()> {
889 for validator in 0..self.num_initial_validators {
890 self.start_validator(validator).await?;
891 }
892 Ok(())
893 }
894
895 pub async fn start_validator(&mut self, index: usize) -> Result<()> {
897 self.initialize_storage(index).await?;
898 self.restart_validator(index).await
899 }
900
901 pub async fn restart_validator(&mut self, index: usize) -> Result<()> {
904 let proxy = self.run_proxy(index).await?;
905 let mut validator = Validator::new(proxy);
906 for shard in 0..self.num_shards {
907 let server = self.run_server(index, shard).await?;
908 validator.add_server(server);
909 }
910 if let ExportersSetup::Local(ref exporters) = self.block_exporters {
911 for block_exporter in 0..exporters.len() {
912 let exporter = self.run_exporter(index, block_exporter as u32).await?;
913 validator.add_block_exporter(exporter);
914 }
915 }
916
917 self.running_validators.insert(index, validator);
918 Ok(())
919 }
920
921 pub async fn stop_validator(&mut self, index: usize) -> Result<()> {
923 if let Some(mut validator) = self.running_validators.remove(&index) {
924 if let Err(error) = validator.terminate().await {
925 error!("Failed to stop validator {index}: {error}");
926 return Err(error);
927 }
928 }
929 Ok(())
930 }
931
932 pub async fn validator_client(&mut self, validator: usize) -> Result<linera_rpc::Client> {
934 let node_provider = linera_rpc::NodeProvider::new(linera_rpc::NodeOptions {
935 send_timeout: Duration::from_secs(1),
936 recv_timeout: Duration::from_secs(1),
937 retry_delay: Duration::ZERO,
938 max_retries: 0,
939 });
940
941 Ok(node_provider.make_node(&self.validator_address(validator))?)
942 }
943
944 pub fn validator_address(&self, validator: usize) -> String {
947 let port = Self::proxy_public_port(validator, 0);
948 let schema = self.network.external.schema();
949
950 format!("{schema}:localhost:{port}")
951 }
952}
953
954#[cfg(with_testing)]
955impl LocalNet {
956 pub fn validator_keys(&self, validator: usize) -> Option<&(String, String)> {
958 self.validator_keys.get(&validator)
959 }
960
961 pub async fn generate_validator_config(&mut self, validator: usize) -> Result<()> {
962 let stdout = self
963 .command_for_binary("linera-server")
964 .await?
965 .arg("generate")
966 .arg("--validators")
967 .arg(&self.configuration_string(validator)?)
968 .spawn_and_wait_for_stdout()
969 .await?;
970 let keys = stdout
971 .trim()
972 .split(',')
973 .map(str::to_string)
974 .collect::<Vec<_>>();
975 self.validator_keys
976 .insert(validator, (keys[0].clone(), keys[1].clone()));
977 Ok(())
978 }
979
980 pub async fn terminate_server(&mut self, validator: usize, shard: usize) -> Result<()> {
981 self.running_validators
982 .get_mut(&validator)
983 .context("server not found")?
984 .terminate_server(shard)
985 .await?;
986 Ok(())
987 }
988
989 pub fn remove_validator(&mut self, validator: usize) -> Result<()> {
990 self.running_validators
991 .remove(&validator)
992 .context("validator not found")?;
993 Ok(())
994 }
995
996 pub async fn start_server(&mut self, validator: usize, shard: usize) -> Result<()> {
997 let server = self.run_server(validator, shard).await?;
998 self.running_validators
999 .get_mut(&validator)
1000 .context("could not find validator")?
1001 .add_server(server);
1002 Ok(())
1003 }
1004}