1#[cfg(with_testing)]
5use std::sync::LazyLock;
6use std::{
7 collections::BTreeMap,
8 env,
9 num::NonZeroU16,
10 path::{Path, PathBuf},
11 sync::Arc,
12 time::Duration,
13};
14
15use anyhow::{anyhow, bail, ensure, Context, Result};
16#[cfg(with_testing)]
17use async_lock::RwLock;
18use async_trait::async_trait;
19use linera_base::{
20 command::{resolve_binary, CommandExt},
21 data_types::Amount,
22};
23use linera_client::client_options::ResourceControlPolicyConfig;
24use linera_core::node::ValidatorNodeProvider;
25use linera_rpc::config::{CrossChainConfig, ExporterServiceConfig, TlsConfig};
26#[cfg(all(feature = "storage-service", with_testing))]
27use linera_storage_service::common::storage_service_test_endpoint;
28#[cfg(all(feature = "rocksdb", feature = "scylladb", with_testing))]
29use linera_views::rocks_db::{RocksDbDatabase, RocksDbSpawnMode};
30#[cfg(all(feature = "scylladb", with_testing))]
31use linera_views::{scylla_db::ScyllaDbDatabase, store::TestKeyValueDatabase as _};
32use tempfile::{tempdir, TempDir};
33use tokio::process::{Child, Command};
34use tonic::transport::{channel::ClientTlsConfig, Endpoint};
35use tonic_health::pb::{
36 health_check_response::ServingStatus, health_client::HealthClient, HealthCheckRequest,
37};
38use tracing::{error, info, warn};
39
40use crate::{
41 cli_wrappers::{
42 ClientWrapper, LineraNet, LineraNetConfig, Network, NetworkConfig, OnClientDrop,
43 },
44 config::{BlockExporterConfig, Destination, DestinationConfig},
45 storage::{InnerStorageConfig, StorageConfig},
46 util::ChildExt,
47};
48
49const MAX_NUMBER_SHARDS: usize = 1000;
51
52pub enum ProcessInbox {
53 Skip,
54 Automatic,
55}
56
57#[cfg(with_testing)]
58static PORT_PROVIDER: LazyLock<RwLock<u16>> = LazyLock::new(|| RwLock::new(7080));
59
60fn test_offset_port() -> usize {
62 std::env::var("TEST_OFFSET_PORT")
63 .ok()
64 .and_then(|port_str| port_str.parse::<usize>().ok())
65 .unwrap_or(9000)
66}
67
68#[cfg(with_testing)]
70pub async fn get_node_port() -> u16 {
71 let mut port = PORT_PROVIDER.write().await;
72 let port_ret = *port;
73 *port += 1;
74 info!("get_node_port returning port_ret={}", port_ret);
75 assert!(port_selector::is_free(port_ret));
76 port_ret
77}
78
79#[cfg(with_testing)]
80async fn make_testing_config(database: Database) -> Result<InnerStorageConfig> {
81 match database {
82 Database::Service => {
83 #[cfg(feature = "storage-service")]
84 {
85 let endpoint = storage_service_test_endpoint()
86 .expect("Reading LINERA_STORAGE_SERVICE environment variable");
87 Ok(InnerStorageConfig::Service { endpoint })
88 }
89 #[cfg(not(feature = "storage-service"))]
90 panic!("Database::Service is selected without the feature storage_service");
91 }
92 Database::DynamoDb => {
93 #[cfg(feature = "dynamodb")]
94 {
95 let use_dynamodb_local = true;
96 Ok(InnerStorageConfig::DynamoDb { use_dynamodb_local })
97 }
98 #[cfg(not(feature = "dynamodb"))]
99 panic!("Database::DynamoDb is selected without the feature dynamodb");
100 }
101 Database::ScyllaDb => {
102 #[cfg(feature = "scylladb")]
103 {
104 let config = ScyllaDbDatabase::new_test_config().await?;
105 Ok(InnerStorageConfig::ScyllaDb {
106 uri: config.inner_config.uri,
107 })
108 }
109 #[cfg(not(feature = "scylladb"))]
110 panic!("Database::ScyllaDb is selected without the feature scylladb");
111 }
112 Database::DualRocksDbScyllaDb => {
113 #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
114 {
115 let rocksdb_config = RocksDbDatabase::new_test_config().await?;
116 let scylla_config = ScyllaDbDatabase::new_test_config().await?;
117 let spawn_mode = RocksDbSpawnMode::get_spawn_mode_from_runtime();
118 Ok(InnerStorageConfig::DualRocksDbScyllaDb {
119 path_with_guard: rocksdb_config.inner_config.path_with_guard,
120 spawn_mode,
121 uri: scylla_config.inner_config.uri,
122 })
123 }
124 #[cfg(not(all(feature = "rocksdb", feature = "scylladb")))]
125 panic!("Database::DualRocksDbScyllaDb is selected without the features rocksdb and scylladb");
126 }
127 }
128}
129
130pub enum InnerStorageConfigBuilder {
131 #[cfg(with_testing)]
132 TestConfig,
133 ExistingConfig {
134 storage_config: InnerStorageConfig,
135 },
136}
137
138impl InnerStorageConfigBuilder {
139 #[cfg_attr(not(with_testing), expect(unused_variables))]
140 pub async fn build(self, database: Database) -> Result<InnerStorageConfig> {
141 match self {
142 #[cfg(with_testing)]
143 InnerStorageConfigBuilder::TestConfig => make_testing_config(database).await,
144 InnerStorageConfigBuilder::ExistingConfig { storage_config } => Ok(storage_config),
145 }
146 }
147}
148
149#[derive(Clone)]
152pub enum PathProvider {
153 ExternalPath { path_buf: PathBuf },
154 TemporaryDirectory { tmp_dir: Arc<TempDir> },
155}
156
157impl PathProvider {
158 pub fn path(&self) -> &Path {
159 match self {
160 PathProvider::ExternalPath { path_buf } => path_buf.as_path(),
161 PathProvider::TemporaryDirectory { tmp_dir } => tmp_dir.path(),
162 }
163 }
164
165 pub fn create_temporary_directory() -> Result<Self> {
166 let tmp_dir = Arc::new(tempdir()?);
167 Ok(PathProvider::TemporaryDirectory { tmp_dir })
168 }
169
170 pub fn from_path_option(path: &Option<String>) -> anyhow::Result<Self> {
171 Ok(match path {
172 None => {
173 let tmp_dir = Arc::new(tempfile::tempdir()?);
174 PathProvider::TemporaryDirectory { tmp_dir }
175 }
176 Some(path) => {
177 let path = Path::new(path);
178 let path_buf = path.to_path_buf();
179 PathProvider::ExternalPath { path_buf }
180 }
181 })
182 }
183}
184
185pub struct LocalNetConfig {
187 pub database: Database,
188 pub network: NetworkConfig,
189 pub testing_prng_seed: Option<u64>,
190 pub namespace: String,
191 pub num_other_initial_chains: u32,
192 pub initial_amount: Amount,
193 pub num_initial_validators: usize,
194 pub num_shards: usize,
195 pub num_proxies: usize,
196 pub policy_config: ResourceControlPolicyConfig,
197 pub cross_chain_config: CrossChainConfig,
198 pub storage_config_builder: InnerStorageConfigBuilder,
199 pub path_provider: PathProvider,
200 pub block_exporters: ExportersSetup,
201}
202
203#[derive(Clone, PartialEq)]
205pub enum ExportersSetup {
206 Local(Vec<BlockExporterConfig>),
208 Remote(Vec<ExporterServiceConfig>),
210}
211
212impl ExportersSetup {
213 pub fn new(
214 with_block_exporter: bool,
215 block_exporter_address: String,
216 block_exporter_port: NonZeroU16,
217 ) -> ExportersSetup {
218 if with_block_exporter {
219 let exporter_config =
220 ExporterServiceConfig::new(block_exporter_address, block_exporter_port.into());
221 ExportersSetup::Remote(vec![exporter_config])
222 } else {
223 ExportersSetup::Local(vec![])
224 }
225 }
226}
227
228pub struct LocalNet {
230 network: NetworkConfig,
231 testing_prng_seed: Option<u64>,
232 next_client_id: usize,
233 num_initial_validators: usize,
234 num_proxies: usize,
235 num_shards: usize,
236 validator_keys: BTreeMap<usize, (String, String)>,
237 running_validators: BTreeMap<usize, Validator>,
238 initialized_validator_storages: BTreeMap<usize, StorageConfig>,
239 common_namespace: String,
240 common_storage_config: InnerStorageConfig,
241 cross_chain_config: CrossChainConfig,
242 path_provider: PathProvider,
243 block_exporters: ExportersSetup,
244}
245
246const SERVER_ENV: &str = "LINERA_SERVER_PARAMS";
249
250#[derive(Copy, Clone, Eq, PartialEq)]
252pub enum Database {
253 Service,
254 DynamoDb,
255 ScyllaDb,
256 DualRocksDbScyllaDb,
257}
258
259struct Validator {
261 proxies: Vec<Child>,
262 servers: Vec<Child>,
263 exporters: Vec<Child>,
264}
265
266impl Validator {
267 fn new() -> Self {
268 Self {
269 proxies: vec![],
270 servers: vec![],
271 exporters: vec![],
272 }
273 }
274
275 async fn terminate(&mut self) -> Result<()> {
276 for proxy in &mut self.proxies {
277 proxy.kill().await.context("terminating validator proxy")?;
278 }
279 for server in &mut self.servers {
280 server
281 .kill()
282 .await
283 .context("terminating validator server")?;
284 }
285 Ok(())
286 }
287
288 fn add_proxy(&mut self, proxy: Child) {
289 self.proxies.push(proxy)
290 }
291
292 fn add_server(&mut self, server: Child) {
293 self.servers.push(server)
294 }
295
296 #[cfg(with_testing)]
297 async fn terminate_server(&mut self, index: usize) -> Result<()> {
298 let mut server = self.servers.remove(index);
299 server
300 .kill()
301 .await
302 .context("terminating validator server")?;
303 Ok(())
304 }
305
306 fn add_block_exporter(&mut self, exporter: Child) {
307 self.exporters.push(exporter);
308 }
309
310 fn ensure_is_running(&mut self) -> Result<()> {
311 for proxy in &mut self.proxies {
312 proxy.ensure_is_running()?;
313 }
314 for child in &mut self.servers {
315 child.ensure_is_running()?;
316 }
317 for exporter in &mut self.exporters {
318 exporter.ensure_is_running()?;
319 }
320 Ok(())
321 }
322}
323
324#[cfg(with_testing)]
325impl LocalNetConfig {
326 pub fn new_test(database: Database, network: Network) -> Self {
327 let num_shards = 4;
328 let num_proxies = 1;
329 let storage_config_builder = InnerStorageConfigBuilder::TestConfig;
330 let path_provider = PathProvider::create_temporary_directory().unwrap();
331 let internal = network.drop_tls();
332 let external = network;
333 let network = NetworkConfig { internal, external };
334 let cross_chain_config = CrossChainConfig::default();
335 Self {
336 database,
337 network,
338 num_other_initial_chains: 2,
339 initial_amount: Amount::from_tokens(1_000_000),
340 policy_config: ResourceControlPolicyConfig::Testnet,
341 cross_chain_config,
342 testing_prng_seed: Some(37),
343 namespace: linera_views::random::generate_test_namespace(),
344 num_initial_validators: 4,
345 num_shards,
346 num_proxies,
347 storage_config_builder,
348 path_provider,
349 block_exporters: ExportersSetup::Local(vec![]),
350 }
351 }
352}
353
354#[async_trait]
355impl LineraNetConfig for LocalNetConfig {
356 type Net = LocalNet;
357
358 async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
359 let storage_config = self.storage_config_builder.build(self.database).await?;
360 let mut net = LocalNet::new(
361 self.network,
362 self.testing_prng_seed,
363 self.namespace,
364 self.num_initial_validators,
365 self.num_proxies,
366 self.num_shards,
367 storage_config,
368 self.cross_chain_config,
369 self.path_provider,
370 self.block_exporters,
371 );
372 let client = net.make_client().await;
373 ensure!(
374 self.num_initial_validators > 0,
375 "There should be at least one initial validator"
376 );
377 let total_number_shards = self.num_initial_validators * self.num_shards;
378 ensure!(
379 total_number_shards <= MAX_NUMBER_SHARDS,
380 "Total number of shards ({}) exceeds maximum allowed ({})",
381 self.num_shards,
382 MAX_NUMBER_SHARDS
383 );
384 net.generate_initial_validator_config().await?;
385 client
386 .create_genesis_config(
387 self.num_other_initial_chains,
388 self.initial_amount,
389 self.policy_config,
390 Some(vec!["localhost".to_owned()]),
391 )
392 .await?;
393 net.run().await?;
394 Ok((net, client))
395 }
396}
397
398#[async_trait]
399impl LineraNet for LocalNet {
400 async fn ensure_is_running(&mut self) -> Result<()> {
401 for validator in self.running_validators.values_mut() {
402 validator.ensure_is_running().context("in local network")?;
403 }
404 Ok(())
405 }
406
407 async fn make_client(&mut self) -> ClientWrapper {
408 let client = ClientWrapper::new(
409 self.path_provider.clone(),
410 self.network.external,
411 self.testing_prng_seed,
412 self.next_client_id,
413 OnClientDrop::LeakChains,
414 );
415 if let Some(seed) = self.testing_prng_seed {
416 self.testing_prng_seed = Some(seed + 1);
417 }
418 self.next_client_id += 1;
419 client
420 }
421
422 async fn terminate(&mut self) -> Result<()> {
423 for validator in self.running_validators.values_mut() {
424 validator.terminate().await.context("in local network")?
425 }
426 Ok(())
427 }
428}
429
430impl LocalNet {
431 #[expect(clippy::too_many_arguments)]
432 fn new(
433 network: NetworkConfig,
434 testing_prng_seed: Option<u64>,
435 common_namespace: String,
436 num_initial_validators: usize,
437 num_proxies: usize,
438 num_shards: usize,
439 common_storage_config: InnerStorageConfig,
440 cross_chain_config: CrossChainConfig,
441 path_provider: PathProvider,
442 block_exporters: ExportersSetup,
443 ) -> Self {
444 Self {
445 network,
446 testing_prng_seed,
447 next_client_id: 0,
448 num_initial_validators,
449 num_proxies,
450 num_shards,
451 validator_keys: BTreeMap::new(),
452 running_validators: BTreeMap::new(),
453 initialized_validator_storages: BTreeMap::new(),
454 common_namespace,
455 common_storage_config,
456 cross_chain_config,
457 path_provider,
458 block_exporters,
459 }
460 }
461
462 async fn command_for_binary(&self, name: &'static str) -> Result<Command> {
463 let path = resolve_binary(name, env!("CARGO_PKG_NAME")).await?;
464 let mut command = Command::new(path);
465 command.current_dir(self.path_provider.path());
466 Ok(command)
467 }
468
469 #[cfg(with_testing)]
470 pub fn genesis_config(&self) -> Result<linera_client::config::GenesisConfig> {
471 let path = self.path_provider.path();
472 crate::util::read_json(path.join("genesis.json"))
473 }
474
475 fn shard_port(&self, validator: usize, shard: usize) -> usize {
476 test_offset_port() + validator * self.num_shards + shard + 1
477 }
478
479 fn proxy_internal_port(&self, validator: usize, proxy_id: usize) -> usize {
480 test_offset_port() + 1000 + validator * self.num_proxies + proxy_id + 1
481 }
482
483 fn shard_metrics_port(&self, validator: usize, shard: usize) -> usize {
484 test_offset_port() + 2000 + validator * self.num_shards + shard + 1
485 }
486
487 fn proxy_metrics_port(&self, validator: usize, proxy_id: usize) -> usize {
488 test_offset_port() + 3000 + validator * self.num_proxies + proxy_id + 1
489 }
490
491 fn block_exporter_port(&self, validator: usize, exporter_id: usize) -> usize {
492 test_offset_port() + 3000 + validator * self.num_shards + exporter_id + 1
493 }
494
495 pub fn proxy_public_port(&self, validator: usize, proxy_id: usize) -> usize {
496 test_offset_port() + 4000 + validator * self.num_proxies + proxy_id + 1
497 }
498
499 pub fn first_public_port() -> usize {
500 test_offset_port() + 4000 + 1
501 }
502
503 fn block_exporter_metrics_port(exporter_id: usize) -> usize {
504 test_offset_port() + 4000 + exporter_id + 1
505 }
506
507 fn configuration_string(&self, server_number: usize) -> Result<String> {
508 let n = server_number;
509 let path = self
510 .path_provider
511 .path()
512 .join(format!("validator_{n}.toml"));
513 let port = self.proxy_public_port(n, 0);
514 let external_protocol = self.network.external.toml();
515 let internal_protocol = self.network.internal.toml();
516 let external_host = self.network.external.localhost();
517 let internal_host = self.network.internal.localhost();
518 let mut content = format!(
519 r#"
520 server_config_path = "server_{n}.json"
521 host = "{external_host}"
522 port = {port}
523 external_protocol = {external_protocol}
524 internal_protocol = {internal_protocol}
525 "#
526 );
527
528 for k in 0..self.num_proxies {
529 let public_port = self.proxy_public_port(n, k);
530 let internal_port = self.proxy_internal_port(n, k);
531 let metrics_port = self.proxy_metrics_port(n, k);
532 content.push_str(&format!(
536 r#"
537 [[proxies]]
538 host = "{internal_host}"
539 public_port = {public_port}
540 private_port = {internal_port}
541 metrics_port = {metrics_port}
542 "#
543 ));
544 }
545
546 for k in 0..self.num_shards {
547 let shard_port = self.shard_port(n, k);
548 let shard_metrics_port = self.shard_metrics_port(n, k);
549 content.push_str(&format!(
550 r#"
551
552 [[shards]]
553 host = "{internal_host}"
554 port = {shard_port}
555 metrics_port = {shard_metrics_port}
556 "#
557 ));
558 }
559
560 match self.block_exporters {
561 ExportersSetup::Local(ref exporters) => {
562 for (j, exporter) in exporters.iter().enumerate() {
563 let host = Network::Grpc.localhost();
564 let port = self.block_exporter_port(n, j);
565 let config_content = format!(
566 r#"
567
568 [[block_exporters]]
569 host = "{host}"
570 port = {port}
571 "#
572 );
573
574 content.push_str(&config_content);
575 let exporter_config = self.generate_block_exporter_config(
576 n,
577 j as u32,
578 &exporter.destination_config,
579 );
580 let config_path = self
581 .path_provider
582 .path()
583 .join(format!("exporter_config_{n}:{j}.toml"));
584
585 fs_err::write(&config_path, &exporter_config)?;
586 }
587 }
588 ExportersSetup::Remote(ref exporters) => {
589 for exporter in exporters {
590 let host = exporter.host.clone();
591 let port = exporter.port;
592 let config_content = format!(
593 r#"
594
595 [[block_exporters]]
596 host = "{host}"
597 port = {port}
598 "#
599 );
600
601 content.push_str(&config_content);
602 }
603 }
604 }
605
606 fs_err::write(&path, content)?;
607 path.into_os_string().into_string().map_err(|error| {
608 anyhow!(
609 "could not parse OS string into string: {}",
610 error.to_string_lossy()
611 )
612 })
613 }
614
615 fn generate_block_exporter_config(
616 &self,
617 validator: usize,
618 exporter_id: u32,
619 destination_config: &DestinationConfig,
620 ) -> String {
621 let n = validator;
622 let host = Network::Grpc.localhost();
623 let port = self.block_exporter_port(n, exporter_id as usize);
624 let metrics_port = Self::block_exporter_metrics_port(exporter_id as usize);
625 let mut config = format!(
626 r#"
627 id = {exporter_id}
628
629 metrics_port = {metrics_port}
630
631 [service_config]
632 host = "{host}"
633 port = {port}
634
635 "#
636 );
637
638 let DestinationConfig {
639 destinations,
640 committee_destination,
641 } = destination_config;
642
643 if *committee_destination {
644 let destination_string_to_push = r#"
645
646 [destination_config]
647 committee_destination = true
648 "#
649 .to_string();
650
651 config.push_str(&destination_string_to_push);
652 }
653
654 for destination in destinations {
655 let destination_string_to_push = match destination {
656 Destination::Indexer {
657 tls,
658 endpoint,
659 port,
660 } => {
661 let tls = match tls {
662 TlsConfig::ClearText => "ClearText",
663 TlsConfig::Tls => "Tls",
664 };
665 format!(
666 r#"
667 [[destination_config.destinations]]
668 tls = "{tls}"
669 endpoint = "{endpoint}"
670 port = {port}
671 kind = "Indexer"
672 "#
673 )
674 }
675 Destination::Validator { endpoint, port } => {
676 format!(
677 r#"
678 [[destination_config.destinations]]
679 endpoint = "{endpoint}"
680 port = {port}
681 kind = "Validator"
682 "#
683 )
684 }
685 Destination::Logging { file_name } => {
686 format!(
687 r#"
688 [[destination_config.destinations]]
689 file_name = "{file_name}"
690 kind = "Logging"
691 "#
692 )
693 }
694 };
695
696 config.push_str(&destination_string_to_push);
697 }
698
699 config
700 }
701
702 async fn generate_initial_validator_config(&mut self) -> Result<()> {
703 let mut command = self.command_for_binary("linera-server").await?;
704 command.arg("generate");
705 if let Some(seed) = self.testing_prng_seed {
706 command.arg("--testing-prng-seed").arg(seed.to_string());
707 self.testing_prng_seed = Some(seed + 1);
708 }
709 command.arg("--validators");
710 for i in 0..self.num_initial_validators {
711 command.arg(&self.configuration_string(i)?);
712 }
713 let output = command
714 .args(["--committee", "committee.json"])
715 .spawn_and_wait_for_stdout()
716 .await?;
717 self.validator_keys = output
718 .split_whitespace()
719 .map(str::to_string)
720 .map(|keys| keys.split(',').map(str::to_string).collect::<Vec<_>>())
721 .enumerate()
722 .map(|(i, keys)| {
723 let validator_key = keys[0].to_string();
724 let account_key = keys[1].to_string();
725 (i, (validator_key, account_key))
726 })
727 .collect();
728 Ok(())
729 }
730
731 async fn run_proxy(&mut self, validator: usize, proxy_id: usize) -> Result<Child> {
732 let storage = self
733 .initialized_validator_storages
734 .get(&validator)
735 .expect("initialized storage");
736 let child = self
737 .command_for_binary("linera-proxy")
738 .await?
739 .arg(format!("server_{}.json", validator))
740 .args(["--storage", &storage.to_string()])
741 .args(["--id", &proxy_id.to_string()])
742 .spawn_into()?;
743
744 let port = self.proxy_public_port(validator, proxy_id);
745 let nickname = format!("validator proxy {validator}");
746 match self.network.external {
747 Network::Grpc => {
748 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
749 let nickname = format!("validator proxy {validator}");
750 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
751 }
752 Network::Grpcs => {
753 let nickname = format!("validator proxy {validator}");
754 Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
755 }
756 Network::Tcp => {
757 Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
758 }
759 Network::Udp => {
760 Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
761 }
762 }
763 Ok(child)
764 }
765
766 async fn run_exporter(&mut self, validator: usize, exporter_id: u32) -> Result<Child> {
767 let config_path = format!("exporter_config_{validator}:{exporter_id}.toml");
768 let storage = self
769 .initialized_validator_storages
770 .get(&validator)
771 .expect("initialized storage");
772
773 tracing::debug!(config=?config_path, storage=?storage.to_string(), "starting block exporter");
774
775 let child = self
776 .command_for_binary("linera-exporter")
777 .await?
778 .args(["--config-path", &config_path])
779 .args(["--storage", &storage.to_string()])
780 .spawn_into()?;
781
782 match self.network.internal {
783 Network::Grpc => {
784 let port = self.block_exporter_port(validator, exporter_id as usize);
785 let nickname = format!("block exporter {validator}:{exporter_id}");
786 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
787 }
788 Network::Grpcs => {
789 let port = self.block_exporter_port(validator, exporter_id as usize);
790 let nickname = format!("block exporter {validator}:{exporter_id}");
791 Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
792 }
793 Network::Tcp | Network::Udp => {
794 unreachable!("Only allowed options are grpc and grpcs")
795 }
796 }
797
798 tracing::info!("block exporter started {validator}:{exporter_id}");
799
800 Ok(child)
801 }
802
803 pub async fn ensure_grpc_server_has_started(
804 nickname: &str,
805 port: usize,
806 scheme: &str,
807 ) -> Result<()> {
808 let endpoint = match scheme {
809 "http" => Endpoint::new(format!("http://localhost:{port}"))
810 .context("endpoint should always parse")?,
811 "https" => {
812 use linera_rpc::CERT_PEM;
813 let certificate = tonic::transport::Certificate::from_pem(CERT_PEM);
814 let tls_config = ClientTlsConfig::new().ca_certificate(certificate);
815 Endpoint::new(format!("https://localhost:{port}"))
816 .context("endpoint should always parse")?
817 .tls_config(tls_config)?
818 }
819 _ => bail!("Only supported scheme are http and https"),
820 };
821 let connection = endpoint.connect_lazy();
822 let mut client = HealthClient::new(connection);
823 linera_base::time::timer::sleep(Duration::from_millis(100)).await;
824 for i in 0..10 {
825 linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
826 let result = client.check(HealthCheckRequest::default()).await;
827 if result.is_ok() && result.unwrap().get_ref().status() == ServingStatus::Serving {
828 info!(?port, "Successfully started {nickname}");
829 return Ok(());
830 } else {
831 warn!("Waiting for {nickname} to start");
832 }
833 }
834 bail!("Failed to start {nickname}");
835 }
836
837 async fn ensure_simple_server_has_started(
838 nickname: &str,
839 port: usize,
840 protocol: &str,
841 ) -> Result<()> {
842 use linera_core::node::ValidatorNode as _;
843
844 let options = linera_rpc::NodeOptions {
845 send_timeout: Duration::from_secs(5),
846 recv_timeout: Duration::from_secs(5),
847 retry_delay: Duration::from_secs(1),
848 max_retries: 1,
849 };
850 let provider = linera_rpc::simple::SimpleNodeProvider::new(options);
851 let address = format!("{protocol}:127.0.0.1:{port}");
852 let node = provider.make_node(&address)?;
855 linera_base::time::timer::sleep(Duration::from_millis(100)).await;
856 for i in 0..10 {
857 linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
858 let result = node.get_version_info().await;
859 if result.is_ok() {
860 info!("Successfully started {nickname}");
861 return Ok(());
862 } else {
863 warn!("Waiting for {nickname} to start");
864 }
865 }
866 bail!("Failed to start {nickname}");
867 }
868
869 async fn initialize_storage(&mut self, validator: usize) -> Result<()> {
870 let namespace = format!("{}_server_{}_db", self.common_namespace, validator);
871 let inner_storage_config = self.common_storage_config.clone();
872 let storage = StorageConfig {
873 inner_storage_config,
874 namespace,
875 };
876 let mut command = self.command_for_binary("linera").await?;
877 if let Ok(var) = env::var(SERVER_ENV) {
878 command.args(var.split_whitespace());
879 }
880 command.args(["storage", "initialize"]);
881 command
882 .args(["--storage", &storage.to_string()])
883 .args(["--genesis", "genesis.json"])
884 .spawn_and_wait_for_stdout()
885 .await?;
886
887 self.initialized_validator_storages
888 .insert(validator, storage);
889 Ok(())
890 }
891
892 async fn run_server(&mut self, validator: usize, shard: usize) -> Result<Child> {
893 let mut storage = self
894 .initialized_validator_storages
895 .get(&validator)
896 .expect("initialized storage")
897 .clone();
898
899 storage.maybe_append_shard_path(shard)?;
902
903 let mut command = self.command_for_binary("linera-server").await?;
904 if let Ok(var) = env::var(SERVER_ENV) {
905 command.args(var.split_whitespace());
906 }
907 command
908 .arg("run")
909 .args(["--storage", &storage.to_string()])
910 .args(["--server", &format!("server_{}.json", validator)])
911 .args(["--shard", &shard.to_string()])
912 .args(self.cross_chain_config.to_args());
913 let child = command.spawn_into()?;
914
915 let port = self.shard_port(validator, shard);
916 let nickname = format!("validator server {validator}:{shard}");
917 match self.network.internal {
918 Network::Grpc => {
919 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
920 }
921 Network::Grpcs => {
922 Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
923 }
924 Network::Tcp => {
925 Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
926 }
927 Network::Udp => {
928 Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
929 }
930 }
931 Ok(child)
932 }
933
934 async fn run(&mut self) -> Result<()> {
935 for validator in 0..self.num_initial_validators {
936 self.start_validator(validator).await?;
937 }
938 Ok(())
939 }
940
941 pub async fn start_validator(&mut self, index: usize) -> Result<()> {
943 self.initialize_storage(index).await?;
944 self.restart_validator(index).await
945 }
946
947 pub async fn restart_validator(&mut self, index: usize) -> Result<()> {
950 let mut validator = Validator::new();
951 for k in 0..self.num_proxies {
952 let proxy = self.run_proxy(index, k).await?;
953 validator.add_proxy(proxy);
954 }
955 for shard in 0..self.num_shards {
956 let server = self.run_server(index, shard).await?;
957 validator.add_server(server);
958 }
959 if let ExportersSetup::Local(ref exporters) = self.block_exporters {
960 for block_exporter in 0..exporters.len() {
961 let exporter = self.run_exporter(index, block_exporter as u32).await?;
962 validator.add_block_exporter(exporter);
963 }
964 }
965
966 self.running_validators.insert(index, validator);
967 Ok(())
968 }
969
970 pub async fn stop_validator(&mut self, index: usize) -> Result<()> {
972 if let Some(mut validator) = self.running_validators.remove(&index) {
973 if let Err(error) = validator.terminate().await {
974 error!("Failed to stop validator {index}: {error}");
975 return Err(error);
976 }
977 }
978 Ok(())
979 }
980
981 pub fn validator_client(&mut self, validator: usize) -> Result<linera_rpc::Client> {
983 let node_provider = linera_rpc::NodeProvider::new(linera_rpc::NodeOptions {
984 send_timeout: Duration::from_secs(1),
985 recv_timeout: Duration::from_secs(1),
986 retry_delay: Duration::ZERO,
987 max_retries: 0,
988 });
989
990 Ok(node_provider.make_node(&self.validator_address(validator))?)
991 }
992
993 pub fn validator_address(&self, validator: usize) -> String {
996 let port = self.proxy_public_port(validator, 0);
997 let schema = self.network.external.schema();
998
999 format!("{schema}:localhost:{port}")
1000 }
1001}
1002
1003#[cfg(with_testing)]
1004impl LocalNet {
1005 pub fn validator_keys(&self, validator: usize) -> Option<&(String, String)> {
1007 self.validator_keys.get(&validator)
1008 }
1009
1010 pub async fn generate_validator_config(&mut self, validator: usize) -> Result<()> {
1011 let stdout = self
1012 .command_for_binary("linera-server")
1013 .await?
1014 .arg("generate")
1015 .arg("--validators")
1016 .arg(&self.configuration_string(validator)?)
1017 .spawn_and_wait_for_stdout()
1018 .await?;
1019 let keys = stdout
1020 .trim()
1021 .split(',')
1022 .map(str::to_string)
1023 .collect::<Vec<_>>();
1024 self.validator_keys
1025 .insert(validator, (keys[0].clone(), keys[1].clone()));
1026 Ok(())
1027 }
1028
1029 pub async fn terminate_server(&mut self, validator: usize, shard: usize) -> Result<()> {
1030 self.running_validators
1031 .get_mut(&validator)
1032 .context("server not found")?
1033 .terminate_server(shard)
1034 .await?;
1035 Ok(())
1036 }
1037
1038 pub fn remove_validator(&mut self, validator: usize) -> Result<()> {
1039 self.running_validators
1040 .remove(&validator)
1041 .context("validator not found")?;
1042 Ok(())
1043 }
1044
1045 pub async fn start_server(&mut self, validator: usize, shard: usize) -> Result<()> {
1046 let server = self.run_server(validator, shard).await?;
1047 self.running_validators
1048 .get_mut(&validator)
1049 .context("could not find validator")?
1050 .add_server(server);
1051 Ok(())
1052 }
1053}