1#[cfg(with_testing)]
5use std::sync::LazyLock;
6use std::{
7 collections::BTreeMap,
8 env,
9 num::NonZeroU16,
10 path::{Path, PathBuf},
11 sync::Arc,
12 time::Duration,
13};
14
15use anyhow::{anyhow, bail, ensure, Context, Result};
16#[cfg(with_testing)]
17use async_lock::RwLock;
18use async_trait::async_trait;
19use linera_base::{
20 command::{resolve_binary, CommandExt},
21 data_types::Amount,
22};
23use linera_client::client_options::ResourceControlPolicyConfig;
24use linera_core::node::ValidatorNodeProvider;
25use linera_exporter::config::{BlockExporterConfig, Destination, DestinationConfig};
26use linera_rpc::config::{CrossChainConfig, ExporterServiceConfig, TlsConfig};
27#[cfg(all(feature = "storage-service", with_testing))]
28use linera_storage_service::common::storage_service_test_endpoint;
29#[cfg(all(feature = "rocksdb", feature = "scylladb", with_testing))]
30use linera_views::rocks_db::{RocksDbDatabase, RocksDbSpawnMode};
31#[cfg(all(feature = "scylladb", with_testing))]
32use linera_views::{scylla_db::ScyllaDbDatabase, store::TestKeyValueDatabase as _};
33use tempfile::{tempdir, TempDir};
34use tokio::process::{Child, Command};
35use tonic::transport::{channel::ClientTlsConfig, Endpoint};
36use tonic_health::pb::{
37 health_check_response::ServingStatus, health_client::HealthClient, HealthCheckRequest,
38};
39use tracing::{error, info, warn};
40
41use crate::{
42 cli_wrappers::{
43 ClientWrapper, LineraNet, LineraNetConfig, Network, NetworkConfig, OnClientDrop,
44 },
45 storage::{InnerStorageConfig, StorageConfig},
46 util::ChildExt,
47};
48
/// Upper bound on the total number of shards (initial validators multiplied by shards
/// per validator) accepted when instantiating a local network.
const MAX_NUMBER_SHARDS: usize = 1000;
51
/// Inbox-processing mode.
///
/// NOTE(review): this enum is not referenced in the visible part of the file; the exact
/// semantics of each variant should be confirmed against its callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessInbox {
    /// Presumably: do not process the inbox — confirm against callers.
    Skip,
    /// Presumably: process the inbox automatically — confirm against callers.
    Automatic,
}
56
/// Next port handed out by `get_node_port`; the first value served is 7080.
#[cfg(with_testing)]
static PORT_PROVIDER: LazyLock<RwLock<u16>> = LazyLock::new(|| RwLock::new(7080));
59
/// Returns the base port from which all local-network ports are derived.
///
/// The value is read from the `TEST_OFFSET_PORT` environment variable; when the
/// variable is unset or does not parse as a `usize`, `9000` is used.
fn test_offset_port() -> usize {
    const DEFAULT_OFFSET: usize = 9000;
    match std::env::var("TEST_OFFSET_PORT") {
        Ok(raw) => raw.parse::<usize>().unwrap_or(DEFAULT_OFFSET),
        Err(_) => DEFAULT_OFFSET,
    }
}
67
/// Hands out the next port from the global counter and advances the counter by one.
///
/// # Panics
///
/// Panics if the returned port is not free according to `port_selector::is_free`.
#[cfg(with_testing)]
pub async fn get_node_port() -> u16 {
    // The write guard is held until the function returns, as in the check below.
    let mut next_port = PORT_PROVIDER.write().await;
    let port_ret = *next_port;
    *next_port = port_ret + 1;
    info!("get_node_port returning port_ret={}", port_ret);
    assert!(port_selector::is_free(port_ret));
    port_ret
}
78
/// Builds the storage configuration used in tests for the requested `database`.
///
/// # Errors
///
/// Propagates failures from creating the ScyllaDB / RocksDB test configurations.
///
/// # Panics
///
/// Panics when the Cargo feature backing the selected database is not enabled, or when
/// the storage-service test endpoint cannot be obtained.
#[cfg(with_testing)]
async fn make_testing_config(database: Database) -> Result<InnerStorageConfig> {
    match database {
        Database::Service => {
            #[cfg(feature = "storage-service")]
            {
                let endpoint = storage_service_test_endpoint()
                    .expect("Reading LINERA_STORAGE_SERVICE environment variable");
                Ok(InnerStorageConfig::Service { endpoint })
            }
            #[cfg(not(feature = "storage-service"))]
            panic!("Database::Service is selected without the feature storage_service");
        }
        Database::ScyllaDb => {
            #[cfg(feature = "scylladb")]
            {
                let scylla = ScyllaDbDatabase::new_test_config().await?;
                let uri = scylla.inner_config.uri;
                Ok(InnerStorageConfig::ScyllaDb { uri })
            }
            #[cfg(not(feature = "scylladb"))]
            panic!("Database::ScyllaDb is selected without the feature scylladb");
        }
        Database::DualRocksDbScyllaDb => {
            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
            {
                let rocksdb = RocksDbDatabase::new_test_config().await?;
                let scylla = ScyllaDbDatabase::new_test_config().await?;
                let spawn_mode = RocksDbSpawnMode::get_spawn_mode_from_runtime();
                let path_with_guard = rocksdb.inner_config.path_with_guard;
                let uri = scylla.inner_config.uri;
                Ok(InnerStorageConfig::DualRocksDbScyllaDb {
                    path_with_guard,
                    spawn_mode,
                    uri,
                })
            }
            #[cfg(not(all(feature = "rocksdb", feature = "scylladb")))]
            panic!("Database::DualRocksDbScyllaDb is selected without the features rocksdb and scylladb");
        }
    }
}
120
121pub enum InnerStorageConfigBuilder {
122 #[cfg(with_testing)]
123 TestConfig,
124 ExistingConfig {
125 storage_config: InnerStorageConfig,
126 },
127}
128
129impl InnerStorageConfigBuilder {
130 #[cfg_attr(not(with_testing), expect(unused_variables))]
131 pub async fn build(self, database: Database) -> Result<InnerStorageConfig> {
132 match self {
133 #[cfg(with_testing)]
134 InnerStorageConfigBuilder::TestConfig => make_testing_config(database).await,
135 InnerStorageConfigBuilder::ExistingConfig { storage_config } => Ok(storage_config),
136 }
137 }
138}
139
140#[derive(Clone)]
143pub enum PathProvider {
144 ExternalPath { path_buf: PathBuf },
145 TemporaryDirectory { tmp_dir: Arc<TempDir> },
146}
147
148impl PathProvider {
149 pub fn path(&self) -> &Path {
150 match self {
151 PathProvider::ExternalPath { path_buf } => path_buf.as_path(),
152 PathProvider::TemporaryDirectory { tmp_dir } => tmp_dir.path(),
153 }
154 }
155
156 pub fn create_temporary_directory() -> Result<Self> {
157 let tmp_dir = Arc::new(tempdir()?);
158 Ok(PathProvider::TemporaryDirectory { tmp_dir })
159 }
160
161 pub fn from_path_option(path: &Option<String>) -> anyhow::Result<Self> {
162 Ok(match path {
163 None => {
164 let tmp_dir = Arc::new(tempfile::tempdir()?);
165 PathProvider::TemporaryDirectory { tmp_dir }
166 }
167 Some(path) => {
168 let path = Path::new(path);
169 let path_buf = path.to_path_buf();
170 PathProvider::ExternalPath { path_buf }
171 }
172 })
173 }
174}
175
176pub struct LocalNetConfig {
178 pub database: Database,
179 pub network: NetworkConfig,
180 pub testing_prng_seed: Option<u64>,
181 pub namespace: String,
182 pub num_other_initial_chains: u32,
183 pub initial_amount: Amount,
184 pub num_initial_validators: usize,
185 pub num_shards: usize,
186 pub num_proxies: usize,
187 pub policy_config: ResourceControlPolicyConfig,
188 pub http_request_allow_list: Option<Vec<String>>,
189 pub cross_chain_config: CrossChainConfig,
190 pub storage_config_builder: InnerStorageConfigBuilder,
191 pub path_provider: PathProvider,
192 pub block_exporters: ExportersSetup,
193 pub binary_dir: Option<PathBuf>,
196}
197
198#[derive(Clone, PartialEq)]
200pub enum ExportersSetup {
201 Local(Vec<BlockExporterConfig>),
203 Remote(Vec<ExporterServiceConfig>),
205}
206
207impl ExportersSetup {
208 pub fn new(
209 with_block_exporter: bool,
210 block_exporter_address: String,
211 block_exporter_port: NonZeroU16,
212 ) -> ExportersSetup {
213 if with_block_exporter {
214 let exporter_config =
215 ExporterServiceConfig::new(block_exporter_address, block_exporter_port.into());
216 ExportersSetup::Remote(vec![exporter_config])
217 } else {
218 ExportersSetup::Local(vec![])
219 }
220 }
221}
222
223pub struct LocalNet {
225 network: NetworkConfig,
226 testing_prng_seed: Option<u64>,
227 next_client_id: usize,
228 num_initial_validators: usize,
229 num_proxies: usize,
230 num_shards: usize,
231 validator_keys: BTreeMap<usize, (String, String)>,
232 running_validators: BTreeMap<usize, Validator>,
233 initialized_validator_storages: BTreeMap<usize, StorageConfig>,
234 common_namespace: String,
235 common_storage_config: InnerStorageConfig,
236 cross_chain_config: CrossChainConfig,
237 path_provider: PathProvider,
238 block_exporters: ExportersSetup,
239 binary_dir: Option<PathBuf>,
240}
241
/// Name of the environment variable whose whitespace-separated contents are passed as
/// extra leading arguments to `linera storage initialize` and `linera-server run`.
const SERVER_ENV: &str = "LINERA_SERVER_PARAMS";
245
/// Database backend used by the validators of a local network.
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum Database {
    /// The storage service.
    Service,
    /// ScyllaDB.
    ScyllaDb,
    /// RocksDB combined with ScyllaDB.
    DualRocksDbScyllaDb,
}
253
254struct Validator {
256 proxies: Vec<Child>,
257 servers: Vec<Child>,
258 exporters: Vec<Child>,
259}
260
261impl Validator {
262 fn new() -> Self {
263 Self {
264 proxies: vec![],
265 servers: vec![],
266 exporters: vec![],
267 }
268 }
269
270 async fn terminate(&mut self) -> Result<()> {
271 for proxy in &mut self.proxies {
272 proxy.kill().await.context("terminating validator proxy")?;
273 }
274 for server in &mut self.servers {
275 server
276 .kill()
277 .await
278 .context("terminating validator server")?;
279 }
280 Ok(())
281 }
282
283 fn add_proxy(&mut self, proxy: Child) {
284 self.proxies.push(proxy)
285 }
286
287 fn add_server(&mut self, server: Child) {
288 self.servers.push(server)
289 }
290
291 #[cfg(with_testing)]
292 async fn terminate_server(&mut self, index: usize) -> Result<()> {
293 let mut server = self.servers.remove(index);
294 server
295 .kill()
296 .await
297 .context("terminating validator server")?;
298 Ok(())
299 }
300
301 fn add_block_exporter(&mut self, exporter: Child) {
302 self.exporters.push(exporter);
303 }
304
305 fn ensure_is_running(&mut self) -> Result<()> {
306 for proxy in &mut self.proxies {
307 proxy.ensure_is_running()?;
308 }
309 for child in &mut self.servers {
310 child.ensure_is_running()?;
311 }
312 for exporter in &mut self.exporters {
313 exporter.ensure_is_running()?;
314 }
315 Ok(())
316 }
317}
318
#[cfg(with_testing)]
impl LocalNetConfig {
    /// Creates the default test configuration: four validators with four shards and one
    /// proxy each, a temporary working directory, and no block exporters.
    ///
    /// `network` is used as the external protocol; the internal protocol is the same
    /// with TLS dropped.
    ///
    /// # Panics
    ///
    /// Panics if the temporary directory cannot be created.
    pub fn new_test(database: Database, network: Network) -> Self {
        let network = NetworkConfig {
            internal: network.drop_tls(),
            external: network,
        };
        Self {
            database,
            network,
            testing_prng_seed: Some(37),
            namespace: linera_views::random::generate_test_namespace(),
            num_other_initial_chains: 2,
            initial_amount: Amount::from_tokens(1_000_000),
            num_initial_validators: 4,
            num_shards: 4,
            num_proxies: 1,
            policy_config: ResourceControlPolicyConfig::Testnet,
            http_request_allow_list: Some(vec!["localhost".to_string()]),
            cross_chain_config: CrossChainConfig::default(),
            storage_config_builder: InnerStorageConfigBuilder::TestConfig,
            path_provider: PathProvider::create_temporary_directory().unwrap(),
            block_exporters: ExportersSetup::Local(vec![]),
            binary_dir: None,
        }
    }
}
350
351#[async_trait]
352impl LineraNetConfig for LocalNetConfig {
353 type Net = LocalNet;
354
355 async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
356 let storage_config = self.storage_config_builder.build(self.database).await?;
357 let mut net = LocalNet::new(
358 self.network,
359 self.testing_prng_seed,
360 self.namespace,
361 self.num_initial_validators,
362 self.num_proxies,
363 self.num_shards,
364 storage_config,
365 self.cross_chain_config,
366 self.path_provider,
367 self.block_exporters,
368 self.binary_dir,
369 );
370 let client = net.make_client().await;
371 ensure!(
372 self.num_initial_validators > 0,
373 "There should be at least one initial validator"
374 );
375 let total_number_shards = self.num_initial_validators * self.num_shards;
376 ensure!(
377 total_number_shards <= MAX_NUMBER_SHARDS,
378 "Total number of shards ({}) exceeds maximum allowed ({})",
379 self.num_shards,
380 MAX_NUMBER_SHARDS
381 );
382 net.generate_initial_validator_config().await?;
383 client
384 .create_genesis_config(
385 self.num_other_initial_chains,
386 self.initial_amount,
387 self.policy_config,
388 self.http_request_allow_list
389 .clone()
390 .or_else(|| Some(vec!["localhost".to_owned()])),
391 )
392 .await?;
393 net.run().await?;
394 Ok((net, client))
395 }
396}
397
398#[async_trait]
399impl LineraNet for LocalNet {
400 async fn ensure_is_running(&mut self) -> Result<()> {
401 for validator in self.running_validators.values_mut() {
402 validator.ensure_is_running().context("in local network")?;
403 }
404 Ok(())
405 }
406
407 async fn make_client(&mut self) -> ClientWrapper {
408 let client = ClientWrapper::new_with_extra_args(
409 self.path_provider.clone(),
410 self.network.external,
411 self.testing_prng_seed,
412 self.next_client_id,
413 OnClientDrop::LeakChains,
414 vec!["--wait-for-outgoing-messages".to_string()],
415 self.binary_dir.clone(),
416 );
417 if let Some(seed) = self.testing_prng_seed {
418 self.testing_prng_seed = Some(seed + 1);
419 }
420 self.next_client_id += 1;
421 client
422 }
423
424 async fn terminate(&mut self) -> Result<()> {
425 for validator in self.running_validators.values_mut() {
426 validator.terminate().await.context("in local network")?
427 }
428 Ok(())
429 }
430}
431
432impl LocalNet {
433 #[expect(clippy::too_many_arguments)]
434 fn new(
435 network: NetworkConfig,
436 testing_prng_seed: Option<u64>,
437 common_namespace: String,
438 num_initial_validators: usize,
439 num_proxies: usize,
440 num_shards: usize,
441 common_storage_config: InnerStorageConfig,
442 cross_chain_config: CrossChainConfig,
443 path_provider: PathProvider,
444 block_exporters: ExportersSetup,
445 binary_dir: Option<PathBuf>,
446 ) -> Self {
447 Self {
448 network,
449 testing_prng_seed,
450 next_client_id: 0,
451 num_initial_validators,
452 num_proxies,
453 num_shards,
454 validator_keys: BTreeMap::new(),
455 running_validators: BTreeMap::new(),
456 initialized_validator_storages: BTreeMap::new(),
457 common_namespace,
458 common_storage_config,
459 cross_chain_config,
460 path_provider,
461 block_exporters,
462 binary_dir,
463 }
464 }
465
466 async fn command_for_binary(&self, name: &'static str) -> Result<Command> {
467 let path = if let Some(dir) = &self.binary_dir {
468 dir.join(name)
469 } else {
470 resolve_binary(name, env!("CARGO_PKG_NAME")).await?
471 };
472 let mut command = Command::new(path);
473 command.current_dir(self.path_provider.path());
474 Ok(command)
475 }
476
/// Reads `genesis.json` from the network directory.
#[cfg(with_testing)]
pub fn genesis_config(&self) -> Result<linera_client::config::GenesisConfig> {
    let genesis_path = self.path_provider.path().join("genesis.json");
    crate::util::read_json(genesis_path)
}
482
483 fn shard_port(&self, validator: usize, shard: usize) -> usize {
484 test_offset_port() + validator * self.num_shards + shard + 1
485 }
486
487 fn proxy_internal_port(&self, validator: usize, proxy_id: usize) -> usize {
488 test_offset_port() + 1000 + validator * self.num_proxies + proxy_id + 1
489 }
490
491 fn shard_metrics_port(&self, validator: usize, shard: usize) -> usize {
492 test_offset_port() + 2000 + validator * self.num_shards + shard + 1
493 }
494
495 fn proxy_metrics_port(&self, validator: usize, proxy_id: usize) -> usize {
496 test_offset_port() + 3000 + validator * self.num_proxies + proxy_id + 1
497 }
498
499 fn block_exporter_port(&self, validator: usize, exporter_id: usize) -> usize {
500 test_offset_port() + 3000 + validator * self.num_shards + exporter_id + 1
501 }
502
503 pub fn proxy_public_port(&self, validator: usize, proxy_id: usize) -> usize {
504 test_offset_port() + 4000 + validator * self.num_proxies + proxy_id + 1
505 }
506
507 pub fn first_public_port() -> usize {
508 test_offset_port() + 4000 + 1
509 }
510
511 fn block_exporter_metrics_port(exporter_id: usize) -> usize {
512 test_offset_port() + 4000 + exporter_id + 1
513 }
514
515 fn configuration_string(&self, server_number: usize) -> Result<String> {
516 let n = server_number;
517 let path = self
518 .path_provider
519 .path()
520 .join(format!("validator_{n}.toml"));
521 let port = self.proxy_public_port(n, 0);
522 let external_protocol = self.network.external.toml();
523 let internal_protocol = self.network.internal.toml();
524 let external_host = self.network.external.localhost();
525 let internal_host = self.network.internal.localhost();
526 let mut content = format!(
527 r#"
528 server_config_path = "server_{n}.json"
529 host = "{external_host}"
530 port = {port}
531 external_protocol = {external_protocol}
532 internal_protocol = {internal_protocol}
533 "#
534 );
535
536 for k in 0..self.num_proxies {
537 let public_port = self.proxy_public_port(n, k);
538 let internal_port = self.proxy_internal_port(n, k);
539 let metrics_port = self.proxy_metrics_port(n, k);
540 content.push_str(&format!(
544 r#"
545 [[proxies]]
546 host = "{internal_host}"
547 public_port = {public_port}
548 private_port = {internal_port}
549 metrics_port = {metrics_port}
550 "#
551 ));
552 }
553
554 for k in 0..self.num_shards {
555 let shard_port = self.shard_port(n, k);
556 let shard_metrics_port = self.shard_metrics_port(n, k);
557 content.push_str(&format!(
558 r#"
559
560 [[shards]]
561 host = "{internal_host}"
562 port = {shard_port}
563 metrics_port = {shard_metrics_port}
564 "#
565 ));
566 }
567
568 match self.block_exporters {
569 ExportersSetup::Local(ref exporters) => {
570 for (j, exporter) in exporters.iter().enumerate() {
571 let host = Network::Grpc.localhost();
572 let port = self.block_exporter_port(n, j);
573 let config_content = format!(
574 r#"
575
576 [[block_exporters]]
577 host = "{host}"
578 port = {port}
579 "#
580 );
581
582 content.push_str(&config_content);
583 #[expect(
584 clippy::cast_possible_truncation,
585 reason = "loop index over a local config list bounded well below u32::MAX"
586 )]
587 let exporter_index = j as u32;
588 let exporter_config = self.generate_block_exporter_config(
589 n,
590 exporter_index,
591 &exporter.destination_config,
592 );
593 let config_path = self
594 .path_provider
595 .path()
596 .join(format!("exporter_config_{n}:{j}.toml"));
597
598 fs_err::write(&config_path, &exporter_config)?;
599 }
600 }
601 ExportersSetup::Remote(ref exporters) => {
602 for exporter in exporters {
603 let host = exporter.host.clone();
604 let port = exporter.port;
605 let config_content = format!(
606 r#"
607
608 [[block_exporters]]
609 host = "{host}"
610 port = {port}
611 "#
612 );
613
614 content.push_str(&config_content);
615 }
616 }
617 }
618
619 fs_err::write(&path, content)?;
620 path.into_os_string().into_string().map_err(|error| {
621 anyhow!(
622 "could not parse OS string into string: {}",
623 error.to_string_lossy()
624 )
625 })
626 }
627
628 fn generate_block_exporter_config(
629 &self,
630 validator: usize,
631 exporter_id: u32,
632 destination_config: &DestinationConfig,
633 ) -> String {
634 let n = validator;
635 let host = Network::Grpc.localhost();
636 let port = self.block_exporter_port(n, exporter_id as usize);
637 let metrics_port = Self::block_exporter_metrics_port(exporter_id as usize);
638 let mut config = format!(
639 r#"
640 id = {exporter_id}
641
642 metrics_port = {metrics_port}
643
644 [service_config]
645 host = "{host}"
646 port = {port}
647
648 "#
649 );
650
651 let DestinationConfig {
652 destinations,
653 committee_destination,
654 } = destination_config;
655
656 if *committee_destination {
657 let destination_string_to_push = r#"
658
659 [destination_config]
660 committee_destination = true
661 "#
662 .to_string();
663
664 config.push_str(&destination_string_to_push);
665 }
666
667 for destination in destinations {
668 let destination_string_to_push = match destination {
669 Destination::Indexer {
670 tls,
671 endpoint,
672 port,
673 } => {
674 let tls = match tls {
675 TlsConfig::ClearText => "ClearText",
676 TlsConfig::Tls => "Tls",
677 };
678 format!(
679 r#"
680 [[destination_config.destinations]]
681 tls = "{tls}"
682 endpoint = "{endpoint}"
683 port = {port}
684 kind = "Indexer"
685 "#
686 )
687 }
688 Destination::Validator { endpoint, port } => {
689 format!(
690 r#"
691 [[destination_config.destinations]]
692 endpoint = "{endpoint}"
693 port = {port}
694 kind = "Validator"
695 "#
696 )
697 }
698 Destination::Logging { file_name } => {
699 format!(
700 r#"
701 [[destination_config.destinations]]
702 file_name = "{file_name}"
703 kind = "Logging"
704 "#
705 )
706 }
707 };
708
709 config.push_str(&destination_string_to_push);
710 }
711
712 config
713 }
714
715 async fn generate_initial_validator_config(&mut self) -> Result<()> {
716 let mut command = self.command_for_binary("linera-server").await?;
717 command.arg("generate");
718 if let Some(seed) = self.testing_prng_seed {
719 command.arg("--testing-prng-seed").arg(seed.to_string());
720 self.testing_prng_seed = Some(seed + 1);
721 }
722 command.arg("--validators");
723 for i in 0..self.num_initial_validators {
724 command.arg(&self.configuration_string(i)?);
725 }
726 let output = command
727 .args(["--committee", "committee.json"])
728 .spawn_and_wait_for_stdout()
729 .await?;
730 self.validator_keys = output
731 .split_whitespace()
732 .map(str::to_string)
733 .map(|keys| keys.split(',').map(str::to_string).collect::<Vec<_>>())
734 .enumerate()
735 .map(|(i, keys)| {
736 let validator_key = keys[0].to_string();
737 let account_key = keys[1].to_string();
738 (i, (validator_key, account_key))
739 })
740 .collect();
741 Ok(())
742 }
743
744 async fn run_proxy(&self, validator: usize, proxy_id: usize) -> Result<Child> {
745 let storage = self
746 .initialized_validator_storages
747 .get(&validator)
748 .expect("initialized storage");
749 let child = self
750 .command_for_binary("linera-proxy")
751 .await?
752 .arg(format!("server_{validator}.json"))
753 .args(["--storage", &storage.to_string()])
754 .args(["--id", &proxy_id.to_string()])
755 .spawn_into()?;
756
757 let port = self.proxy_public_port(validator, proxy_id);
758 let nickname = format!("validator proxy {validator}");
759 match self.network.external {
760 Network::Grpc => {
761 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
762 let nickname = format!("validator proxy {validator}");
763 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
764 }
765 Network::Grpcs => {
766 let nickname = format!("validator proxy {validator}");
767 Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
768 }
769 Network::Tcp => {
770 Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
771 }
772 Network::Udp => {
773 Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
774 }
775 }
776 Ok(child)
777 }
778
779 async fn run_exporter(&self, validator: usize, exporter_id: u32) -> Result<Child> {
780 let config_path = format!("exporter_config_{validator}:{exporter_id}.toml");
781 let storage = self
782 .initialized_validator_storages
783 .get(&validator)
784 .expect("initialized storage");
785
786 tracing::debug!(config=?config_path, storage=?storage.to_string(), "starting block exporter");
787
788 let child = self
789 .command_for_binary("linera-exporter")
790 .await?
791 .args(["run", "--config-path", &config_path])
792 .args(["--storage", &storage.to_string()])
793 .spawn_into()?;
794
795 match self.network.internal {
796 Network::Grpc => {
797 let port = self.block_exporter_port(validator, exporter_id as usize);
798 let nickname = format!("block exporter {validator}:{exporter_id}");
799 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
800 }
801 Network::Grpcs => {
802 let port = self.block_exporter_port(validator, exporter_id as usize);
803 let nickname = format!("block exporter {validator}:{exporter_id}");
804 Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
805 }
806 Network::Tcp | Network::Udp => {
807 unreachable!("Only allowed options are grpc and grpcs")
808 }
809 }
810
811 tracing::info!("block exporter started {validator}:{exporter_id}");
812
813 Ok(child)
814 }
815
816 pub async fn ensure_grpc_server_has_started(
817 nickname: &str,
818 port: usize,
819 scheme: &str,
820 ) -> Result<()> {
821 let endpoint = match scheme {
822 "http" => Endpoint::new(format!("http://localhost:{port}"))
823 .context("endpoint should always parse")?,
824 "https" => {
825 use linera_rpc::CERT_PEM;
826 let certificate = tonic::transport::Certificate::from_pem(CERT_PEM);
827 let tls_config = ClientTlsConfig::new().ca_certificate(certificate);
828 Endpoint::new(format!("https://localhost:{port}"))
829 .context("endpoint should always parse")?
830 .tls_config(tls_config)?
831 }
832 _ => bail!("Only supported scheme are http and https"),
833 };
834 let connection = endpoint.connect_lazy();
835 let mut client = HealthClient::new(connection);
836 linera_base::time::timer::sleep(Duration::from_millis(100)).await;
837 for i in 0..10 {
838 linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
839 let result = client.check(HealthCheckRequest::default()).await;
840 if result.is_ok() && result.unwrap().get_ref().status() == ServingStatus::Serving {
841 info!(?port, "Successfully started {nickname}");
842 return Ok(());
843 } else {
844 warn!("Waiting for {nickname} to start");
845 }
846 }
847 bail!("Failed to start {nickname}");
848 }
849
850 async fn ensure_simple_server_has_started(
851 nickname: &str,
852 port: usize,
853 protocol: &str,
854 ) -> Result<()> {
855 use linera_core::node::ValidatorNode as _;
856
857 let options = linera_rpc::NodeOptions {
858 send_timeout: Duration::from_secs(5),
859 recv_timeout: Duration::from_secs(5),
860 retry_delay: Duration::from_secs(1),
861 max_retries: 1,
862 ..Default::default()
863 };
864 let provider = linera_rpc::simple::SimpleNodeProvider::new(options);
865 let address = format!("{protocol}:127.0.0.1:{port}");
866 let node = provider.make_node(&address)?;
869 linera_base::time::timer::sleep(Duration::from_millis(100)).await;
870 for i in 0..10 {
871 linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
872 let result = node.get_version_info().await;
873 if result.is_ok() {
874 info!("Successfully started {nickname}");
875 return Ok(());
876 } else {
877 warn!("Waiting for {nickname} to start");
878 }
879 }
880 bail!("Failed to start {nickname}");
881 }
882
883 async fn initialize_storage(&mut self, validator: usize) -> Result<()> {
884 let namespace = format!("{}_server_{}_db", self.common_namespace, validator);
885 let inner_storage_config = self.common_storage_config.clone();
886 let storage = StorageConfig {
887 inner_storage_config,
888 namespace,
889 };
890 let mut command = self.command_for_binary("linera").await?;
891 if let Ok(var) = env::var(SERVER_ENV) {
892 command.args(var.split_whitespace());
893 }
894 command.args(["storage", "initialize"]);
895 command
896 .args(["--storage", &storage.to_string()])
897 .args(["--genesis", "genesis.json"])
898 .spawn_and_wait_for_stdout()
899 .await?;
900
901 self.initialized_validator_storages
902 .insert(validator, storage);
903 Ok(())
904 }
905
906 async fn run_server(&self, validator: usize, shard: usize) -> Result<Child> {
907 let mut storage = self
908 .initialized_validator_storages
909 .get(&validator)
910 .expect("initialized storage")
911 .clone();
912
913 storage.maybe_append_shard_path(shard)?;
916
917 let mut command = self.command_for_binary("linera-server").await?;
918 if let Ok(var) = env::var(SERVER_ENV) {
919 command.args(var.split_whitespace());
920 }
921 command
922 .arg("run")
923 .args(["--storage", &storage.to_string()])
924 .args(["--server", &format!("server_{validator}.json")])
925 .args(["--shard", &shard.to_string()])
926 .args(self.cross_chain_config.to_args());
927 let child = command.spawn_into()?;
928
929 let port = self.shard_port(validator, shard);
930 let nickname = format!("validator server {validator}:{shard}");
931 match self.network.internal {
932 Network::Grpc => {
933 Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
934 }
935 Network::Grpcs => {
936 Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
937 }
938 Network::Tcp => {
939 Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
940 }
941 Network::Udp => {
942 Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
943 }
944 }
945 Ok(child)
946 }
947
948 async fn run(&mut self) -> Result<()> {
949 for validator in 0..self.num_initial_validators {
950 self.start_validator(validator).await?;
951 }
952 Ok(())
953 }
954
955 pub async fn start_validator(&mut self, index: usize) -> Result<()> {
957 self.initialize_storage(index).await?;
958 self.restart_validator(index).await
959 }
960
961 pub async fn restart_validator(&mut self, index: usize) -> Result<()> {
964 let mut validator = Validator::new();
965 for k in 0..self.num_proxies {
966 let proxy = self.run_proxy(index, k).await?;
967 validator.add_proxy(proxy);
968 }
969 for shard in 0..self.num_shards {
970 let server = self.run_server(index, shard).await?;
971 validator.add_server(server);
972 }
973 if let ExportersSetup::Local(ref exporters) = self.block_exporters {
974 for block_exporter in 0..exporters.len() {
975 #[expect(
976 clippy::cast_possible_truncation,
977 reason = "loop index over a local exporter list bounded well below u32::MAX"
978 )]
979 let exporter_id = block_exporter as u32;
980 let exporter = self.run_exporter(index, exporter_id).await?;
981 validator.add_block_exporter(exporter);
982 }
983 }
984
985 self.running_validators.insert(index, validator);
986 Ok(())
987 }
988
989 pub async fn stop_validator(&mut self, index: usize) -> Result<()> {
991 if let Some(mut validator) = self.running_validators.remove(&index) {
992 if let Err(error) = validator.terminate().await {
993 error!("Failed to stop validator {index}: {error}");
994 return Err(error);
995 }
996 }
997 Ok(())
998 }
999
1000 pub fn validator_client(&mut self, validator: usize) -> Result<linera_rpc::Client> {
1002 let node_provider = linera_rpc::NodeProvider::new(linera_rpc::NodeOptions {
1003 send_timeout: Duration::from_secs(1),
1004 recv_timeout: Duration::from_secs(1),
1005 retry_delay: Duration::ZERO,
1006 max_retries: 0,
1007 ..Default::default()
1008 });
1009
1010 Ok(node_provider.make_node(&self.validator_address(validator))?)
1011 }
1012
1013 pub fn validator_address(&self, validator: usize) -> String {
1016 let port = self.proxy_public_port(validator, 0);
1017 let schema = self.network.external.schema();
1018
1019 format!("{schema}:localhost:{port}")
1020 }
1021}
1022
#[cfg(with_testing)]
impl LocalNet {
    /// Returns the recorded key pair of validator `validator`, if any.
    pub fn validator_keys(&self, validator: usize) -> Option<&(String, String)> {
        self.validator_keys.get(&validator)
    }

    /// Runs `linera-server generate` for a single validator and records the key pair it
    /// prints.
    ///
    /// # Errors
    ///
    /// Fails if the configuration file cannot be written, if the command fails, or if
    /// its output does not contain two comma-separated keys.
    pub async fn generate_validator_config(&mut self, validator: usize) -> Result<()> {
        let stdout = self
            .command_for_binary("linera-server")
            .await?
            .arg("generate")
            .arg("--validators")
            .arg(&self.configuration_string(validator)?)
            .spawn_and_wait_for_stdout()
            .await?;
        // The output is `<validator key>,<account key>`; report malformed output as an
        // error instead of panicking on a missing element.
        let mut keys = stdout.trim().split(',');
        let (Some(validator_key), Some(account_key)) = (keys.next(), keys.next()) else {
            bail!(
                "unexpected output {:?} from `linera-server generate` for validator {validator}",
                stdout.trim()
            );
        };
        self.validator_keys.insert(
            validator,
            (validator_key.to_string(), account_key.to_string()),
        );
        Ok(())
    }

    /// Kills shard server `shard` of the running validator `validator`.
    ///
    /// # Errors
    ///
    /// Fails if the validator is not running or if the server cannot be terminated.
    pub async fn terminate_server(&mut self, validator: usize, shard: usize) -> Result<()> {
        self.running_validators
            .get_mut(&validator)
            .context("server not found")?
            .terminate_server(shard)
            .await?;
        Ok(())
    }

    /// Removes validator `validator` from the running validators without calling
    /// `terminate` on it.
    ///
    /// # Errors
    ///
    /// Fails if the validator is not running.
    pub fn remove_validator(&mut self, validator: usize) -> Result<()> {
        self.running_validators
            .remove(&validator)
            .context("validator not found")?;
        Ok(())
    }

    /// Spawns shard server `shard` of validator `validator` and adds it to the running
    /// validator.
    ///
    /// # Errors
    ///
    /// Fails if the server cannot be started or if the validator is not running.
    pub async fn start_server(&mut self, validator: usize, shard: usize) -> Result<()> {
        let server = self.run_server(validator, shard).await?;
        self.running_validators
            .get_mut(&validator)
            .context("could not find validator")?
            .add_server(server);
        Ok(())
    }
}