1#[cfg(with_testing)]
5use std::sync::LazyLock;
6use std::{
7 collections::BTreeMap,
8 env,
9 num::NonZeroU16,
10 path::{Path, PathBuf},
11 sync::Arc,
12 time::Duration,
13};
14
15use anyhow::{anyhow, bail, ensure, Context, Result};
16#[cfg(with_testing)]
17use async_lock::RwLock;
18use async_trait::async_trait;
19use linera_base::{
20 command::{resolve_binary, CommandExt},
21 data_types::Amount,
22};
23use linera_client::client_options::ResourceControlPolicyConfig;
24use linera_core::node::ValidatorNodeProvider;
25use linera_rpc::config::{CrossChainConfig, ExporterServiceConfig, TlsConfig};
26#[cfg(all(feature = "storage-service", with_testing))]
27use linera_storage_service::common::storage_service_test_endpoint;
28#[cfg(all(feature = "rocksdb", feature = "scylladb", with_testing))]
29use linera_views::rocks_db::{RocksDbDatabase, RocksDbSpawnMode};
30#[cfg(all(feature = "scylladb", with_testing))]
31use linera_views::{scylla_db::ScyllaDbDatabase, store::TestKeyValueDatabase as _};
32use tempfile::{tempdir, TempDir};
33use tokio::process::{Child, Command};
34use tonic::transport::{channel::ClientTlsConfig, Endpoint};
35use tonic_health::pb::{
36 health_check_response::ServingStatus, health_client::HealthClient, HealthCheckRequest,
37};
38use tracing::{error, info, warn};
39
40use crate::{
41 cli_wrappers::{
42 ClientWrapper, LineraNet, LineraNetConfig, Network, NetworkConfig, OnClientDrop,
43 },
44 config::{BlockExporterConfig, Destination, DestinationConfig},
45 storage::{InnerStorageConfig, StorageConfig},
46 util::ChildExt,
47};
48
49const MAX_NUMBER_SHARDS: usize = 1000;
51
52pub enum ProcessInbox {
53 Skip,
54 Automatic,
55}
56
57#[cfg(with_testing)]
58static PORT_PROVIDER: LazyLock<RwLock<u16>> = LazyLock::new(|| RwLock::new(7080));
59
60fn test_offset_port() -> usize {
62 std::env::var("TEST_OFFSET_PORT")
63 .ok()
64 .and_then(|port_str| port_str.parse::<usize>().ok())
65 .unwrap_or(9000)
66}
67
68#[cfg(with_testing)]
70pub async fn get_node_port() -> u16 {
71 let mut port = PORT_PROVIDER.write().await;
72 let port_ret = *port;
73 *port += 1;
74 info!("get_node_port returning port_ret={}", port_ret);
75 assert!(port_selector::is_free(port_ret));
76 port_ret
77}
78
79#[cfg(with_testing)]
80async fn make_testing_config(database: Database) -> Result<InnerStorageConfig> {
81 match database {
82 Database::Service => {
83 #[cfg(feature = "storage-service")]
84 {
85 let endpoint = storage_service_test_endpoint()
86 .expect("Reading LINERA_STORAGE_SERVICE environment variable");
87 Ok(InnerStorageConfig::Service { endpoint })
88 }
89 #[cfg(not(feature = "storage-service"))]
90 panic!("Database::Service is selected without the feature storage_service");
91 }
92 Database::DynamoDb => {
93 #[cfg(feature = "dynamodb")]
94 {
95 let use_dynamodb_local = true;
96 Ok(InnerStorageConfig::DynamoDb { use_dynamodb_local })
97 }
98 #[cfg(not(feature = "dynamodb"))]
99 panic!("Database::DynamoDb is selected without the feature dynamodb");
100 }
101 Database::ScyllaDb => {
102 #[cfg(feature = "scylladb")]
103 {
104 let config = ScyllaDbDatabase::new_test_config().await?;
105 Ok(InnerStorageConfig::ScyllaDb {
106 uri: config.inner_config.uri,
107 })
108 }
109 #[cfg(not(feature = "scylladb"))]
110 panic!("Database::ScyllaDb is selected without the feature scylladb");
111 }
112 Database::DualRocksDbScyllaDb => {
113 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
114 {
115 let rocksdb_config = RocksDbDatabase::new_test_config().await?;
116 let scylla_config = ScyllaDbDatabase::new_test_config().await?;
117 let spawn_mode = RocksDbSpawnMode::get_spawn_mode_from_runtime();
118 Ok(InnerStorageConfig::DualRocksDbScyllaDb {
119 path_with_guard: rocksdb_config.inner_config.path_with_guard,
120 spawn_mode,
121 uri: scylla_config.inner_config.uri,
122 })
123 }
124 #[cfg(not(all(feature = "rocksdb", feature = "scylladb")))]
125 panic!("Database::DualRocksDbScyllaDb is selected without the features rocksdb and scylladb");
126 }
127 }
128}
129
130pub enum InnerStorageConfigBuilder {
131 #[cfg(with_testing)]
132 TestConfig,
133 ExistingConfig {
134 storage_config: InnerStorageConfig,
135 },
136}
137
138impl InnerStorageConfigBuilder {
139 #[cfg_attr(not(with_testing), expect(unused_variables))]
140 pub async fn build(self, database: Database) -> Result<InnerStorageConfig> {
141 match self {
142 #[cfg(with_testing)]
143 InnerStorageConfigBuilder::TestConfig => make_testing_config(database).await,
144 InnerStorageConfigBuilder::ExistingConfig { storage_config } => Ok(storage_config),
145 }
146 }
147}
148
149#[derive(Clone)]
152pub enum PathProvider {
153 ExternalPath { path_buf: PathBuf },
154 TemporaryDirectory { tmp_dir: Arc<TempDir> },
155}
156
157impl PathProvider {
158 pub fn path(&self) -> &Path {
159 match self {
160 PathProvider::ExternalPath { path_buf } => path_buf.as_path(),
161 PathProvider::TemporaryDirectory { tmp_dir } => tmp_dir.path(),
162 }
163 }
164
165 pub fn create_temporary_directory() -> Result<Self> {
166 let tmp_dir = Arc::new(tempdir()?);
167 Ok(PathProvider::TemporaryDirectory { tmp_dir })
168 }
169
170 pub fn from_path_option(path: &Option<String>) -> anyhow::Result<Self> {
171 Ok(match path {
172 None => {
173 let tmp_dir = Arc::new(tempfile::tempdir()?);
174 PathProvider::TemporaryDirectory { tmp_dir }
175 }
176 Some(path) => {
177 let path = Path::new(path);
178 let path_buf = path.to_path_buf();
179 PathProvider::ExternalPath { path_buf }
180 }
181 })
182 }
183}
184
185pub struct LocalNetConfig {
187 pub database: Database,
188 pub network: NetworkConfig,
189 pub testing_prng_seed: Option<u64>,
190 pub namespace: String,
191 pub num_other_initial_chains: u32,
192 pub initial_amount: Amount,
193 pub num_initial_validators: usize,
194 pub num_shards: usize,
195 pub num_proxies: usize,
196 pub policy_config: ResourceControlPolicyConfig,
197 pub http_request_allow_list: Option<Vec<String>>,
198 pub cross_chain_config: CrossChainConfig,
199 pub storage_config_builder: InnerStorageConfigBuilder,
200 pub path_provider: PathProvider,
201 pub block_exporters: ExportersSetup,
202 pub binary_dir: Option<PathBuf>,
205}
206
207#[derive(Clone, PartialEq)]
209pub enum ExportersSetup {
210 Local(Vec<BlockExporterConfig>),
212 Remote(Vec<ExporterServiceConfig>),
214}
215
216impl ExportersSetup {
217 pub fn new(
218 with_block_exporter: bool,
219 block_exporter_address: String,
220 block_exporter_port: NonZeroU16,
221 ) -> ExportersSetup {
222 if with_block_exporter {
223 let exporter_config =
224 ExporterServiceConfig::new(block_exporter_address, block_exporter_port.into());
225 ExportersSetup::Remote(vec![exporter_config])
226 } else {
227 ExportersSetup::Local(vec![])
228 }
229 }
230}
231
232pub struct LocalNet {
234 network: NetworkConfig,
235 testing_prng_seed: Option<u64>,
236 next_client_id: usize,
237 num_initial_validators: usize,
238 num_proxies: usize,
239 num_shards: usize,
240 validator_keys: BTreeMap<usize, (String, String)>,
241 running_validators: BTreeMap<usize, Validator>,
242 initialized_validator_storages: BTreeMap<usize, StorageConfig>,
243 common_namespace: String,
244 common_storage_config: InnerStorageConfig,
245 cross_chain_config: CrossChainConfig,
246 path_provider: PathProvider,
247 block_exporters: ExportersSetup,
248 binary_dir: Option<PathBuf>,
249}
250
251const SERVER_ENV: &str = "LINERA_SERVER_PARAMS";
254
255#[derive(Copy, Clone, Eq, PartialEq)]
257pub enum Database {
258 Service,
259 DynamoDb,
260 ScyllaDb,
261 DualRocksDbScyllaDb,
262}
263
264struct Validator {
266 proxies: Vec<Child>,
267 servers: Vec<Child>,
268 exporters: Vec<Child>,
269}
270
271impl Validator {
272 fn new() -> Self {
273 Self {
274 proxies: vec![],
275 servers: vec![],
276 exporters: vec![],
277 }
278 }
279
280 async fn terminate(&mut self) -> Result<()> {
281 for proxy in &mut self.proxies {
282 proxy.kill().await.context("terminating validator proxy")?;
283 }
284 for server in &mut self.servers {
285 server
286 .kill()
287 .await
288 .context("terminating validator server")?;
289 }
290 Ok(())
291 }
292
293 fn add_proxy(&mut self, proxy: Child) {
294 self.proxies.push(proxy)
295 }
296
297 fn add_server(&mut self, server: Child) {
298 self.servers.push(server)
299 }
300
301 #[cfg(with_testing)]
302 async fn terminate_server(&mut self, index: usize) -> Result<()> {
303 let mut server = self.servers.remove(index);
304 server
305 .kill()
306 .await
307 .context("terminating validator server")?;
308 Ok(())
309 }
310
311 fn add_block_exporter(&mut self, exporter: Child) {
312 self.exporters.push(exporter);
313 }
314
315 fn ensure_is_running(&mut self) -> Result<()> {
316 for proxy in &mut self.proxies {
317 proxy.ensure_is_running()?;
318 }
319 for child in &mut self.servers {
320 child.ensure_is_running()?;
321 }
322 for exporter in &mut self.exporters {
323 exporter.ensure_is_running()?;
324 }
325 Ok(())
326 }
327}
328
329#[cfg(with_testing)]
330impl LocalNetConfig {
331 pub fn new_test(database: Database, network: Network) -> Self {
332 let num_shards = 4;
333 let num_proxies = 1;
334 let storage_config_builder = InnerStorageConfigBuilder::TestConfig;
335 let path_provider = PathProvider::create_temporary_directory().unwrap();
336 let internal = network.drop_tls();
337 let external = network;
338 let network = NetworkConfig { internal, external };
339 let cross_chain_config = CrossChainConfig::default();
340 Self {
341 database,
342 network,
343 num_other_initial_chains: 2,
344 initial_amount: Amount::from_tokens(1_000_000),
345 policy_config: ResourceControlPolicyConfig::Testnet,
346 cross_chain_config,
347 testing_prng_seed: Some(37),
348 namespace: linera_views::random::generate_test_namespace(),
349 num_initial_validators: 4,
350 num_shards,
351 num_proxies,
352 storage_config_builder,
353 path_provider,
354 block_exporters: ExportersSetup::Local(vec![]),
355 http_request_allow_list: Some(vec!["localhost".to_string()]),
356 binary_dir: None,
357 }
358 }
359}
360
361#[async_trait]
362impl LineraNetConfig for LocalNetConfig {
363 type Net = LocalNet;
364
365 async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
366 let storage_config = self.storage_config_builder.build(self.database).await?;
367 let mut net = LocalNet::new(
368 self.network,
369 self.testing_prng_seed,
370 self.namespace,
371 self.num_initial_validators,
372 self.num_proxies,
373 self.num_shards,
374 storage_config,
375 self.cross_chain_config,
376 self.path_provider,
377 self.block_exporters,
378 self.binary_dir,
379 );
380 let client = net.make_client().await;
381 ensure!(
382 self.num_initial_validators > 0,
383 "There should be at least one initial validator"
384 );
385 let total_number_shards = self.num_initial_validators * self.num_shards;
386 ensure!(
387 total_number_shards <= MAX_NUMBER_SHARDS,
388 "Total number of shards ({}) exceeds maximum allowed ({})",
389 self.num_shards,
390 MAX_NUMBER_SHARDS
391 );
392 net.generate_initial_validator_config().await?;
393 client
394 .create_genesis_config(
395 self.num_other_initial_chains,
396 self.initial_amount,
397 self.policy_config,
398 self.http_request_allow_list
399 .clone()
400 .or_else(|| Some(vec!["localhost".to_owned()])),
401 )
402 .await?;
403 net.run().await?;
404 Ok((net, client))
405 }
406}
407
408#[async_trait]
409impl LineraNet for LocalNet {
410 async fn ensure_is_running(&mut self) -> Result<()> {
411 for validator in self.running_validators.values_mut() {
412 validator.ensure_is_running().context("in local network")?;
413 }
414 Ok(())
415 }
416
417 async fn make_client(&mut self) -> ClientWrapper {
418 let client = ClientWrapper::new_with_extra_args(
419 self.path_provider.clone(),
420 self.network.external,
421 self.testing_prng_seed,
422 self.next_client_id,
423 OnClientDrop::LeakChains,
424 vec!["--wait-for-outgoing-messages".to_string()],
425 self.binary_dir.clone(),
426 );
427 if let Some(seed) = self.testing_prng_seed {
428 self.testing_prng_seed = Some(seed + 1);
429 }
430 self.next_client_id += 1;
431 client
432 }
433
434 async fn terminate(&mut self) -> Result<()> {
435 for validator in self.running_validators.values_mut() {
436 validator.terminate().await.context("in local network")?
437 }
438 Ok(())
439 }
440}
441
442impl LocalNet {
443 #[expect(clippy::too_many_arguments)]
444 fn new(
445 network: NetworkConfig,
446 testing_prng_seed: Option<u64>,
447 common_namespace: String,
448 num_initial_validators: usize,
449 num_proxies: usize,
450 num_shards: usize,
451 common_storage_config: InnerStorageConfig,
452 cross_chain_config: CrossChainConfig,
453 path_provider: PathProvider,
454 block_exporters: ExportersSetup,
455 binary_dir: Option<PathBuf>,
456 ) -> Self {
457 Self {
458 network,
459 testing_prng_seed,
460 next_client_id: 0,
461 num_initial_validators,
462 num_proxies,
463 num_shards,
464 validator_keys: BTreeMap::new(),
465 running_validators: BTreeMap::new(),
466 initialized_validator_storages: BTreeMap::new(),
467 common_namespace,
468 common_storage_config,
469 cross_chain_config,
470 path_provider,
471 block_exporters,
472 binary_dir,
473 }
474 }
475
476 async fn command_for_binary(&self, name: &'static str) -> Result<Command> {
477 let path = if let Some(dir) = &self.binary_dir {
478 dir.join(name)
479 } else {
480 resolve_binary(name, env!("CARGO_PKG_NAME")).await?
481 };
482 let mut command = Command::new(path);
483 command.current_dir(self.path_provider.path());
484 Ok(command)
485 }
486
487 #[cfg(with_testing)]
488 pub fn genesis_config(&self) -> Result<linera_client::config::GenesisConfig> {
489 let path = self.path_provider.path();
490 crate::util::read_json(path.join("genesis.json"))
491 }
492
493 fn shard_port(&self, validator: usize, shard: usize) -> usize {
494 test_offset_port() + validator * self.num_shards + shard + 1
495 }
496
497 fn proxy_internal_port(&self, validator: usize, proxy_id: usize) -> usize {
498 test_offset_port() + 1000 + validator * self.num_proxies + proxy_id + 1
499 }
500
501 fn shard_metrics_port(&self, validator: usize, shard: usize) -> usize {
502 test_offset_port() + 2000 + validator * self.num_shards + shard + 1
503 }
504
505 fn proxy_metrics_port(&self, validator: usize, proxy_id: usize) -> usize {
506 test_offset_port() + 3000 + validator * self.num_proxies + proxy_id + 1
507 }
508
509 fn block_exporter_port(&self, validator: usize, exporter_id: usize) -> usize {
510 test_offset_port() + 3000 + validator * self.num_shards + exporter_id + 1
511 }
512
513 pub fn proxy_public_port(&self, validator: usize, proxy_id: usize) -> usize {
514 test_offset_port() + 4000 + validator * self.num_proxies + proxy_id + 1
515 }
516
517 pub fn first_public_port() -> usize {
518 test_offset_port() + 4000 + 1
519 }
520
521 fn block_exporter_metrics_port(exporter_id: usize) -> usize {
522 test_offset_port() + 4000 + exporter_id + 1
523 }
524
525 fn configuration_string(&self, server_number: usize) -> Result<String> {
526 let n = server_number;
527 let path = self
528 .path_provider
529 .path()
530 .join(format!("validator_{n}.toml"));
531 let port = self.proxy_public_port(n, 0);
532 let external_protocol = self.network.external.toml();
533 let internal_protocol = self.network.internal.toml();
534 let external_host = self.network.external.localhost();
535 let internal_host = self.network.internal.localhost();
536 let mut content = format!(
537 r#"
538 server_config_path = "server_{n}.json"
539 host = "{external_host}"
540 port = {port}
541 external_protocol = {external_protocol}
542 internal_protocol = {internal_protocol}
543 "#
544 );
545
546 for k in 0..self.num_proxies {
547 let public_port = self.proxy_public_port(n, k);
548 let internal_port = self.proxy_internal_port(n, k);
549 let metrics_port = self.proxy_metrics_port(n, k);
550 content.push_str(&format!(
554 r#"
555 [[proxies]]
556 host = "{internal_host}"
557 public_port = {public_port}
558 private_port = {internal_port}
559 metrics_port = {metrics_port}
560 "#
561 ));
562 }
563
564 for k in 0..self.num_shards {
565 let shard_port = self.shard_port(n, k);
566 let shard_metrics_port = self.shard_metrics_port(n, k);
567 content.push_str(&format!(
568 r#"
569
570 [[shards]]
571 host = "{internal_host}"
572 port = {shard_port}
573 metrics_port = {shard_metrics_port}
574 "#
575 ));
576 }
577
578 match self.block_exporters {
579 ExportersSetup::Local(ref exporters) => {
580 for (j, exporter) in exporters.iter().enumerate() {
581 let host = Network::Grpc.localhost();
582 let port = self.block_exporter_port(n, j);
583 let config_content = format!(
584 r#"
585
586 [[block_exporters]]
587 host = "{host}"
588 port = {port}
589 "#
590 );
591
592 content.push_str(&config_content);
593 let exporter_config = self.generate_block_exporter_config(
594 n,
595 j as u32,
596 &exporter.destination_config,
597 );
598 let config_path = self
599 .path_provider
600 .path()
601 .join(format!("exporter_config_{n}:{j}.toml"));
602
603 fs_err::write(&config_path, &exporter_config)?;
604 }
605 }
606 ExportersSetup::Remote(ref exporters) => {
607 for exporter in exporters {
608 let host = exporter.host.clone();
609 let port = exporter.port;
610 let config_content = format!(
611 r#"
612
613 [[block_exporters]]
614 host = "{host}"
615 port = {port}
616 "#
617 );
618
619 content.push_str(&config_content);
620 }
621 }
622 }
623
624 fs_err::write(&path, content)?;
625 path.into_os_string().into_string().map_err(|error| {
626 anyhow!(
627 "could not parse OS string into string: {}",
628 error.to_string_lossy()
629 )
630 })
631 }
632
633 fn generate_block_exporter_config(
634 &self,
635 validator: usize,
636 exporter_id: u32,
637 destination_config: &DestinationConfig,
638 ) -> String {
639 let n = validator;
640 let host = Network::Grpc.localhost();
641 let port = self.block_exporter_port(n, exporter_id as usize);
642 let metrics_port = Self::block_exporter_metrics_port(exporter_id as usize);
643 let mut config = format!(
644 r#"
645 id = {exporter_id}
646
647 metrics_port = {metrics_port}
648
649 [service_config]
650 host = "{host}"
651 port = {port}
652
653 "#
654 );
655
656 let DestinationConfig {
657 destinations,
658 committee_destination,
659 } = destination_config;
660
661 if *committee_destination {
662 let destination_string_to_push = r#"
663
664 [destination_config]
665 committee_destination = true
666 "#
667 .to_string();
668
669 config.push_str(&destination_string_to_push);
670 }
671
672 for destination in destinations {
673 let destination_string_to_push = match destination {
674 Destination::Indexer {
675 tls,
676 endpoint,
677 port,
678 } => {
679 let tls = match tls {
680 TlsConfig::ClearText => "ClearText",
681 TlsConfig::Tls => "Tls",
682 };
683 format!(
684 r#"
685 [[destination_config.destinations]]
686 tls = "{tls}"
687 endpoint = "{endpoint}"
688 port = {port}
689 kind = "Indexer"
690 "#
691 )
692 }
693 Destination::Validator { endpoint, port } => {
694 format!(
695 r#"
696 [[destination_config.destinations]]
697 endpoint = "{endpoint}"
698 port = {port}
699 kind = "Validator"
700 "#
701 )
702 }
703 Destination::Logging { file_name } => {
704 format!(
705 r#"
706 [[destination_config.destinations]]
707 file_name = "{file_name}"
708 kind = "Logging"
709 "#
710 )
711 }
712 };
713
714 config.push_str(&destination_string_to_push);
715 }
716
717 config
718 }
719
720 async fn generate_initial_validator_config(&mut self) -> Result<()> {
721 let mut command = self.command_for_binary("linera-server").await?;
722 command.arg("generate");
723 if let Some(seed) = self.testing_prng_seed {
724 command.arg("--testing-prng-seed").arg(seed.to_string());
725 self.testing_prng_seed = Some(seed + 1);
726 }
727 command.arg("--validators");
728 for i in 0..self.num_initial_validators {
729 command.arg(&self.configuration_string(i)?);
730 }
731 let output = command
732 .args(["--committee", "committee.json"])
733 .spawn_and_wait_for_stdout()
734 .await?;
735 self.validator_keys = output
736 .split_whitespace()
737 .map(str::to_string)
738 .map(|keys| keys.split(',').map(str::to_string).collect::<Vec<_>>())
739 .enumerate()
740 .map(|(i, keys)| {
741 let validator_key = keys[0].to_string();
742 let account_key = keys[1].to_string();
743 (i, (validator_key, account_key))
744 })
745 .collect();
746 Ok(())
747 }
748
749 async fn run_proxy(&self, validator: usize, proxy_id: usize) -> Result<Child> {
750 let storage = self
751 .initialized_validator_storages
752 .get(&validator)
753 .expect("initialized storage");
754 let child = self
755 .command_for_binary("linera-proxy")
756 .await?
757 .arg(format!("server_{}.json", validator))
758 .args(["--storage", &storage.to_string()])
759 .args(["--id", &proxy_id.to_string()])
760 .spawn_into()?;
761
762 let port = self.proxy_public_port(validator, proxy_id);
763 let nickname = format!("validator proxy {validator}");
764 match self.network.external {
765 Network::Grpc => {
766 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
767 let nickname = format!("validator proxy {validator}");
768 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
769 }
770 Network::Grpcs => {
771 let nickname = format!("validator proxy {validator}");
772 Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
773 }
774 Network::Tcp => {
775 Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
776 }
777 Network::Udp => {
778 Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
779 }
780 }
781 Ok(child)
782 }
783
784 async fn run_exporter(&self, validator: usize, exporter_id: u32) -> Result<Child> {
785 let config_path = format!("exporter_config_{validator}:{exporter_id}.toml");
786 let storage = self
787 .initialized_validator_storages
788 .get(&validator)
789 .expect("initialized storage");
790
791 tracing::debug!(config=?config_path, storage=?storage.to_string(), "starting block exporter");
792
793 let child = self
794 .command_for_binary("linera-exporter")
795 .await?
796 .args(["run", "--config-path", &config_path])
797 .args(["--storage", &storage.to_string()])
798 .spawn_into()?;
799
800 match self.network.internal {
801 Network::Grpc => {
802 let port = self.block_exporter_port(validator, exporter_id as usize);
803 let nickname = format!("block exporter {validator}:{exporter_id}");
804 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
805 }
806 Network::Grpcs => {
807 let port = self.block_exporter_port(validator, exporter_id as usize);
808 let nickname = format!("block exporter {validator}:{exporter_id}");
809 Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
810 }
811 Network::Tcp | Network::Udp => {
812 unreachable!("Only allowed options are grpc and grpcs")
813 }
814 }
815
816 tracing::info!("block exporter started {validator}:{exporter_id}");
817
818 Ok(child)
819 }
820
821 pub async fn ensure_grpc_server_has_started(
822 nickname: &str,
823 port: usize,
824 scheme: &str,
825 ) -> Result<()> {
826 let endpoint = match scheme {
827 "http" => Endpoint::new(format!("http://localhost:{port}"))
828 .context("endpoint should always parse")?,
829 "https" => {
830 use linera_rpc::CERT_PEM;
831 let certificate = tonic::transport::Certificate::from_pem(CERT_PEM);
832 let tls_config = ClientTlsConfig::new().ca_certificate(certificate);
833 Endpoint::new(format!("https://localhost:{port}"))
834 .context("endpoint should always parse")?
835 .tls_config(tls_config)?
836 }
837 _ => bail!("Only supported scheme are http and https"),
838 };
839 let connection = endpoint.connect_lazy();
840 let mut client = HealthClient::new(connection);
841 linera_base::time::timer::sleep(Duration::from_millis(100)).await;
842 for i in 0..10 {
843 linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
844 let result = client.check(HealthCheckRequest::default()).await;
845 if result.is_ok() && result.unwrap().get_ref().status() == ServingStatus::Serving {
846 info!(?port, "Successfully started {nickname}");
847 return Ok(());
848 } else {
849 warn!("Waiting for {nickname} to start");
850 }
851 }
852 bail!("Failed to start {nickname}");
853 }
854
855 async fn ensure_simple_server_has_started(
856 nickname: &str,
857 port: usize,
858 protocol: &str,
859 ) -> Result<()> {
860 use linera_core::node::ValidatorNode as _;
861
862 let options = linera_rpc::NodeOptions {
863 send_timeout: Duration::from_secs(5),
864 recv_timeout: Duration::from_secs(5),
865 retry_delay: Duration::from_secs(1),
866 max_retries: 1,
867 ..Default::default()
868 };
869 let provider = linera_rpc::simple::SimpleNodeProvider::new(options);
870 let address = format!("{protocol}:127.0.0.1:{port}");
871 let node = provider.make_node(&address)?;
874 linera_base::time::timer::sleep(Duration::from_millis(100)).await;
875 for i in 0..10 {
876 linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
877 let result = node.get_version_info().await;
878 if result.is_ok() {
879 info!("Successfully started {nickname}");
880 return Ok(());
881 } else {
882 warn!("Waiting for {nickname} to start");
883 }
884 }
885 bail!("Failed to start {nickname}");
886 }
887
888 async fn initialize_storage(&mut self, validator: usize) -> Result<()> {
889 let namespace = format!("{}_server_{}_db", self.common_namespace, validator);
890 let inner_storage_config = self.common_storage_config.clone();
891 let storage = StorageConfig {
892 inner_storage_config,
893 namespace,
894 };
895 let mut command = self.command_for_binary("linera").await?;
896 if let Ok(var) = env::var(SERVER_ENV) {
897 command.args(var.split_whitespace());
898 }
899 command.args(["storage", "initialize"]);
900 command
901 .args(["--storage", &storage.to_string()])
902 .args(["--genesis", "genesis.json"])
903 .spawn_and_wait_for_stdout()
904 .await?;
905
906 self.initialized_validator_storages
907 .insert(validator, storage);
908 Ok(())
909 }
910
911 async fn run_server(&self, validator: usize, shard: usize) -> Result<Child> {
912 let mut storage = self
913 .initialized_validator_storages
914 .get(&validator)
915 .expect("initialized storage")
916 .clone();
917
918 storage.maybe_append_shard_path(shard)?;
921
922 let mut command = self.command_for_binary("linera-server").await?;
923 if let Ok(var) = env::var(SERVER_ENV) {
924 command.args(var.split_whitespace());
925 }
926 command
927 .arg("run")
928 .args(["--storage", &storage.to_string()])
929 .args(["--server", &format!("server_{}.json", validator)])
930 .args(["--shard", &shard.to_string()])
931 .args(self.cross_chain_config.to_args());
932 let child = command.spawn_into()?;
933
934 let port = self.shard_port(validator, shard);
935 let nickname = format!("validator server {validator}:{shard}");
936 match self.network.internal {
937 Network::Grpc => {
938 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
939 }
940 Network::Grpcs => {
941 Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
942 }
943 Network::Tcp => {
944 Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
945 }
946 Network::Udp => {
947 Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
948 }
949 }
950 Ok(child)
951 }
952
953 async fn run(&mut self) -> Result<()> {
954 for validator in 0..self.num_initial_validators {
955 self.start_validator(validator).await?;
956 }
957 Ok(())
958 }
959
960 pub async fn start_validator(&mut self, index: usize) -> Result<()> {
962 self.initialize_storage(index).await?;
963 self.restart_validator(index).await
964 }
965
966 pub async fn restart_validator(&mut self, index: usize) -> Result<()> {
969 let mut validator = Validator::new();
970 for k in 0..self.num_proxies {
971 let proxy = self.run_proxy(index, k).await?;
972 validator.add_proxy(proxy);
973 }
974 for shard in 0..self.num_shards {
975 let server = self.run_server(index, shard).await?;
976 validator.add_server(server);
977 }
978 if let ExportersSetup::Local(ref exporters) = self.block_exporters {
979 for block_exporter in 0..exporters.len() {
980 let exporter = self.run_exporter(index, block_exporter as u32).await?;
981 validator.add_block_exporter(exporter);
982 }
983 }
984
985 self.running_validators.insert(index, validator);
986 Ok(())
987 }
988
989 pub async fn stop_validator(&mut self, index: usize) -> Result<()> {
991 if let Some(mut validator) = self.running_validators.remove(&index) {
992 if let Err(error) = validator.terminate().await {
993 error!("Failed to stop validator {index}: {error}");
994 return Err(error);
995 }
996 }
997 Ok(())
998 }
999
1000 pub fn validator_client(&mut self, validator: usize) -> Result<linera_rpc::Client> {
1002 let node_provider = linera_rpc::NodeProvider::new(linera_rpc::NodeOptions {
1003 send_timeout: Duration::from_secs(1),
1004 recv_timeout: Duration::from_secs(1),
1005 retry_delay: Duration::ZERO,
1006 max_retries: 0,
1007 ..Default::default()
1008 });
1009
1010 Ok(node_provider.make_node(&self.validator_address(validator))?)
1011 }
1012
1013 pub fn validator_address(&self, validator: usize) -> String {
1016 let port = self.proxy_public_port(validator, 0);
1017 let schema = self.network.external.schema();
1018
1019 format!("{schema}:localhost:{port}")
1020 }
1021}
1022
1023#[cfg(with_testing)]
1024impl LocalNet {
1025 pub fn validator_keys(&self, validator: usize) -> Option<&(String, String)> {
1027 self.validator_keys.get(&validator)
1028 }
1029
1030 pub async fn generate_validator_config(&mut self, validator: usize) -> Result<()> {
1031 let stdout = self
1032 .command_for_binary("linera-server")
1033 .await?
1034 .arg("generate")
1035 .arg("--validators")
1036 .arg(&self.configuration_string(validator)?)
1037 .spawn_and_wait_for_stdout()
1038 .await?;
1039 let keys = stdout
1040 .trim()
1041 .split(',')
1042 .map(str::to_string)
1043 .collect::<Vec<_>>();
1044 self.validator_keys
1045 .insert(validator, (keys[0].clone(), keys[1].clone()));
1046 Ok(())
1047 }
1048
1049 pub async fn terminate_server(&mut self, validator: usize, shard: usize) -> Result<()> {
1050 self.running_validators
1051 .get_mut(&validator)
1052 .context("server not found")?
1053 .terminate_server(shard)
1054 .await?;
1055 Ok(())
1056 }
1057
1058 pub fn remove_validator(&mut self, validator: usize) -> Result<()> {
1059 self.running_validators
1060 .remove(&validator)
1061 .context("validator not found")?;
1062 Ok(())
1063 }
1064
1065 pub async fn start_server(&mut self, validator: usize, shard: usize) -> Result<()> {
1066 let server = self.run_server(validator, shard).await?;
1067 self.running_validators
1068 .get_mut(&validator)
1069 .context("could not find validator")?
1070 .add_server(server);
1071 Ok(())
1072 }
1073}