linera_service/cli_wrappers/
local_net.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4#[cfg(with_testing)]
5use std::sync::LazyLock;
6use std::{
7    collections::BTreeMap,
8    env,
9    path::{Path, PathBuf},
10    sync::Arc,
11    time::Duration,
12};
13
14use anyhow::{anyhow, bail, ensure, Context, Result};
15#[cfg(with_testing)]
16use async_lock::RwLock;
17use async_trait::async_trait;
18use linera_base::{
19    command::{resolve_binary, CommandExt},
20    data_types::Amount,
21};
22use linera_client::client_options::ResourceControlPolicyConfig;
23use linera_core::node::ValidatorNodeProvider;
24use linera_rpc::config::{CrossChainConfig, ExporterServiceConfig, TlsConfig};
25#[cfg(all(feature = "storage-service", with_testing))]
26use linera_storage_service::common::storage_service_test_endpoint;
27#[cfg(all(feature = "rocksdb", feature = "scylladb", with_testing))]
28use linera_views::rocks_db::{RocksDbSpawnMode, RocksDbStore};
29#[cfg(all(feature = "scylladb", with_testing))]
30use linera_views::{scylla_db::ScyllaDbStore, store::TestKeyValueStore as _};
31use tempfile::{tempdir, TempDir};
32use tokio::process::{Child, Command};
33use tonic::transport::{channel::ClientTlsConfig, Endpoint};
34use tonic_health::pb::{
35    health_check_response::ServingStatus, health_client::HealthClient, HealthCheckRequest,
36};
37use tracing::{error, info, warn};
38
39use crate::{
40    cli_wrappers::{
41        ClientWrapper, LineraNet, LineraNetConfig, Network, NetworkConfig, OnClientDrop,
42    },
43    config::{BlockExporterConfig, Destination, DestinationConfig},
44    storage::{InnerStorageConfig, StorageConfig},
45    util::ChildExt,
46};
47
48pub enum ProcessInbox {
49    Skip,
50    Automatic,
51}
52
53#[cfg(with_testing)]
54static PORT_PROVIDER: LazyLock<RwLock<u16>> = LazyLock::new(|| RwLock::new(7080));
55
56/// Provides a port for the node service. Increment the port numbers.
57#[cfg(with_testing)]
58pub async fn get_node_port() -> u16 {
59    let mut port = PORT_PROVIDER.write().await;
60    let port_ret = *port;
61    *port += 1;
62    info!("get_node_port returning port_ret={}", port_ret);
63    assert!(port_selector::is_free(port_ret));
64    port_ret
65}
66
67#[cfg(with_testing)]
68async fn make_testing_config(database: Database) -> Result<InnerStorageConfig> {
69    match database {
70        Database::Service => {
71            #[cfg(feature = "storage-service")]
72            {
73                let endpoint = storage_service_test_endpoint()
74                    .expect("Reading LINERA_STORAGE_SERVICE environment variable");
75                Ok(InnerStorageConfig::Service { endpoint })
76            }
77            #[cfg(not(feature = "storage-service"))]
78            panic!("Database::Service is selected without the feature storage_service");
79        }
80        Database::DynamoDb => {
81            #[cfg(feature = "dynamodb")]
82            {
83                let use_dynamodb_local = true;
84                Ok(InnerStorageConfig::DynamoDb { use_dynamodb_local })
85            }
86            #[cfg(not(feature = "dynamodb"))]
87            panic!("Database::DynamoDb is selected without the feature dynamodb");
88        }
89        Database::ScyllaDb => {
90            #[cfg(feature = "scylladb")]
91            {
92                let config = ScyllaDbStore::new_test_config().await?;
93                Ok(InnerStorageConfig::ScyllaDb {
94                    uri: config.inner_config.uri,
95                })
96            }
97            #[cfg(not(feature = "scylladb"))]
98            panic!("Database::ScyllaDb is selected without the feature scylladb");
99        }
100        Database::DualRocksDbScyllaDb => {
101            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
102            {
103                let rocksdb_config = RocksDbStore::new_test_config().await?;
104                let scylla_config = ScyllaDbStore::new_test_config().await?;
105                let spawn_mode = RocksDbSpawnMode::get_spawn_mode_from_runtime();
106                Ok(InnerStorageConfig::DualRocksDbScyllaDb {
107                    path_with_guard: rocksdb_config.inner_config.path_with_guard,
108                    spawn_mode,
109                    uri: scylla_config.inner_config.uri,
110                })
111            }
112            #[cfg(not(all(feature = "rocksdb", feature = "scylladb")))]
113            panic!("Database::DualRocksDbScyllaDb is selected without the features rocksdb and scylladb");
114        }
115    }
116}
117
118pub enum InnerStorageConfigBuilder {
119    #[cfg(with_testing)]
120    TestConfig,
121    ExistingConfig {
122        storage_config: InnerStorageConfig,
123    },
124}
125
126impl InnerStorageConfigBuilder {
127    #[allow(unused_variables)]
128    pub async fn build(self, database: Database) -> Result<InnerStorageConfig> {
129        match self {
130            #[cfg(with_testing)]
131            InnerStorageConfigBuilder::TestConfig => make_testing_config(database).await,
132            InnerStorageConfigBuilder::ExistingConfig { storage_config } => Ok(storage_config),
133        }
134    }
135}
136
137/// Path used for the run can come from a path whose lifetime is controlled
138/// by an external user or as a temporary directory
139#[derive(Clone)]
140pub enum PathProvider {
141    ExternalPath { path_buf: PathBuf },
142    TemporaryDirectory { tmp_dir: Arc<TempDir> },
143}
144
145impl PathProvider {
146    pub fn path(&self) -> &Path {
147        match self {
148            PathProvider::ExternalPath { path_buf } => path_buf.as_path(),
149            PathProvider::TemporaryDirectory { tmp_dir } => tmp_dir.path(),
150        }
151    }
152
153    pub fn create_temporary_directory() -> Result<Self> {
154        let tmp_dir = Arc::new(tempdir()?);
155        Ok(PathProvider::TemporaryDirectory { tmp_dir })
156    }
157
158    pub fn from_path_option(path: &Option<String>) -> anyhow::Result<Self> {
159        Ok(match path {
160            None => {
161                let tmp_dir = Arc::new(tempfile::tempdir()?);
162                PathProvider::TemporaryDirectory { tmp_dir }
163            }
164            Some(path) => {
165                let path = Path::new(path);
166                let path_buf = path.to_path_buf();
167                PathProvider::ExternalPath { path_buf }
168            }
169        })
170    }
171}
172
173/// The information needed to start a [`LocalNet`].
174pub struct LocalNetConfig {
175    pub database: Database,
176    pub network: NetworkConfig,
177    pub testing_prng_seed: Option<u64>,
178    pub namespace: String,
179    pub num_other_initial_chains: u32,
180    pub initial_amount: Amount,
181    pub num_initial_validators: usize,
182    pub num_shards: usize,
183    pub num_proxies: usize,
184    pub policy_config: ResourceControlPolicyConfig,
185    pub cross_chain_config: CrossChainConfig,
186    pub storage_config_builder: InnerStorageConfigBuilder,
187    pub path_provider: PathProvider,
188    pub block_exporters: ExportersSetup,
189}
190
191/// The setup for the block exporters.
192pub enum ExportersSetup {
193    // Block exporters are meant to be started and managed by the testing framework.
194    Local(Vec<BlockExporterConfig>),
195    // Block exporters are already started and we just need to connect to them.
196    Remote(Vec<ExporterServiceConfig>),
197}
198
199/// A set of Linera validators running locally as native processes.
200pub struct LocalNet {
201    network: NetworkConfig,
202    testing_prng_seed: Option<u64>,
203    next_client_id: usize,
204    num_initial_validators: usize,
205    num_proxies: usize,
206    num_shards: usize,
207    validator_keys: BTreeMap<usize, (String, String)>,
208    running_validators: BTreeMap<usize, Validator>,
209    initialized_validator_storages: BTreeMap<usize, StorageConfig>,
210    common_namespace: String,
211    common_storage_config: InnerStorageConfig,
212    cross_chain_config: CrossChainConfig,
213    path_provider: PathProvider,
214    block_exporters: ExportersSetup,
215}
216
217/// The name of the environment variable that allows specifying additional arguments to be passed
218/// to the binary when starting a server.
219const SERVER_ENV: &str = "LINERA_SERVER_PARAMS";
220
221/// Description of the database engine to use inside a local Linera network.
222#[derive(Copy, Clone, Eq, PartialEq)]
223pub enum Database {
224    Service,
225    DynamoDb,
226    ScyllaDb,
227    DualRocksDbScyllaDb,
228}
229
230/// The processes of a running validator.
231struct Validator {
232    proxy: Child,
233    servers: Vec<Child>,
234    exporters: Vec<Child>,
235}
236
237impl Validator {
238    fn new(proxy: Child) -> Self {
239        Self {
240            proxy,
241            servers: vec![],
242            exporters: vec![],
243        }
244    }
245
246    async fn terminate(&mut self) -> Result<()> {
247        self.proxy
248            .kill()
249            .await
250            .context("terminating validator proxy")?;
251        for server in &mut self.servers {
252            server
253                .kill()
254                .await
255                .context("terminating validator server")?;
256        }
257        Ok(())
258    }
259
260    fn add_server(&mut self, server: Child) {
261        self.servers.push(server)
262    }
263
264    #[cfg(with_testing)]
265    async fn terminate_server(&mut self, index: usize) -> Result<()> {
266        let mut server = self.servers.remove(index);
267        server
268            .kill()
269            .await
270            .context("terminating validator server")?;
271        Ok(())
272    }
273
274    fn add_block_exporter(&mut self, exporter: Child) {
275        self.exporters.push(exporter);
276    }
277
278    fn ensure_is_running(&mut self) -> Result<()> {
279        self.proxy.ensure_is_running()?;
280        for child in &mut self.servers {
281            child.ensure_is_running()?;
282        }
283        for exporter in &mut self.exporters {
284            exporter.ensure_is_running()?;
285        }
286        Ok(())
287    }
288}
289
290#[cfg(with_testing)]
291impl LocalNetConfig {
292    pub fn new_test(database: Database, network: Network) -> Self {
293        let num_shards = 4;
294        let num_proxies = 1;
295        let storage_config_builder = InnerStorageConfigBuilder::TestConfig;
296        let path_provider = PathProvider::create_temporary_directory().unwrap();
297        let internal = network.drop_tls();
298        let external = network;
299        let network = NetworkConfig { internal, external };
300        let cross_chain_config = CrossChainConfig::default();
301        Self {
302            database,
303            network,
304            num_other_initial_chains: 2,
305            initial_amount: Amount::from_tokens(1_000_000),
306            policy_config: ResourceControlPolicyConfig::Testnet,
307            cross_chain_config,
308            testing_prng_seed: Some(37),
309            namespace: linera_views::random::generate_test_namespace(),
310            num_initial_validators: 4,
311            num_shards,
312            num_proxies,
313            storage_config_builder,
314            path_provider,
315            block_exporters: ExportersSetup::Local(vec![]),
316        }
317    }
318}
319
320#[async_trait]
321impl LineraNetConfig for LocalNetConfig {
322    type Net = LocalNet;
323
324    async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
325        let storage_config = self.storage_config_builder.build(self.database).await?;
326        let mut net = LocalNet::new(
327            self.network,
328            self.testing_prng_seed,
329            self.namespace,
330            self.num_initial_validators,
331            self.num_shards,
332            self.num_proxies,
333            storage_config,
334            self.cross_chain_config,
335            self.path_provider,
336            self.block_exporters,
337        )?;
338        let client = net.make_client().await;
339        ensure!(
340            self.num_initial_validators > 0,
341            "There should be at least one initial validator"
342        );
343        net.generate_initial_validator_config().await?;
344        client
345            .create_genesis_config(
346                self.num_other_initial_chains,
347                self.initial_amount,
348                self.policy_config,
349                Some(vec!["localhost".to_owned()]),
350            )
351            .await?;
352        net.run().await?;
353        Ok((net, client))
354    }
355}
356
357#[async_trait]
358impl LineraNet for LocalNet {
359    async fn ensure_is_running(&mut self) -> Result<()> {
360        for validator in self.running_validators.values_mut() {
361            validator.ensure_is_running().context("in local network")?;
362        }
363        Ok(())
364    }
365
366    async fn make_client(&mut self) -> ClientWrapper {
367        let client = ClientWrapper::new(
368            self.path_provider.clone(),
369            self.network.external,
370            self.testing_prng_seed,
371            self.next_client_id,
372            OnClientDrop::LeakChains,
373        );
374        if let Some(seed) = self.testing_prng_seed {
375            self.testing_prng_seed = Some(seed + 1);
376        }
377        self.next_client_id += 1;
378        client
379    }
380
381    async fn terminate(&mut self) -> Result<()> {
382        for validator in self.running_validators.values_mut() {
383            validator.terminate().await.context("in local network")?
384        }
385        Ok(())
386    }
387}
388
389impl LocalNet {
390    #[expect(clippy::too_many_arguments)]
391    fn new(
392        network: NetworkConfig,
393        testing_prng_seed: Option<u64>,
394        common_namespace: String,
395        num_initial_validators: usize,
396        num_proxies: usize,
397        num_shards: usize,
398        common_storage_config: InnerStorageConfig,
399        cross_chain_config: CrossChainConfig,
400        path_provider: PathProvider,
401        block_exporters: ExportersSetup,
402    ) -> Result<Self> {
403        Ok(Self {
404            network,
405            testing_prng_seed,
406            next_client_id: 0,
407            num_initial_validators,
408            num_proxies,
409            num_shards,
410            validator_keys: BTreeMap::new(),
411            running_validators: BTreeMap::new(),
412            initialized_validator_storages: BTreeMap::new(),
413            common_namespace,
414            common_storage_config,
415            cross_chain_config,
416            path_provider,
417            block_exporters,
418        })
419    }
420
421    async fn command_for_binary(&self, name: &'static str) -> Result<Command> {
422        let path = resolve_binary(name, env!("CARGO_PKG_NAME")).await?;
423        let mut command = Command::new(path);
424        command.current_dir(self.path_provider.path());
425        Ok(command)
426    }
427
428    #[cfg(with_testing)]
429    pub fn genesis_config(&self) -> Result<linera_client::config::GenesisConfig> {
430        let path = self.path_provider.path();
431        crate::util::read_json(path.join("genesis.json"))
432    }
433
434    pub fn proxy_public_port(validator: usize, proxy_id: usize) -> usize {
435        13000 + validator * 100 + proxy_id + 1
436    }
437
438    pub fn proxy_internal_port(validator: usize, proxy_id: usize) -> usize {
439        10000 + validator * 100 + proxy_id + 1
440    }
441
442    fn shard_port(validator: usize, shard: usize) -> usize {
443        9000 + validator * 100 + shard + 1
444    }
445
446    fn proxy_metrics_port(validator: usize, proxy_id: usize) -> usize {
447        12000 + validator * 100 + proxy_id + 1
448    }
449
450    fn shard_metrics_port(validator: usize, shard: usize) -> usize {
451        11000 + validator * 100 + shard + 1
452    }
453
454    fn block_exporter_port(validator: usize, exporter_id: usize) -> usize {
455        12000 + validator * 100 + exporter_id + 1
456    }
457
458    fn block_exporter_metrics_port(exporter_id: usize) -> usize {
459        13000 + exporter_id + 1
460    }
461
462    fn configuration_string(&self, server_number: usize) -> Result<String> {
463        let n = server_number;
464        let path = self
465            .path_provider
466            .path()
467            .join(format!("validator_{n}.toml"));
468        let port = Self::proxy_public_port(n, 0);
469        let external_protocol = self.network.external.toml();
470        let internal_protocol = self.network.internal.toml();
471        let external_host = self.network.external.localhost();
472        let internal_host = self.network.internal.localhost();
473        let mut content = format!(
474            r#"
475                server_config_path = "server_{n}.json"
476                host = "{external_host}"
477                port = {port}
478                external_protocol = {external_protocol}
479                internal_protocol = {internal_protocol}
480            "#
481        );
482
483        for k in 0..self.num_proxies {
484            let internal_port = Self::proxy_internal_port(n, k);
485            let metrics_port = Self::proxy_metrics_port(n, k);
486            // In the local network, the validator ingress is
487            // the proxy - so the `public_port` is the validator
488            // port.
489            content.push_str(&format!(
490                r#"
491                [[proxies]]
492                host = "{internal_host}"
493                public_port = {port}
494                private_port = {internal_port}
495                metrics_host = "{external_host}"
496                metrics_port = {metrics_port}
497                "#
498            ));
499        }
500
501        for k in 0..self.num_shards {
502            let shard_port = Self::shard_port(n, k);
503            let shard_metrics_port = Self::shard_metrics_port(n, k);
504            content.push_str(&format!(
505                r#"
506
507                [[shards]]
508                host = "{internal_host}"
509                port = {shard_port}
510                metrics_port = {shard_metrics_port}
511                "#
512            ));
513        }
514
515        match self.block_exporters {
516            ExportersSetup::Local(ref exporters) => {
517                for (j, exporter) in exporters.iter().enumerate() {
518                    let host = Network::Grpc.localhost();
519                    let port = Self::block_exporter_port(n, j);
520                    let config_content = format!(
521                        r#"
522
523                        [[block_exporters]]
524                        host = "{host}"
525                        port = {port}
526                        "#
527                    );
528
529                    content.push_str(&config_content);
530                    let exporter_config = self.generate_block_exporter_config(
531                        n,
532                        j as u32,
533                        &exporter.destination_config,
534                    );
535                    let config_path = self
536                        .path_provider
537                        .path()
538                        .join(format!("exporter_config_{n}:{j}.toml"));
539
540                    fs_err::write(&config_path, &exporter_config)?;
541                }
542            }
543            ExportersSetup::Remote(ref exporters) => {
544                for exporter in exporters.iter() {
545                    let host = exporter.host.clone();
546                    let port = exporter.port;
547                    let config_content = format!(
548                        r#"
549
550                        [[block_exporters]]
551                        host = "{host}"
552                        port = {port}
553                        "#
554                    );
555
556                    content.push_str(&config_content);
557                }
558            }
559        }
560
561        fs_err::write(&path, content)?;
562        path.into_os_string().into_string().map_err(|error| {
563            anyhow!(
564                "could not parse OS string into string: {}",
565                error.to_string_lossy()
566            )
567        })
568    }
569
570    fn generate_block_exporter_config(
571        &self,
572        validator: usize,
573        exporter_id: u32,
574        destination_config: &DestinationConfig,
575    ) -> String {
576        let n = validator;
577        let host = Network::Grpc.localhost();
578        let port = Self::block_exporter_port(n, exporter_id as usize);
579        let metrics_port = Self::block_exporter_metrics_port(exporter_id as usize);
580        let mut config = format!(
581            r#"
582            id = {exporter_id}
583
584            metrics_port = {metrics_port}
585
586            [service_config]
587            host = "{host}"
588            port = {port}
589
590            "#
591        );
592
593        let DestinationConfig {
594            destinations,
595            committee_destination,
596        } = destination_config;
597
598        if *committee_destination {
599            let destination_string_to_push = r#"
600
601            [destination_config]
602            committee_destination = true
603            "#
604            .to_string();
605
606            config.push_str(&destination_string_to_push);
607        }
608
609        for destination in destinations {
610            let destination_string_to_push = match destination {
611                Destination::Indexer {
612                    tls,
613                    endpoint,
614                    port,
615                } => {
616                    let tls = match tls {
617                        TlsConfig::ClearText => "ClearText",
618                        TlsConfig::Tls => "Tls",
619                    };
620                    format!(
621                        r#"
622                        [[destination_config.destinations]]
623                        tls = "{tls}"
624                        endpoint = "{endpoint}"
625                        port = {port}
626                        kind = "Indexer"
627                        "#
628                    )
629                }
630                Destination::Validator { endpoint, port } => {
631                    format!(
632                        r#"
633                        [[destination_config.destinations]]
634                        endpoint = "{endpoint}"
635                        port = {port}
636                        kind = "Validator"
637                        "#
638                    )
639                }
640                Destination::Logging { file_name } => {
641                    format!(
642                        r#"
643                        [[destination_config.destinations]]
644                        file_name = "{file_name}"
645                        kind = "Logging"
646                        "#
647                    )
648                }
649            };
650
651            config.push_str(&destination_string_to_push);
652        }
653
654        config
655    }
656
657    async fn generate_initial_validator_config(&mut self) -> Result<()> {
658        let mut command = self.command_for_binary("linera-server").await?;
659        command.arg("generate");
660        if let Some(seed) = self.testing_prng_seed {
661            command.arg("--testing-prng-seed").arg(seed.to_string());
662            self.testing_prng_seed = Some(seed + 1);
663        }
664        command.arg("--validators");
665        for i in 0..self.num_initial_validators {
666            command.arg(&self.configuration_string(i)?);
667        }
668        let output = command
669            .args(["--committee", "committee.json"])
670            .spawn_and_wait_for_stdout()
671            .await?;
672        self.validator_keys = output
673            .split_whitespace()
674            .map(str::to_string)
675            .map(|keys| keys.split(',').map(str::to_string).collect::<Vec<_>>())
676            .enumerate()
677            .map(|(i, keys)| {
678                let validator_key = keys[0].to_string();
679                let account_key = keys[1].to_string();
680                (i, (validator_key, account_key))
681            })
682            .collect();
683        Ok(())
684    }
685
686    async fn run_proxy(&mut self, validator: usize) -> Result<Child> {
687        let storage = self
688            .initialized_validator_storages
689            .get(&validator)
690            .expect("initialized storage");
691        let child = self
692            .command_for_binary("linera-proxy")
693            .await?
694            .arg(format!("server_{}.json", validator))
695            .args(["--storage", &storage.to_string()])
696            .spawn_into()?;
697
698        let port = Self::proxy_public_port(validator, 0);
699        let nickname = format!("validator proxy {validator}");
700        match self.network.external {
701            Network::Grpc => {
702                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
703                let nickname = format!("validator proxy {validator}");
704                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
705            }
706            Network::Grpcs => {
707                let nickname = format!("validator proxy {validator}");
708                Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
709            }
710            Network::Tcp => {
711                Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
712            }
713            Network::Udp => {
714                Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
715            }
716        }
717        Ok(child)
718    }
719
720    async fn run_exporter(&mut self, validator: usize, exporter_id: u32) -> Result<Child> {
721        let config_path = format!("exporter_config_{validator}:{exporter_id}.toml");
722        let storage = self
723            .initialized_validator_storages
724            .get(&validator)
725            .expect("initialized storage");
726
727        tracing::debug!(config=?config_path, storage=?storage.to_string(), "starting block exporter");
728
729        let child = self
730            .command_for_binary("linera-exporter")
731            .await?
732            .args(["--config-path", &config_path])
733            .args(["--storage", &storage.to_string()])
734            .spawn_into()?;
735
736        match self.network.internal {
737            Network::Grpc => {
738                let port = Self::block_exporter_port(validator, exporter_id as usize);
739                let nickname = format!("block exporter {validator}:{exporter_id}");
740                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
741            }
742            Network::Grpcs => {
743                let port = Self::block_exporter_port(validator, exporter_id as usize);
744                let nickname = format!("block exporter  {validator}:{exporter_id}");
745                Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
746            }
747            Network::Tcp | Network::Udp => {
748                unreachable!("Only allowed options are grpc and grpcs")
749            }
750        }
751
752        tracing::info!("block exporter started {validator}:{exporter_id}");
753
754        Ok(child)
755    }
756
757    pub async fn ensure_grpc_server_has_started(
758        nickname: &str,
759        port: usize,
760        scheme: &str,
761    ) -> Result<()> {
762        let endpoint = match scheme {
763            "http" => Endpoint::new(format!("http://localhost:{port}"))
764                .context("endpoint should always parse")?,
765            "https" => {
766                use linera_rpc::CERT_PEM;
767                let certificate = tonic::transport::Certificate::from_pem(CERT_PEM);
768                let tls_config = ClientTlsConfig::new().ca_certificate(certificate);
769                Endpoint::new(format!("https://localhost:{port}"))
770                    .context("endpoint should always parse")?
771                    .tls_config(tls_config)?
772            }
773            _ => bail!("Only supported scheme are http and https"),
774        };
775        let connection = endpoint.connect_lazy();
776        let mut client = HealthClient::new(connection);
777        linera_base::time::timer::sleep(Duration::from_millis(100)).await;
778        for i in 0..10 {
779            linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
780            let result = client.check(HealthCheckRequest::default()).await;
781            if result.is_ok() && result.unwrap().get_ref().status() == ServingStatus::Serving {
782                info!(?port, "Successfully started {nickname}");
783                return Ok(());
784            } else {
785                warn!("Waiting for {nickname} to start");
786            }
787        }
788        bail!("Failed to start {nickname}");
789    }
790
791    pub async fn ensure_simple_server_has_started(
792        nickname: &str,
793        port: usize,
794        protocol: &str,
795    ) -> Result<()> {
796        use linera_core::node::ValidatorNode as _;
797
798        let options = linera_rpc::NodeOptions {
799            send_timeout: Duration::from_secs(5),
800            recv_timeout: Duration::from_secs(5),
801            retry_delay: Duration::from_secs(1),
802            max_retries: 1,
803        };
804        let provider = linera_rpc::simple::SimpleNodeProvider::new(options);
805        let address = format!("{protocol}:127.0.0.1:{port}");
806        // All "simple" services (i.e. proxy and "server") are based on `RpcMessage` and
807        // support `VersionInfoQuery`.
808        let node = provider.make_node(&address)?;
809        linera_base::time::timer::sleep(Duration::from_millis(100)).await;
810        for i in 0..10 {
811            linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
812            let result = node.get_version_info().await;
813            if result.is_ok() {
814                info!("Successfully started {nickname}");
815                return Ok(());
816            } else {
817                warn!("Waiting for {nickname} to start");
818            }
819        }
820        bail!("Failed to start {nickname}");
821    }
822
823    async fn initialize_storage(&mut self, validator: usize) -> Result<()> {
824        let namespace = format!("{}_server_{}_db", self.common_namespace, validator);
825        let inner_storage_config = self.common_storage_config.clone();
826        let storage = StorageConfig {
827            inner_storage_config,
828            namespace,
829        };
830        let mut command = self.command_for_binary("linera").await?;
831        if let Ok(var) = env::var(SERVER_ENV) {
832            command.args(var.split_whitespace());
833        }
834        command.args(["storage", "initialize"]);
835        command
836            .args(["--storage", &storage.to_string()])
837            .args(["--genesis", "genesis.json"])
838            .spawn_and_wait_for_stdout()
839            .await?;
840
841        self.initialized_validator_storages
842            .insert(validator, storage);
843        Ok(())
844    }
845
846    async fn run_server(&mut self, validator: usize, shard: usize) -> Result<Child> {
847        let mut storage = self
848            .initialized_validator_storages
849            .get(&validator)
850            .expect("initialized storage")
851            .clone();
852
853        // For the storage backends with a local directory, make sure that we don't reuse
854        // the same directory for all the shards.
855        storage.maybe_append_shard_path(shard)?;
856
857        let mut command = self.command_for_binary("linera-server").await?;
858        if let Ok(var) = env::var(SERVER_ENV) {
859            command.args(var.split_whitespace());
860        }
861        command
862            .arg("run")
863            .args(["--storage", &storage.to_string()])
864            .args(["--server", &format!("server_{}.json", validator)])
865            .args(["--shard", &shard.to_string()])
866            .args(self.cross_chain_config.to_args());
867        let child = command.spawn_into()?;
868
869        let port = Self::shard_port(validator, shard);
870        let nickname = format!("validator server {validator}:{shard}");
871        match self.network.internal {
872            Network::Grpc => {
873                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
874            }
875            Network::Grpcs => {
876                Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
877            }
878            Network::Tcp => {
879                Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
880            }
881            Network::Udp => {
882                Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
883            }
884        }
885        Ok(child)
886    }
887
888    async fn run(&mut self) -> Result<()> {
889        for validator in 0..self.num_initial_validators {
890            self.start_validator(validator).await?;
891        }
892        Ok(())
893    }
894
895    /// Start a validator.
896    pub async fn start_validator(&mut self, index: usize) -> Result<()> {
897        self.initialize_storage(index).await?;
898        self.restart_validator(index).await
899    }
900
901    /// Restart a validator. This is similar to `start_validator` except that the
902    /// database was already initialized once.
903    pub async fn restart_validator(&mut self, index: usize) -> Result<()> {
904        let proxy = self.run_proxy(index).await?;
905        let mut validator = Validator::new(proxy);
906        for shard in 0..self.num_shards {
907            let server = self.run_server(index, shard).await?;
908            validator.add_server(server);
909        }
910        if let ExportersSetup::Local(ref exporters) = self.block_exporters {
911            for block_exporter in 0..exporters.len() {
912                let exporter = self.run_exporter(index, block_exporter as u32).await?;
913                validator.add_block_exporter(exporter);
914            }
915        }
916
917        self.running_validators.insert(index, validator);
918        Ok(())
919    }
920
921    /// Terminates all the processes of a given validator.
922    pub async fn stop_validator(&mut self, index: usize) -> Result<()> {
923        if let Some(mut validator) = self.running_validators.remove(&index) {
924            if let Err(error) = validator.terminate().await {
925                error!("Failed to stop validator {index}: {error}");
926                return Err(error);
927            }
928        }
929        Ok(())
930    }
931
932    /// Returns a [`linera_rpc::Client`] to interact directly with a `validator`.
933    pub async fn validator_client(&mut self, validator: usize) -> Result<linera_rpc::Client> {
934        let node_provider = linera_rpc::NodeProvider::new(linera_rpc::NodeOptions {
935            send_timeout: Duration::from_secs(1),
936            recv_timeout: Duration::from_secs(1),
937            retry_delay: Duration::ZERO,
938            max_retries: 0,
939        });
940
941        Ok(node_provider.make_node(&self.validator_address(validator))?)
942    }
943
944    /// Returns the address to connect to a validator's proxy.
945    /// In local networks, the zeroth proxy _is_ the validator ingress.
946    pub fn validator_address(&self, validator: usize) -> String {
947        let port = Self::proxy_public_port(validator, 0);
948        let schema = self.network.external.schema();
949
950        format!("{schema}:localhost:{port}")
951    }
952}
953
954#[cfg(with_testing)]
955impl LocalNet {
956    /// Returns the validating key and an account key of the validator.
957    pub fn validator_keys(&self, validator: usize) -> Option<&(String, String)> {
958        self.validator_keys.get(&validator)
959    }
960
961    pub async fn generate_validator_config(&mut self, validator: usize) -> Result<()> {
962        let stdout = self
963            .command_for_binary("linera-server")
964            .await?
965            .arg("generate")
966            .arg("--validators")
967            .arg(&self.configuration_string(validator)?)
968            .spawn_and_wait_for_stdout()
969            .await?;
970        let keys = stdout
971            .trim()
972            .split(',')
973            .map(str::to_string)
974            .collect::<Vec<_>>();
975        self.validator_keys
976            .insert(validator, (keys[0].clone(), keys[1].clone()));
977        Ok(())
978    }
979
980    pub async fn terminate_server(&mut self, validator: usize, shard: usize) -> Result<()> {
981        self.running_validators
982            .get_mut(&validator)
983            .context("server not found")?
984            .terminate_server(shard)
985            .await?;
986        Ok(())
987    }
988
989    pub fn remove_validator(&mut self, validator: usize) -> Result<()> {
990        self.running_validators
991            .remove(&validator)
992            .context("validator not found")?;
993        Ok(())
994    }
995
996    pub async fn start_server(&mut self, validator: usize, shard: usize) -> Result<()> {
997        let server = self.run_server(validator, shard).await?;
998        self.running_validators
999            .get_mut(&validator)
1000            .context("could not find validator")?
1001            .add_server(server);
1002        Ok(())
1003    }
1004}