1#[cfg(with_testing)]
5use std::sync::LazyLock;
6use std::{
7 collections::BTreeMap,
8 env,
9 num::NonZeroU16,
10 path::{Path, PathBuf},
11 sync::Arc,
12 time::Duration,
13};
14
15use anyhow::{anyhow, bail, ensure, Context, Result};
16#[cfg(with_testing)]
17use async_lock::RwLock;
18use async_trait::async_trait;
19use linera_base::{
20 command::{resolve_binary, CommandExt},
21 data_types::Amount,
22};
23use linera_client::client_options::ResourceControlPolicyConfig;
24use linera_core::node::ValidatorNodeProvider;
25use linera_rpc::config::{CrossChainConfig, ExporterServiceConfig, TlsConfig};
26#[cfg(all(feature = "storage-service", with_testing))]
27use linera_storage_service::common::storage_service_test_endpoint;
28#[cfg(all(feature = "rocksdb", feature = "scylladb", with_testing))]
29use linera_views::rocks_db::{RocksDbDatabase, RocksDbSpawnMode};
30#[cfg(all(feature = "scylladb", with_testing))]
31use linera_views::{scylla_db::ScyllaDbDatabase, store::TestKeyValueDatabase as _};
32use tempfile::{tempdir, TempDir};
33use tokio::process::{Child, Command};
34use tonic::transport::{channel::ClientTlsConfig, Endpoint};
35use tonic_health::pb::{
36 health_check_response::ServingStatus, health_client::HealthClient, HealthCheckRequest,
37};
38use tracing::{error, info, warn};
39
40use crate::{
41 cli_wrappers::{
42 ClientWrapper, LineraNet, LineraNetConfig, Network, NetworkConfig, OnClientDrop,
43 },
44 config::{BlockExporterConfig, Destination, DestinationConfig},
45 storage::{InnerStorageConfig, StorageConfig},
46 util::ChildExt,
47};
48
49const MAX_NUMBER_SHARDS: usize = 1000;
51
52pub const FIRST_PUBLIC_PORT: usize = 13000;
53
54pub enum ProcessInbox {
55 Skip,
56 Automatic,
57}
58
59#[cfg(with_testing)]
60static PORT_PROVIDER: LazyLock<RwLock<u16>> = LazyLock::new(|| RwLock::new(7080));
61
62#[cfg(with_testing)]
64pub async fn get_node_port() -> u16 {
65 let mut port = PORT_PROVIDER.write().await;
66 let port_ret = *port;
67 *port += 1;
68 info!("get_node_port returning port_ret={}", port_ret);
69 assert!(port_selector::is_free(port_ret));
70 port_ret
71}
72
73#[cfg(with_testing)]
74async fn make_testing_config(database: Database) -> Result<InnerStorageConfig> {
75 match database {
76 Database::Service => {
77 #[cfg(feature = "storage-service")]
78 {
79 let endpoint = storage_service_test_endpoint()
80 .expect("Reading LINERA_STORAGE_SERVICE environment variable");
81 Ok(InnerStorageConfig::Service { endpoint })
82 }
83 #[cfg(not(feature = "storage-service"))]
84 panic!("Database::Service is selected without the feature storage_service");
85 }
86 Database::DynamoDb => {
87 #[cfg(feature = "dynamodb")]
88 {
89 let use_dynamodb_local = true;
90 Ok(InnerStorageConfig::DynamoDb { use_dynamodb_local })
91 }
92 #[cfg(not(feature = "dynamodb"))]
93 panic!("Database::DynamoDb is selected without the feature dynamodb");
94 }
95 Database::ScyllaDb => {
96 #[cfg(feature = "scylladb")]
97 {
98 let config = ScyllaDbDatabase::new_test_config().await?;
99 Ok(InnerStorageConfig::ScyllaDb {
100 uri: config.inner_config.uri,
101 })
102 }
103 #[cfg(not(feature = "scylladb"))]
104 panic!("Database::ScyllaDb is selected without the feature scylladb");
105 }
106 Database::DualRocksDbScyllaDb => {
107 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
108 {
109 let rocksdb_config = RocksDbDatabase::new_test_config().await?;
110 let scylla_config = ScyllaDbDatabase::new_test_config().await?;
111 let spawn_mode = RocksDbSpawnMode::get_spawn_mode_from_runtime();
112 Ok(InnerStorageConfig::DualRocksDbScyllaDb {
113 path_with_guard: rocksdb_config.inner_config.path_with_guard,
114 spawn_mode,
115 uri: scylla_config.inner_config.uri,
116 })
117 }
118 #[cfg(not(all(feature = "rocksdb", feature = "scylladb")))]
119 panic!("Database::DualRocksDbScyllaDb is selected without the features rocksdb and scylladb");
120 }
121 }
122}
123
124pub enum InnerStorageConfigBuilder {
125 #[cfg(with_testing)]
126 TestConfig,
127 ExistingConfig {
128 storage_config: InnerStorageConfig,
129 },
130}
131
132impl InnerStorageConfigBuilder {
133 #[cfg_attr(not(with_testing), expect(unused_variables))]
134 pub async fn build(self, database: Database) -> Result<InnerStorageConfig> {
135 match self {
136 #[cfg(with_testing)]
137 InnerStorageConfigBuilder::TestConfig => make_testing_config(database).await,
138 InnerStorageConfigBuilder::ExistingConfig { storage_config } => Ok(storage_config),
139 }
140 }
141}
142
143#[derive(Clone)]
146pub enum PathProvider {
147 ExternalPath { path_buf: PathBuf },
148 TemporaryDirectory { tmp_dir: Arc<TempDir> },
149}
150
151impl PathProvider {
152 pub fn path(&self) -> &Path {
153 match self {
154 PathProvider::ExternalPath { path_buf } => path_buf.as_path(),
155 PathProvider::TemporaryDirectory { tmp_dir } => tmp_dir.path(),
156 }
157 }
158
159 pub fn create_temporary_directory() -> Result<Self> {
160 let tmp_dir = Arc::new(tempdir()?);
161 Ok(PathProvider::TemporaryDirectory { tmp_dir })
162 }
163
164 pub fn from_path_option(path: &Option<String>) -> anyhow::Result<Self> {
165 Ok(match path {
166 None => {
167 let tmp_dir = Arc::new(tempfile::tempdir()?);
168 PathProvider::TemporaryDirectory { tmp_dir }
169 }
170 Some(path) => {
171 let path = Path::new(path);
172 let path_buf = path.to_path_buf();
173 PathProvider::ExternalPath { path_buf }
174 }
175 })
176 }
177}
178
179pub struct LocalNetConfig {
181 pub database: Database,
182 pub network: NetworkConfig,
183 pub testing_prng_seed: Option<u64>,
184 pub namespace: String,
185 pub num_other_initial_chains: u32,
186 pub initial_amount: Amount,
187 pub num_initial_validators: usize,
188 pub num_shards: usize,
189 pub num_proxies: usize,
190 pub policy_config: ResourceControlPolicyConfig,
191 pub cross_chain_config: CrossChainConfig,
192 pub storage_config_builder: InnerStorageConfigBuilder,
193 pub path_provider: PathProvider,
194 pub block_exporters: ExportersSetup,
195}
196
197#[derive(Clone, PartialEq)]
199pub enum ExportersSetup {
200 Local(Vec<BlockExporterConfig>),
202 Remote(Vec<ExporterServiceConfig>),
204}
205
206impl ExportersSetup {
207 pub fn new(
208 with_block_exporter: bool,
209 block_exporter_address: String,
210 block_exporter_port: NonZeroU16,
211 ) -> ExportersSetup {
212 if with_block_exporter {
213 let exporter_config =
214 ExporterServiceConfig::new(block_exporter_address, block_exporter_port.into());
215 ExportersSetup::Remote(vec![exporter_config])
216 } else {
217 ExportersSetup::Local(vec![])
218 }
219 }
220}
221
222pub struct LocalNet {
224 network: NetworkConfig,
225 testing_prng_seed: Option<u64>,
226 next_client_id: usize,
227 num_initial_validators: usize,
228 num_proxies: usize,
229 num_shards: usize,
230 validator_keys: BTreeMap<usize, (String, String)>,
231 running_validators: BTreeMap<usize, Validator>,
232 initialized_validator_storages: BTreeMap<usize, StorageConfig>,
233 common_namespace: String,
234 common_storage_config: InnerStorageConfig,
235 cross_chain_config: CrossChainConfig,
236 path_provider: PathProvider,
237 block_exporters: ExportersSetup,
238}
239
240const SERVER_ENV: &str = "LINERA_SERVER_PARAMS";
243
244#[derive(Copy, Clone, Eq, PartialEq)]
246pub enum Database {
247 Service,
248 DynamoDb,
249 ScyllaDb,
250 DualRocksDbScyllaDb,
251}
252
253struct Validator {
255 proxies: Vec<Child>,
256 servers: Vec<Child>,
257 exporters: Vec<Child>,
258}
259
260impl Validator {
261 fn new() -> Self {
262 Self {
263 proxies: vec![],
264 servers: vec![],
265 exporters: vec![],
266 }
267 }
268
269 async fn terminate(&mut self) -> Result<()> {
270 for proxy in &mut self.proxies {
271 proxy.kill().await.context("terminating validator proxy")?;
272 }
273 for server in &mut self.servers {
274 server
275 .kill()
276 .await
277 .context("terminating validator server")?;
278 }
279 Ok(())
280 }
281
282 fn add_proxy(&mut self, proxy: Child) {
283 self.proxies.push(proxy)
284 }
285
286 fn add_server(&mut self, server: Child) {
287 self.servers.push(server)
288 }
289
290 #[cfg(with_testing)]
291 async fn terminate_server(&mut self, index: usize) -> Result<()> {
292 let mut server = self.servers.remove(index);
293 server
294 .kill()
295 .await
296 .context("terminating validator server")?;
297 Ok(())
298 }
299
300 fn add_block_exporter(&mut self, exporter: Child) {
301 self.exporters.push(exporter);
302 }
303
304 fn ensure_is_running(&mut self) -> Result<()> {
305 for proxy in &mut self.proxies {
306 proxy.ensure_is_running()?;
307 }
308 for child in &mut self.servers {
309 child.ensure_is_running()?;
310 }
311 for exporter in &mut self.exporters {
312 exporter.ensure_is_running()?;
313 }
314 Ok(())
315 }
316}
317
318#[cfg(with_testing)]
319impl LocalNetConfig {
320 pub fn new_test(database: Database, network: Network) -> Self {
321 let num_shards = 4;
322 let num_proxies = 1;
323 let storage_config_builder = InnerStorageConfigBuilder::TestConfig;
324 let path_provider = PathProvider::create_temporary_directory().unwrap();
325 let internal = network.drop_tls();
326 let external = network;
327 let network = NetworkConfig { internal, external };
328 let cross_chain_config = CrossChainConfig::default();
329 Self {
330 database,
331 network,
332 num_other_initial_chains: 2,
333 initial_amount: Amount::from_tokens(1_000_000),
334 policy_config: ResourceControlPolicyConfig::Testnet,
335 cross_chain_config,
336 testing_prng_seed: Some(37),
337 namespace: linera_views::random::generate_test_namespace(),
338 num_initial_validators: 4,
339 num_shards,
340 num_proxies,
341 storage_config_builder,
342 path_provider,
343 block_exporters: ExportersSetup::Local(vec![]),
344 }
345 }
346}
347
348#[async_trait]
349impl LineraNetConfig for LocalNetConfig {
350 type Net = LocalNet;
351
352 async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
353 let storage_config = self.storage_config_builder.build(self.database).await?;
354 let mut net = LocalNet::new(
355 self.network,
356 self.testing_prng_seed,
357 self.namespace,
358 self.num_initial_validators,
359 self.num_proxies,
360 self.num_shards,
361 storage_config,
362 self.cross_chain_config,
363 self.path_provider,
364 self.block_exporters,
365 );
366 let client = net.make_client().await;
367 ensure!(
368 self.num_initial_validators > 0,
369 "There should be at least one initial validator"
370 );
371 let total_number_shards = self.num_initial_validators * self.num_shards;
372 ensure!(
373 total_number_shards <= MAX_NUMBER_SHARDS,
374 "Total number of shards ({}) exceeds maximum allowed ({})",
375 self.num_shards,
376 MAX_NUMBER_SHARDS
377 );
378 net.generate_initial_validator_config().await?;
379 client
380 .create_genesis_config(
381 self.num_other_initial_chains,
382 self.initial_amount,
383 self.policy_config,
384 Some(vec!["localhost".to_owned()]),
385 )
386 .await?;
387 net.run().await?;
388 Ok((net, client))
389 }
390}
391
392#[async_trait]
393impl LineraNet for LocalNet {
394 async fn ensure_is_running(&mut self) -> Result<()> {
395 for validator in self.running_validators.values_mut() {
396 validator.ensure_is_running().context("in local network")?;
397 }
398 Ok(())
399 }
400
401 async fn make_client(&mut self) -> ClientWrapper {
402 let client = ClientWrapper::new(
403 self.path_provider.clone(),
404 self.network.external,
405 self.testing_prng_seed,
406 self.next_client_id,
407 OnClientDrop::LeakChains,
408 );
409 if let Some(seed) = self.testing_prng_seed {
410 self.testing_prng_seed = Some(seed + 1);
411 }
412 self.next_client_id += 1;
413 client
414 }
415
416 async fn terminate(&mut self) -> Result<()> {
417 for validator in self.running_validators.values_mut() {
418 validator.terminate().await.context("in local network")?
419 }
420 Ok(())
421 }
422}
423
424impl LocalNet {
425 #[expect(clippy::too_many_arguments)]
426 fn new(
427 network: NetworkConfig,
428 testing_prng_seed: Option<u64>,
429 common_namespace: String,
430 num_initial_validators: usize,
431 num_proxies: usize,
432 num_shards: usize,
433 common_storage_config: InnerStorageConfig,
434 cross_chain_config: CrossChainConfig,
435 path_provider: PathProvider,
436 block_exporters: ExportersSetup,
437 ) -> Self {
438 Self {
439 network,
440 testing_prng_seed,
441 next_client_id: 0,
442 num_initial_validators,
443 num_proxies,
444 num_shards,
445 validator_keys: BTreeMap::new(),
446 running_validators: BTreeMap::new(),
447 initialized_validator_storages: BTreeMap::new(),
448 common_namespace,
449 common_storage_config,
450 cross_chain_config,
451 path_provider,
452 block_exporters,
453 }
454 }
455
456 async fn command_for_binary(&self, name: &'static str) -> Result<Command> {
457 let path = resolve_binary(name, env!("CARGO_PKG_NAME")).await?;
458 let mut command = Command::new(path);
459 command.current_dir(self.path_provider.path());
460 Ok(command)
461 }
462
463 #[cfg(with_testing)]
464 pub fn genesis_config(&self) -> Result<linera_client::config::GenesisConfig> {
465 let path = self.path_provider.path();
466 crate::util::read_json(path.join("genesis.json"))
467 }
468
469 fn shard_port(&self, validator: usize, shard: usize) -> usize {
470 9000 + validator * self.num_shards + shard + 1
471 }
472
473 fn proxy_internal_port(&self, validator: usize, proxy_id: usize) -> usize {
474 10000 + validator * self.num_proxies + proxy_id + 1
475 }
476
477 fn shard_metrics_port(&self, validator: usize, shard: usize) -> usize {
478 11000 + validator * self.num_shards + shard + 1
479 }
480
481 fn proxy_metrics_port(&self, validator: usize, proxy_id: usize) -> usize {
482 12000 + validator * self.num_proxies + proxy_id + 1
483 }
484
485 fn block_exporter_port(&self, validator: usize, exporter_id: usize) -> usize {
486 12000 + validator * self.num_shards + exporter_id + 1
487 }
488
489 pub fn proxy_public_port(&self, validator: usize, proxy_id: usize) -> usize {
490 FIRST_PUBLIC_PORT + validator * self.num_proxies + proxy_id + 1
491 }
492
493 pub fn first_public_port() -> usize {
494 FIRST_PUBLIC_PORT + 1
495 }
496
497 fn block_exporter_metrics_port(exporter_id: usize) -> usize {
498 FIRST_PUBLIC_PORT + exporter_id + 1
499 }
500
501 fn configuration_string(&self, server_number: usize) -> Result<String> {
502 let n = server_number;
503 let path = self
504 .path_provider
505 .path()
506 .join(format!("validator_{n}.toml"));
507 let port = self.proxy_public_port(n, 0);
508 let external_protocol = self.network.external.toml();
509 let internal_protocol = self.network.internal.toml();
510 let external_host = self.network.external.localhost();
511 let internal_host = self.network.internal.localhost();
512 let mut content = format!(
513 r#"
514 server_config_path = "server_{n}.json"
515 host = "{external_host}"
516 port = {port}
517 external_protocol = {external_protocol}
518 internal_protocol = {internal_protocol}
519 "#
520 );
521
522 for k in 0..self.num_proxies {
523 let public_port = self.proxy_public_port(n, k);
524 let internal_port = self.proxy_internal_port(n, k);
525 let metrics_port = self.proxy_metrics_port(n, k);
526 content.push_str(&format!(
530 r#"
531 [[proxies]]
532 host = "{internal_host}"
533 public_port = {public_port}
534 private_port = {internal_port}
535 metrics_port = {metrics_port}
536 "#
537 ));
538 }
539
540 for k in 0..self.num_shards {
541 let shard_port = self.shard_port(n, k);
542 let shard_metrics_port = self.shard_metrics_port(n, k);
543 content.push_str(&format!(
544 r#"
545
546 [[shards]]
547 host = "{internal_host}"
548 port = {shard_port}
549 metrics_port = {shard_metrics_port}
550 "#
551 ));
552 }
553
554 match self.block_exporters {
555 ExportersSetup::Local(ref exporters) => {
556 for (j, exporter) in exporters.iter().enumerate() {
557 let host = Network::Grpc.localhost();
558 let port = self.block_exporter_port(n, j);
559 let config_content = format!(
560 r#"
561
562 [[block_exporters]]
563 host = "{host}"
564 port = {port}
565 "#
566 );
567
568 content.push_str(&config_content);
569 let exporter_config = self.generate_block_exporter_config(
570 n,
571 j as u32,
572 &exporter.destination_config,
573 );
574 let config_path = self
575 .path_provider
576 .path()
577 .join(format!("exporter_config_{n}:{j}.toml"));
578
579 fs_err::write(&config_path, &exporter_config)?;
580 }
581 }
582 ExportersSetup::Remote(ref exporters) => {
583 for exporter in exporters {
584 let host = exporter.host.clone();
585 let port = exporter.port;
586 let config_content = format!(
587 r#"
588
589 [[block_exporters]]
590 host = "{host}"
591 port = {port}
592 "#
593 );
594
595 content.push_str(&config_content);
596 }
597 }
598 }
599
600 fs_err::write(&path, content)?;
601 path.into_os_string().into_string().map_err(|error| {
602 anyhow!(
603 "could not parse OS string into string: {}",
604 error.to_string_lossy()
605 )
606 })
607 }
608
609 fn generate_block_exporter_config(
610 &self,
611 validator: usize,
612 exporter_id: u32,
613 destination_config: &DestinationConfig,
614 ) -> String {
615 let n = validator;
616 let host = Network::Grpc.localhost();
617 let port = self.block_exporter_port(n, exporter_id as usize);
618 let metrics_port = Self::block_exporter_metrics_port(exporter_id as usize);
619 let mut config = format!(
620 r#"
621 id = {exporter_id}
622
623 metrics_port = {metrics_port}
624
625 [service_config]
626 host = "{host}"
627 port = {port}
628
629 "#
630 );
631
632 let DestinationConfig {
633 destinations,
634 committee_destination,
635 } = destination_config;
636
637 if *committee_destination {
638 let destination_string_to_push = r#"
639
640 [destination_config]
641 committee_destination = true
642 "#
643 .to_string();
644
645 config.push_str(&destination_string_to_push);
646 }
647
648 for destination in destinations {
649 let destination_string_to_push = match destination {
650 Destination::Indexer {
651 tls,
652 endpoint,
653 port,
654 } => {
655 let tls = match tls {
656 TlsConfig::ClearText => "ClearText",
657 TlsConfig::Tls => "Tls",
658 };
659 format!(
660 r#"
661 [[destination_config.destinations]]
662 tls = "{tls}"
663 endpoint = "{endpoint}"
664 port = {port}
665 kind = "Indexer"
666 "#
667 )
668 }
669 Destination::Validator { endpoint, port } => {
670 format!(
671 r#"
672 [[destination_config.destinations]]
673 endpoint = "{endpoint}"
674 port = {port}
675 kind = "Validator"
676 "#
677 )
678 }
679 Destination::Logging { file_name } => {
680 format!(
681 r#"
682 [[destination_config.destinations]]
683 file_name = "{file_name}"
684 kind = "Logging"
685 "#
686 )
687 }
688 };
689
690 config.push_str(&destination_string_to_push);
691 }
692
693 config
694 }
695
696 async fn generate_initial_validator_config(&mut self) -> Result<()> {
697 let mut command = self.command_for_binary("linera-server").await?;
698 command.arg("generate");
699 if let Some(seed) = self.testing_prng_seed {
700 command.arg("--testing-prng-seed").arg(seed.to_string());
701 self.testing_prng_seed = Some(seed + 1);
702 }
703 command.arg("--validators");
704 for i in 0..self.num_initial_validators {
705 command.arg(&self.configuration_string(i)?);
706 }
707 let output = command
708 .args(["--committee", "committee.json"])
709 .spawn_and_wait_for_stdout()
710 .await?;
711 self.validator_keys = output
712 .split_whitespace()
713 .map(str::to_string)
714 .map(|keys| keys.split(',').map(str::to_string).collect::<Vec<_>>())
715 .enumerate()
716 .map(|(i, keys)| {
717 let validator_key = keys[0].to_string();
718 let account_key = keys[1].to_string();
719 (i, (validator_key, account_key))
720 })
721 .collect();
722 Ok(())
723 }
724
725 async fn run_proxy(&mut self, validator: usize, proxy_id: usize) -> Result<Child> {
726 let storage = self
727 .initialized_validator_storages
728 .get(&validator)
729 .expect("initialized storage");
730 let child = self
731 .command_for_binary("linera-proxy")
732 .await?
733 .arg(format!("server_{}.json", validator))
734 .args(["--storage", &storage.to_string()])
735 .args(["--id", &proxy_id.to_string()])
736 .spawn_into()?;
737
738 let port = self.proxy_public_port(validator, proxy_id);
739 let nickname = format!("validator proxy {validator}");
740 match self.network.external {
741 Network::Grpc => {
742 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
743 let nickname = format!("validator proxy {validator}");
744 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
745 }
746 Network::Grpcs => {
747 let nickname = format!("validator proxy {validator}");
748 Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
749 }
750 Network::Tcp => {
751 Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
752 }
753 Network::Udp => {
754 Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
755 }
756 }
757 Ok(child)
758 }
759
760 async fn run_exporter(&mut self, validator: usize, exporter_id: u32) -> Result<Child> {
761 let config_path = format!("exporter_config_{validator}:{exporter_id}.toml");
762 let storage = self
763 .initialized_validator_storages
764 .get(&validator)
765 .expect("initialized storage");
766
767 tracing::debug!(config=?config_path, storage=?storage.to_string(), "starting block exporter");
768
769 let child = self
770 .command_for_binary("linera-exporter")
771 .await?
772 .args(["--config-path", &config_path])
773 .args(["--storage", &storage.to_string()])
774 .spawn_into()?;
775
776 match self.network.internal {
777 Network::Grpc => {
778 let port = self.block_exporter_port(validator, exporter_id as usize);
779 let nickname = format!("block exporter {validator}:{exporter_id}");
780 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
781 }
782 Network::Grpcs => {
783 let port = self.block_exporter_port(validator, exporter_id as usize);
784 let nickname = format!("block exporter {validator}:{exporter_id}");
785 Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
786 }
787 Network::Tcp | Network::Udp => {
788 unreachable!("Only allowed options are grpc and grpcs")
789 }
790 }
791
792 tracing::info!("block exporter started {validator}:{exporter_id}");
793
794 Ok(child)
795 }
796
797 pub async fn ensure_grpc_server_has_started(
798 nickname: &str,
799 port: usize,
800 scheme: &str,
801 ) -> Result<()> {
802 let endpoint = match scheme {
803 "http" => Endpoint::new(format!("http://localhost:{port}"))
804 .context("endpoint should always parse")?,
805 "https" => {
806 use linera_rpc::CERT_PEM;
807 let certificate = tonic::transport::Certificate::from_pem(CERT_PEM);
808 let tls_config = ClientTlsConfig::new().ca_certificate(certificate);
809 Endpoint::new(format!("https://localhost:{port}"))
810 .context("endpoint should always parse")?
811 .tls_config(tls_config)?
812 }
813 _ => bail!("Only supported scheme are http and https"),
814 };
815 let connection = endpoint.connect_lazy();
816 let mut client = HealthClient::new(connection);
817 linera_base::time::timer::sleep(Duration::from_millis(100)).await;
818 for i in 0..10 {
819 linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
820 let result = client.check(HealthCheckRequest::default()).await;
821 if result.is_ok() && result.unwrap().get_ref().status() == ServingStatus::Serving {
822 info!(?port, "Successfully started {nickname}");
823 return Ok(());
824 } else {
825 warn!("Waiting for {nickname} to start");
826 }
827 }
828 bail!("Failed to start {nickname}");
829 }
830
831 async fn ensure_simple_server_has_started(
832 nickname: &str,
833 port: usize,
834 protocol: &str,
835 ) -> Result<()> {
836 use linera_core::node::ValidatorNode as _;
837
838 let options = linera_rpc::NodeOptions {
839 send_timeout: Duration::from_secs(5),
840 recv_timeout: Duration::from_secs(5),
841 retry_delay: Duration::from_secs(1),
842 max_retries: 1,
843 };
844 let provider = linera_rpc::simple::SimpleNodeProvider::new(options);
845 let address = format!("{protocol}:127.0.0.1:{port}");
846 let node = provider.make_node(&address)?;
849 linera_base::time::timer::sleep(Duration::from_millis(100)).await;
850 for i in 0..10 {
851 linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
852 let result = node.get_version_info().await;
853 if result.is_ok() {
854 info!("Successfully started {nickname}");
855 return Ok(());
856 } else {
857 warn!("Waiting for {nickname} to start");
858 }
859 }
860 bail!("Failed to start {nickname}");
861 }
862
863 async fn initialize_storage(&mut self, validator: usize) -> Result<()> {
864 let namespace = format!("{}_server_{}_db", self.common_namespace, validator);
865 let inner_storage_config = self.common_storage_config.clone();
866 let storage = StorageConfig {
867 inner_storage_config,
868 namespace,
869 };
870 let mut command = self.command_for_binary("linera").await?;
871 if let Ok(var) = env::var(SERVER_ENV) {
872 command.args(var.split_whitespace());
873 }
874 command.args(["storage", "initialize"]);
875 command
876 .args(["--storage", &storage.to_string()])
877 .args(["--genesis", "genesis.json"])
878 .spawn_and_wait_for_stdout()
879 .await?;
880
881 self.initialized_validator_storages
882 .insert(validator, storage);
883 Ok(())
884 }
885
886 async fn run_server(&mut self, validator: usize, shard: usize) -> Result<Child> {
887 let mut storage = self
888 .initialized_validator_storages
889 .get(&validator)
890 .expect("initialized storage")
891 .clone();
892
893 storage.maybe_append_shard_path(shard)?;
896
897 let mut command = self.command_for_binary("linera-server").await?;
898 if let Ok(var) = env::var(SERVER_ENV) {
899 command.args(var.split_whitespace());
900 }
901 command
902 .arg("run")
903 .args(["--storage", &storage.to_string()])
904 .args(["--server", &format!("server_{}.json", validator)])
905 .args(["--shard", &shard.to_string()])
906 .args(self.cross_chain_config.to_args());
907 let child = command.spawn_into()?;
908
909 let port = self.shard_port(validator, shard);
910 let nickname = format!("validator server {validator}:{shard}");
911 match self.network.internal {
912 Network::Grpc => {
913 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
914 }
915 Network::Grpcs => {
916 Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
917 }
918 Network::Tcp => {
919 Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
920 }
921 Network::Udp => {
922 Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
923 }
924 }
925 Ok(child)
926 }
927
928 async fn run(&mut self) -> Result<()> {
929 for validator in 0..self.num_initial_validators {
930 self.start_validator(validator).await?;
931 }
932 Ok(())
933 }
934
935 pub async fn start_validator(&mut self, index: usize) -> Result<()> {
937 self.initialize_storage(index).await?;
938 self.restart_validator(index).await
939 }
940
941 pub async fn restart_validator(&mut self, index: usize) -> Result<()> {
944 let mut validator = Validator::new();
945 for k in 0..self.num_proxies {
946 let proxy = self.run_proxy(index, k).await?;
947 validator.add_proxy(proxy);
948 }
949 for shard in 0..self.num_shards {
950 let server = self.run_server(index, shard).await?;
951 validator.add_server(server);
952 }
953 if let ExportersSetup::Local(ref exporters) = self.block_exporters {
954 for block_exporter in 0..exporters.len() {
955 let exporter = self.run_exporter(index, block_exporter as u32).await?;
956 validator.add_block_exporter(exporter);
957 }
958 }
959
960 self.running_validators.insert(index, validator);
961 Ok(())
962 }
963
964 pub async fn stop_validator(&mut self, index: usize) -> Result<()> {
966 if let Some(mut validator) = self.running_validators.remove(&index) {
967 if let Err(error) = validator.terminate().await {
968 error!("Failed to stop validator {index}: {error}");
969 return Err(error);
970 }
971 }
972 Ok(())
973 }
974
975 pub fn validator_client(&mut self, validator: usize) -> Result<linera_rpc::Client> {
977 let node_provider = linera_rpc::NodeProvider::new(linera_rpc::NodeOptions {
978 send_timeout: Duration::from_secs(1),
979 recv_timeout: Duration::from_secs(1),
980 retry_delay: Duration::ZERO,
981 max_retries: 0,
982 });
983
984 Ok(node_provider.make_node(&self.validator_address(validator))?)
985 }
986
987 pub fn validator_address(&self, validator: usize) -> String {
990 let port = self.proxy_public_port(validator, 0);
991 let schema = self.network.external.schema();
992
993 format!("{schema}:localhost:{port}")
994 }
995}
996
997#[cfg(with_testing)]
998impl LocalNet {
999 pub fn validator_keys(&self, validator: usize) -> Option<&(String, String)> {
1001 self.validator_keys.get(&validator)
1002 }
1003
1004 pub async fn generate_validator_config(&mut self, validator: usize) -> Result<()> {
1005 let stdout = self
1006 .command_for_binary("linera-server")
1007 .await?
1008 .arg("generate")
1009 .arg("--validators")
1010 .arg(&self.configuration_string(validator)?)
1011 .spawn_and_wait_for_stdout()
1012 .await?;
1013 let keys = stdout
1014 .trim()
1015 .split(',')
1016 .map(str::to_string)
1017 .collect::<Vec<_>>();
1018 self.validator_keys
1019 .insert(validator, (keys[0].clone(), keys[1].clone()));
1020 Ok(())
1021 }
1022
1023 pub async fn terminate_server(&mut self, validator: usize, shard: usize) -> Result<()> {
1024 self.running_validators
1025 .get_mut(&validator)
1026 .context("server not found")?
1027 .terminate_server(shard)
1028 .await?;
1029 Ok(())
1030 }
1031
1032 pub fn remove_validator(&mut self, validator: usize) -> Result<()> {
1033 self.running_validators
1034 .remove(&validator)
1035 .context("validator not found")?;
1036 Ok(())
1037 }
1038
1039 pub async fn start_server(&mut self, validator: usize, shard: usize) -> Result<()> {
1040 let server = self.run_server(validator, shard).await?;
1041 self.running_validators
1042 .get_mut(&validator)
1043 .context("could not find validator")?
1044 .add_server(server);
1045 Ok(())
1046 }
1047}