linera_service/cli_wrappers/
local_net.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4#[cfg(with_testing)]
5use std::sync::LazyLock;
6use std::{
7    collections::BTreeMap,
8    env,
9    num::NonZeroU16,
10    path::{Path, PathBuf},
11    sync::Arc,
12    time::Duration,
13};
14
15use anyhow::{anyhow, bail, ensure, Context, Result};
16#[cfg(with_testing)]
17use async_lock::RwLock;
18use async_trait::async_trait;
19use linera_base::{
20    command::{resolve_binary, CommandExt},
21    data_types::Amount,
22};
23use linera_client::client_options::ResourceControlPolicyConfig;
24use linera_core::node::ValidatorNodeProvider;
25use linera_rpc::config::{CrossChainConfig, ExporterServiceConfig, TlsConfig};
26#[cfg(all(feature = "storage-service", with_testing))]
27use linera_storage_service::common::storage_service_test_endpoint;
28#[cfg(all(feature = "rocksdb", feature = "scylladb", with_testing))]
29use linera_views::rocks_db::{RocksDbDatabase, RocksDbSpawnMode};
30#[cfg(all(feature = "scylladb", with_testing))]
31use linera_views::{scylla_db::ScyllaDbDatabase, store::TestKeyValueDatabase as _};
32use tempfile::{tempdir, TempDir};
33use tokio::process::{Child, Command};
34use tonic::transport::{channel::ClientTlsConfig, Endpoint};
35use tonic_health::pb::{
36    health_check_response::ServingStatus, health_client::HealthClient, HealthCheckRequest,
37};
38use tracing::{error, info, warn};
39
40use crate::{
41    cli_wrappers::{
42        ClientWrapper, LineraNet, LineraNetConfig, Network, NetworkConfig, OnClientDrop,
43    },
44    config::{BlockExporterConfig, Destination, DestinationConfig},
45    storage::{InnerStorageConfig, StorageConfig},
46    util::ChildExt,
47};
48
49/// Maximum allowed number of shards over all validators.
50const MAX_NUMBER_SHARDS: usize = 1000;
51
52pub enum ProcessInbox {
53    Skip,
54    Automatic,
55}
56
57#[cfg(with_testing)]
58static PORT_PROVIDER: LazyLock<RwLock<u16>> = LazyLock::new(|| RwLock::new(7080));
59
60/// The offset of the port
61fn test_offset_port() -> usize {
62    std::env::var("TEST_OFFSET_PORT")
63        .ok()
64        .and_then(|port_str| port_str.parse::<usize>().ok())
65        .unwrap_or(9000)
66}
67
68/// Provides a port for the node service. Increment the port numbers.
69#[cfg(with_testing)]
70pub async fn get_node_port() -> u16 {
71    let mut port = PORT_PROVIDER.write().await;
72    let port_ret = *port;
73    *port += 1;
74    info!("get_node_port returning port_ret={}", port_ret);
75    assert!(port_selector::is_free(port_ret));
76    port_ret
77}
78
79#[cfg(with_testing)]
80async fn make_testing_config(database: Database) -> Result<InnerStorageConfig> {
81    match database {
82        Database::Service => {
83            #[cfg(feature = "storage-service")]
84            {
85                let endpoint = storage_service_test_endpoint()
86                    .expect("Reading LINERA_STORAGE_SERVICE environment variable");
87                Ok(InnerStorageConfig::Service { endpoint })
88            }
89            #[cfg(not(feature = "storage-service"))]
90            panic!("Database::Service is selected without the feature storage_service");
91        }
92        Database::DynamoDb => {
93            #[cfg(feature = "dynamodb")]
94            {
95                let use_dynamodb_local = true;
96                Ok(InnerStorageConfig::DynamoDb { use_dynamodb_local })
97            }
98            #[cfg(not(feature = "dynamodb"))]
99            panic!("Database::DynamoDb is selected without the feature dynamodb");
100        }
101        Database::ScyllaDb => {
102            #[cfg(feature = "scylladb")]
103            {
104                let config = ScyllaDbDatabase::new_test_config().await?;
105                Ok(InnerStorageConfig::ScyllaDb {
106                    uri: config.inner_config.uri,
107                })
108            }
109            #[cfg(not(feature = "scylladb"))]
110            panic!("Database::ScyllaDb is selected without the feature scylladb");
111        }
112        Database::DualRocksDbScyllaDb => {
113            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
114            {
115                let rocksdb_config = RocksDbDatabase::new_test_config().await?;
116                let scylla_config = ScyllaDbDatabase::new_test_config().await?;
117                let spawn_mode = RocksDbSpawnMode::get_spawn_mode_from_runtime();
118                Ok(InnerStorageConfig::DualRocksDbScyllaDb {
119                    path_with_guard: rocksdb_config.inner_config.path_with_guard,
120                    spawn_mode,
121                    uri: scylla_config.inner_config.uri,
122                })
123            }
124            #[cfg(not(all(feature = "rocksdb", feature = "scylladb")))]
125            panic!("Database::DualRocksDbScyllaDb is selected without the features rocksdb and scylladb");
126        }
127    }
128}
129
130pub enum InnerStorageConfigBuilder {
131    #[cfg(with_testing)]
132    TestConfig,
133    ExistingConfig {
134        storage_config: InnerStorageConfig,
135    },
136}
137
138impl InnerStorageConfigBuilder {
139    #[cfg_attr(not(with_testing), expect(unused_variables))]
140    pub async fn build(self, database: Database) -> Result<InnerStorageConfig> {
141        match self {
142            #[cfg(with_testing)]
143            InnerStorageConfigBuilder::TestConfig => make_testing_config(database).await,
144            InnerStorageConfigBuilder::ExistingConfig { storage_config } => Ok(storage_config),
145        }
146    }
147}
148
149/// Path used for the run can come from a path whose lifetime is controlled
150/// by an external user or as a temporary directory
151#[derive(Clone)]
152pub enum PathProvider {
153    ExternalPath { path_buf: PathBuf },
154    TemporaryDirectory { tmp_dir: Arc<TempDir> },
155}
156
157impl PathProvider {
158    pub fn path(&self) -> &Path {
159        match self {
160            PathProvider::ExternalPath { path_buf } => path_buf.as_path(),
161            PathProvider::TemporaryDirectory { tmp_dir } => tmp_dir.path(),
162        }
163    }
164
165    pub fn create_temporary_directory() -> Result<Self> {
166        let tmp_dir = Arc::new(tempdir()?);
167        Ok(PathProvider::TemporaryDirectory { tmp_dir })
168    }
169
170    pub fn from_path_option(path: &Option<String>) -> anyhow::Result<Self> {
171        Ok(match path {
172            None => {
173                let tmp_dir = Arc::new(tempfile::tempdir()?);
174                PathProvider::TemporaryDirectory { tmp_dir }
175            }
176            Some(path) => {
177                let path = Path::new(path);
178                let path_buf = path.to_path_buf();
179                PathProvider::ExternalPath { path_buf }
180            }
181        })
182    }
183}
184
185/// The information needed to start a [`LocalNet`].
186pub struct LocalNetConfig {
187    pub database: Database,
188    pub network: NetworkConfig,
189    pub testing_prng_seed: Option<u64>,
190    pub namespace: String,
191    pub num_other_initial_chains: u32,
192    pub initial_amount: Amount,
193    pub num_initial_validators: usize,
194    pub num_shards: usize,
195    pub num_proxies: usize,
196    pub policy_config: ResourceControlPolicyConfig,
197    pub http_request_allow_list: Option<Vec<String>>,
198    pub cross_chain_config: CrossChainConfig,
199    pub storage_config_builder: InnerStorageConfigBuilder,
200    pub path_provider: PathProvider,
201    pub block_exporters: ExportersSetup,
202    /// Optional directory where the `linera`, `linera-proxy`, and `linera-server` binaries
203    /// are located. If `None`, binaries are resolved from the current binary's directory.
204    pub binary_dir: Option<PathBuf>,
205}
206
207/// The setup for the block exporters.
208#[derive(Clone, PartialEq)]
209pub enum ExportersSetup {
210    // Block exporters are meant to be started and managed by the testing framework.
211    Local(Vec<BlockExporterConfig>),
212    // Block exporters are already started and we just need to connect to them.
213    Remote(Vec<ExporterServiceConfig>),
214}
215
216impl ExportersSetup {
217    pub fn new(
218        with_block_exporter: bool,
219        block_exporter_address: String,
220        block_exporter_port: NonZeroU16,
221    ) -> ExportersSetup {
222        if with_block_exporter {
223            let exporter_config =
224                ExporterServiceConfig::new(block_exporter_address, block_exporter_port.into());
225            ExportersSetup::Remote(vec![exporter_config])
226        } else {
227            ExportersSetup::Local(vec![])
228        }
229    }
230}
231
232/// A set of Linera validators running locally as native processes.
233pub struct LocalNet {
234    network: NetworkConfig,
235    testing_prng_seed: Option<u64>,
236    next_client_id: usize,
237    num_initial_validators: usize,
238    num_proxies: usize,
239    num_shards: usize,
240    validator_keys: BTreeMap<usize, (String, String)>,
241    running_validators: BTreeMap<usize, Validator>,
242    initialized_validator_storages: BTreeMap<usize, StorageConfig>,
243    common_namespace: String,
244    common_storage_config: InnerStorageConfig,
245    cross_chain_config: CrossChainConfig,
246    path_provider: PathProvider,
247    block_exporters: ExportersSetup,
248    binary_dir: Option<PathBuf>,
249}
250
251/// The name of the environment variable that allows specifying additional arguments to be passed
252/// to the binary when starting a server.
253const SERVER_ENV: &str = "LINERA_SERVER_PARAMS";
254
255/// Description of the database engine to use inside a local Linera network.
256#[derive(Copy, Clone, Eq, PartialEq)]
257pub enum Database {
258    Service,
259    DynamoDb,
260    ScyllaDb,
261    DualRocksDbScyllaDb,
262}
263
264/// The processes of a running validator.
265struct Validator {
266    proxies: Vec<Child>,
267    servers: Vec<Child>,
268    exporters: Vec<Child>,
269}
270
271impl Validator {
272    fn new() -> Self {
273        Self {
274            proxies: vec![],
275            servers: vec![],
276            exporters: vec![],
277        }
278    }
279
280    async fn terminate(&mut self) -> Result<()> {
281        for proxy in &mut self.proxies {
282            proxy.kill().await.context("terminating validator proxy")?;
283        }
284        for server in &mut self.servers {
285            server
286                .kill()
287                .await
288                .context("terminating validator server")?;
289        }
290        Ok(())
291    }
292
293    fn add_proxy(&mut self, proxy: Child) {
294        self.proxies.push(proxy)
295    }
296
297    fn add_server(&mut self, server: Child) {
298        self.servers.push(server)
299    }
300
301    #[cfg(with_testing)]
302    async fn terminate_server(&mut self, index: usize) -> Result<()> {
303        let mut server = self.servers.remove(index);
304        server
305            .kill()
306            .await
307            .context("terminating validator server")?;
308        Ok(())
309    }
310
311    fn add_block_exporter(&mut self, exporter: Child) {
312        self.exporters.push(exporter);
313    }
314
315    fn ensure_is_running(&mut self) -> Result<()> {
316        for proxy in &mut self.proxies {
317            proxy.ensure_is_running()?;
318        }
319        for child in &mut self.servers {
320            child.ensure_is_running()?;
321        }
322        for exporter in &mut self.exporters {
323            exporter.ensure_is_running()?;
324        }
325        Ok(())
326    }
327}
328
329#[cfg(with_testing)]
330impl LocalNetConfig {
331    pub fn new_test(database: Database, network: Network) -> Self {
332        let num_shards = 4;
333        let num_proxies = 1;
334        let storage_config_builder = InnerStorageConfigBuilder::TestConfig;
335        let path_provider = PathProvider::create_temporary_directory().unwrap();
336        let internal = network.drop_tls();
337        let external = network;
338        let network = NetworkConfig { internal, external };
339        let cross_chain_config = CrossChainConfig::default();
340        Self {
341            database,
342            network,
343            num_other_initial_chains: 2,
344            initial_amount: Amount::from_tokens(1_000_000),
345            policy_config: ResourceControlPolicyConfig::Testnet,
346            cross_chain_config,
347            testing_prng_seed: Some(37),
348            namespace: linera_views::random::generate_test_namespace(),
349            num_initial_validators: 4,
350            num_shards,
351            num_proxies,
352            storage_config_builder,
353            path_provider,
354            block_exporters: ExportersSetup::Local(vec![]),
355            http_request_allow_list: Some(vec!["localhost".to_string()]),
356            binary_dir: None,
357        }
358    }
359}
360
361#[async_trait]
362impl LineraNetConfig for LocalNetConfig {
363    type Net = LocalNet;
364
365    async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
366        let storage_config = self.storage_config_builder.build(self.database).await?;
367        let mut net = LocalNet::new(
368            self.network,
369            self.testing_prng_seed,
370            self.namespace,
371            self.num_initial_validators,
372            self.num_proxies,
373            self.num_shards,
374            storage_config,
375            self.cross_chain_config,
376            self.path_provider,
377            self.block_exporters,
378            self.binary_dir,
379        );
380        let client = net.make_client().await;
381        ensure!(
382            self.num_initial_validators > 0,
383            "There should be at least one initial validator"
384        );
385        let total_number_shards = self.num_initial_validators * self.num_shards;
386        ensure!(
387            total_number_shards <= MAX_NUMBER_SHARDS,
388            "Total number of shards ({}) exceeds maximum allowed ({})",
389            self.num_shards,
390            MAX_NUMBER_SHARDS
391        );
392        net.generate_initial_validator_config().await?;
393        client
394            .create_genesis_config(
395                self.num_other_initial_chains,
396                self.initial_amount,
397                self.policy_config,
398                self.http_request_allow_list
399                    .clone()
400                    .or_else(|| Some(vec!["localhost".to_owned()])),
401            )
402            .await?;
403        net.run().await?;
404        Ok((net, client))
405    }
406}
407
408#[async_trait]
409impl LineraNet for LocalNet {
410    async fn ensure_is_running(&mut self) -> Result<()> {
411        for validator in self.running_validators.values_mut() {
412            validator.ensure_is_running().context("in local network")?;
413        }
414        Ok(())
415    }
416
417    async fn make_client(&mut self) -> ClientWrapper {
418        let client = ClientWrapper::new_with_extra_args(
419            self.path_provider.clone(),
420            self.network.external,
421            self.testing_prng_seed,
422            self.next_client_id,
423            OnClientDrop::LeakChains,
424            vec!["--wait-for-outgoing-messages".to_string()],
425            self.binary_dir.clone(),
426        );
427        if let Some(seed) = self.testing_prng_seed {
428            self.testing_prng_seed = Some(seed + 1);
429        }
430        self.next_client_id += 1;
431        client
432    }
433
434    async fn terminate(&mut self) -> Result<()> {
435        for validator in self.running_validators.values_mut() {
436            validator.terminate().await.context("in local network")?
437        }
438        Ok(())
439    }
440}
441
442impl LocalNet {
443    #[expect(clippy::too_many_arguments)]
444    fn new(
445        network: NetworkConfig,
446        testing_prng_seed: Option<u64>,
447        common_namespace: String,
448        num_initial_validators: usize,
449        num_proxies: usize,
450        num_shards: usize,
451        common_storage_config: InnerStorageConfig,
452        cross_chain_config: CrossChainConfig,
453        path_provider: PathProvider,
454        block_exporters: ExportersSetup,
455        binary_dir: Option<PathBuf>,
456    ) -> Self {
457        Self {
458            network,
459            testing_prng_seed,
460            next_client_id: 0,
461            num_initial_validators,
462            num_proxies,
463            num_shards,
464            validator_keys: BTreeMap::new(),
465            running_validators: BTreeMap::new(),
466            initialized_validator_storages: BTreeMap::new(),
467            common_namespace,
468            common_storage_config,
469            cross_chain_config,
470            path_provider,
471            block_exporters,
472            binary_dir,
473        }
474    }
475
476    async fn command_for_binary(&self, name: &'static str) -> Result<Command> {
477        let path = if let Some(dir) = &self.binary_dir {
478            dir.join(name)
479        } else {
480            resolve_binary(name, env!("CARGO_PKG_NAME")).await?
481        };
482        let mut command = Command::new(path);
483        command.current_dir(self.path_provider.path());
484        Ok(command)
485    }
486
487    #[cfg(with_testing)]
488    pub fn genesis_config(&self) -> Result<linera_client::config::GenesisConfig> {
489        let path = self.path_provider.path();
490        crate::util::read_json(path.join("genesis.json"))
491    }
492
493    fn shard_port(&self, validator: usize, shard: usize) -> usize {
494        test_offset_port() + validator * self.num_shards + shard + 1
495    }
496
497    fn proxy_internal_port(&self, validator: usize, proxy_id: usize) -> usize {
498        test_offset_port() + 1000 + validator * self.num_proxies + proxy_id + 1
499    }
500
501    fn shard_metrics_port(&self, validator: usize, shard: usize) -> usize {
502        test_offset_port() + 2000 + validator * self.num_shards + shard + 1
503    }
504
505    fn proxy_metrics_port(&self, validator: usize, proxy_id: usize) -> usize {
506        test_offset_port() + 3000 + validator * self.num_proxies + proxy_id + 1
507    }
508
509    fn block_exporter_port(&self, validator: usize, exporter_id: usize) -> usize {
510        test_offset_port() + 3000 + validator * self.num_shards + exporter_id + 1
511    }
512
513    pub fn proxy_public_port(&self, validator: usize, proxy_id: usize) -> usize {
514        test_offset_port() + 4000 + validator * self.num_proxies + proxy_id + 1
515    }
516
517    pub fn first_public_port() -> usize {
518        test_offset_port() + 4000 + 1
519    }
520
521    fn block_exporter_metrics_port(exporter_id: usize) -> usize {
522        test_offset_port() + 4000 + exporter_id + 1
523    }
524
525    fn configuration_string(&self, server_number: usize) -> Result<String> {
526        let n = server_number;
527        let path = self
528            .path_provider
529            .path()
530            .join(format!("validator_{n}.toml"));
531        let port = self.proxy_public_port(n, 0);
532        let external_protocol = self.network.external.toml();
533        let internal_protocol = self.network.internal.toml();
534        let external_host = self.network.external.localhost();
535        let internal_host = self.network.internal.localhost();
536        let mut content = format!(
537            r#"
538                server_config_path = "server_{n}.json"
539                host = "{external_host}"
540                port = {port}
541                external_protocol = {external_protocol}
542                internal_protocol = {internal_protocol}
543            "#
544        );
545
546        for k in 0..self.num_proxies {
547            let public_port = self.proxy_public_port(n, k);
548            let internal_port = self.proxy_internal_port(n, k);
549            let metrics_port = self.proxy_metrics_port(n, k);
550            // In the local network, the validator ingress is
551            // the proxy - so the `public_port` is the validator
552            // port.
553            content.push_str(&format!(
554                r#"
555                [[proxies]]
556                host = "{internal_host}"
557                public_port = {public_port}
558                private_port = {internal_port}
559                metrics_port = {metrics_port}
560                "#
561            ));
562        }
563
564        for k in 0..self.num_shards {
565            let shard_port = self.shard_port(n, k);
566            let shard_metrics_port = self.shard_metrics_port(n, k);
567            content.push_str(&format!(
568                r#"
569
570                [[shards]]
571                host = "{internal_host}"
572                port = {shard_port}
573                metrics_port = {shard_metrics_port}
574                "#
575            ));
576        }
577
578        match self.block_exporters {
579            ExportersSetup::Local(ref exporters) => {
580                for (j, exporter) in exporters.iter().enumerate() {
581                    let host = Network::Grpc.localhost();
582                    let port = self.block_exporter_port(n, j);
583                    let config_content = format!(
584                        r#"
585
586                        [[block_exporters]]
587                        host = "{host}"
588                        port = {port}
589                        "#
590                    );
591
592                    content.push_str(&config_content);
593                    let exporter_config = self.generate_block_exporter_config(
594                        n,
595                        j as u32,
596                        &exporter.destination_config,
597                    );
598                    let config_path = self
599                        .path_provider
600                        .path()
601                        .join(format!("exporter_config_{n}:{j}.toml"));
602
603                    fs_err::write(&config_path, &exporter_config)?;
604                }
605            }
606            ExportersSetup::Remote(ref exporters) => {
607                for exporter in exporters {
608                    let host = exporter.host.clone();
609                    let port = exporter.port;
610                    let config_content = format!(
611                        r#"
612
613                        [[block_exporters]]
614                        host = "{host}"
615                        port = {port}
616                        "#
617                    );
618
619                    content.push_str(&config_content);
620                }
621            }
622        }
623
624        fs_err::write(&path, content)?;
625        path.into_os_string().into_string().map_err(|error| {
626            anyhow!(
627                "could not parse OS string into string: {}",
628                error.to_string_lossy()
629            )
630        })
631    }
632
633    fn generate_block_exporter_config(
634        &self,
635        validator: usize,
636        exporter_id: u32,
637        destination_config: &DestinationConfig,
638    ) -> String {
639        let n = validator;
640        let host = Network::Grpc.localhost();
641        let port = self.block_exporter_port(n, exporter_id as usize);
642        let metrics_port = Self::block_exporter_metrics_port(exporter_id as usize);
643        let mut config = format!(
644            r#"
645            id = {exporter_id}
646
647            metrics_port = {metrics_port}
648
649            [service_config]
650            host = "{host}"
651            port = {port}
652
653            "#
654        );
655
656        let DestinationConfig {
657            destinations,
658            committee_destination,
659        } = destination_config;
660
661        if *committee_destination {
662            let destination_string_to_push = r#"
663
664            [destination_config]
665            committee_destination = true
666            "#
667            .to_string();
668
669            config.push_str(&destination_string_to_push);
670        }
671
672        for destination in destinations {
673            let destination_string_to_push = match destination {
674                Destination::Indexer {
675                    tls,
676                    endpoint,
677                    port,
678                } => {
679                    let tls = match tls {
680                        TlsConfig::ClearText => "ClearText",
681                        TlsConfig::Tls => "Tls",
682                    };
683                    format!(
684                        r#"
685                        [[destination_config.destinations]]
686                        tls = "{tls}"
687                        endpoint = "{endpoint}"
688                        port = {port}
689                        kind = "Indexer"
690                        "#
691                    )
692                }
693                Destination::Validator { endpoint, port } => {
694                    format!(
695                        r#"
696                        [[destination_config.destinations]]
697                        endpoint = "{endpoint}"
698                        port = {port}
699                        kind = "Validator"
700                        "#
701                    )
702                }
703                Destination::Logging { file_name } => {
704                    format!(
705                        r#"
706                        [[destination_config.destinations]]
707                        file_name = "{file_name}"
708                        kind = "Logging"
709                        "#
710                    )
711                }
712            };
713
714            config.push_str(&destination_string_to_push);
715        }
716
717        config
718    }
719
720    async fn generate_initial_validator_config(&mut self) -> Result<()> {
721        let mut command = self.command_for_binary("linera-server").await?;
722        command.arg("generate");
723        if let Some(seed) = self.testing_prng_seed {
724            command.arg("--testing-prng-seed").arg(seed.to_string());
725            self.testing_prng_seed = Some(seed + 1);
726        }
727        command.arg("--validators");
728        for i in 0..self.num_initial_validators {
729            command.arg(&self.configuration_string(i)?);
730        }
731        let output = command
732            .args(["--committee", "committee.json"])
733            .spawn_and_wait_for_stdout()
734            .await?;
735        self.validator_keys = output
736            .split_whitespace()
737            .map(str::to_string)
738            .map(|keys| keys.split(',').map(str::to_string).collect::<Vec<_>>())
739            .enumerate()
740            .map(|(i, keys)| {
741                let validator_key = keys[0].to_string();
742                let account_key = keys[1].to_string();
743                (i, (validator_key, account_key))
744            })
745            .collect();
746        Ok(())
747    }
748
749    async fn run_proxy(&self, validator: usize, proxy_id: usize) -> Result<Child> {
750        let storage = self
751            .initialized_validator_storages
752            .get(&validator)
753            .expect("initialized storage");
754        let child = self
755            .command_for_binary("linera-proxy")
756            .await?
757            .arg(format!("server_{}.json", validator))
758            .args(["--storage", &storage.to_string()])
759            .args(["--id", &proxy_id.to_string()])
760            .spawn_into()?;
761
762        let port = self.proxy_public_port(validator, proxy_id);
763        let nickname = format!("validator proxy {validator}");
764        match self.network.external {
765            Network::Grpc => {
766                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
767                let nickname = format!("validator proxy {validator}");
768                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
769            }
770            Network::Grpcs => {
771                let nickname = format!("validator proxy {validator}");
772                Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
773            }
774            Network::Tcp => {
775                Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
776            }
777            Network::Udp => {
778                Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
779            }
780        }
781        Ok(child)
782    }
783
784    async fn run_exporter(&self, validator: usize, exporter_id: u32) -> Result<Child> {
785        let config_path = format!("exporter_config_{validator}:{exporter_id}.toml");
786        let storage = self
787            .initialized_validator_storages
788            .get(&validator)
789            .expect("initialized storage");
790
791        tracing::debug!(config=?config_path, storage=?storage.to_string(), "starting block exporter");
792
793        let child = self
794            .command_for_binary("linera-exporter")
795            .await?
796            .args(["run", "--config-path", &config_path])
797            .args(["--storage", &storage.to_string()])
798            .spawn_into()?;
799
800        match self.network.internal {
801            Network::Grpc => {
802                let port = self.block_exporter_port(validator, exporter_id as usize);
803                let nickname = format!("block exporter {validator}:{exporter_id}");
804                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
805            }
806            Network::Grpcs => {
807                let port = self.block_exporter_port(validator, exporter_id as usize);
808                let nickname = format!("block exporter  {validator}:{exporter_id}");
809                Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
810            }
811            Network::Tcp | Network::Udp => {
812                unreachable!("Only allowed options are grpc and grpcs")
813            }
814        }
815
816        tracing::info!("block exporter started {validator}:{exporter_id}");
817
818        Ok(child)
819    }
820
821    pub async fn ensure_grpc_server_has_started(
822        nickname: &str,
823        port: usize,
824        scheme: &str,
825    ) -> Result<()> {
826        let endpoint = match scheme {
827            "http" => Endpoint::new(format!("http://localhost:{port}"))
828                .context("endpoint should always parse")?,
829            "https" => {
830                use linera_rpc::CERT_PEM;
831                let certificate = tonic::transport::Certificate::from_pem(CERT_PEM);
832                let tls_config = ClientTlsConfig::new().ca_certificate(certificate);
833                Endpoint::new(format!("https://localhost:{port}"))
834                    .context("endpoint should always parse")?
835                    .tls_config(tls_config)?
836            }
837            _ => bail!("Only supported scheme are http and https"),
838        };
839        let connection = endpoint.connect_lazy();
840        let mut client = HealthClient::new(connection);
841        linera_base::time::timer::sleep(Duration::from_millis(100)).await;
842        for i in 0..10 {
843            linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
844            let result = client.check(HealthCheckRequest::default()).await;
845            if result.is_ok() && result.unwrap().get_ref().status() == ServingStatus::Serving {
846                info!(?port, "Successfully started {nickname}");
847                return Ok(());
848            } else {
849                warn!("Waiting for {nickname} to start");
850            }
851        }
852        bail!("Failed to start {nickname}");
853    }
854
855    async fn ensure_simple_server_has_started(
856        nickname: &str,
857        port: usize,
858        protocol: &str,
859    ) -> Result<()> {
860        use linera_core::node::ValidatorNode as _;
861
862        let options = linera_rpc::NodeOptions {
863            send_timeout: Duration::from_secs(5),
864            recv_timeout: Duration::from_secs(5),
865            retry_delay: Duration::from_secs(1),
866            max_retries: 1,
867            ..Default::default()
868        };
869        let provider = linera_rpc::simple::SimpleNodeProvider::new(options);
870        let address = format!("{protocol}:127.0.0.1:{port}");
871        // All "simple" services (i.e. proxy and "server") are based on `RpcMessage` and
872        // support `VersionInfoQuery`.
873        let node = provider.make_node(&address)?;
874        linera_base::time::timer::sleep(Duration::from_millis(100)).await;
875        for i in 0..10 {
876            linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
877            let result = node.get_version_info().await;
878            if result.is_ok() {
879                info!("Successfully started {nickname}");
880                return Ok(());
881            } else {
882                warn!("Waiting for {nickname} to start");
883            }
884        }
885        bail!("Failed to start {nickname}");
886    }
887
888    async fn initialize_storage(&mut self, validator: usize) -> Result<()> {
889        let namespace = format!("{}_server_{}_db", self.common_namespace, validator);
890        let inner_storage_config = self.common_storage_config.clone();
891        let storage = StorageConfig {
892            inner_storage_config,
893            namespace,
894        };
895        let mut command = self.command_for_binary("linera").await?;
896        if let Ok(var) = env::var(SERVER_ENV) {
897            command.args(var.split_whitespace());
898        }
899        command.args(["storage", "initialize"]);
900        command
901            .args(["--storage", &storage.to_string()])
902            .args(["--genesis", "genesis.json"])
903            .spawn_and_wait_for_stdout()
904            .await?;
905
906        self.initialized_validator_storages
907            .insert(validator, storage);
908        Ok(())
909    }
910
911    async fn run_server(&self, validator: usize, shard: usize) -> Result<Child> {
912        let mut storage = self
913            .initialized_validator_storages
914            .get(&validator)
915            .expect("initialized storage")
916            .clone();
917
918        // For the storage backends with a local directory, make sure that we don't reuse
919        // the same directory for all the shards.
920        storage.maybe_append_shard_path(shard)?;
921
922        let mut command = self.command_for_binary("linera-server").await?;
923        if let Ok(var) = env::var(SERVER_ENV) {
924            command.args(var.split_whitespace());
925        }
926        command
927            .arg("run")
928            .args(["--storage", &storage.to_string()])
929            .args(["--server", &format!("server_{}.json", validator)])
930            .args(["--shard", &shard.to_string()])
931            .args(self.cross_chain_config.to_args());
932        let child = command.spawn_into()?;
933
934        let port = self.shard_port(validator, shard);
935        let nickname = format!("validator server {validator}:{shard}");
936        match self.network.internal {
937            Network::Grpc => {
938                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
939            }
940            Network::Grpcs => {
941                Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
942            }
943            Network::Tcp => {
944                Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
945            }
946            Network::Udp => {
947                Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
948            }
949        }
950        Ok(child)
951    }
952
953    async fn run(&mut self) -> Result<()> {
954        for validator in 0..self.num_initial_validators {
955            self.start_validator(validator).await?;
956        }
957        Ok(())
958    }
959
960    /// Start a validator.
961    pub async fn start_validator(&mut self, index: usize) -> Result<()> {
962        self.initialize_storage(index).await?;
963        self.restart_validator(index).await
964    }
965
966    /// Restart a validator. This is similar to `start_validator` except that the
967    /// database was already initialized once.
968    pub async fn restart_validator(&mut self, index: usize) -> Result<()> {
969        let mut validator = Validator::new();
970        for k in 0..self.num_proxies {
971            let proxy = self.run_proxy(index, k).await?;
972            validator.add_proxy(proxy);
973        }
974        for shard in 0..self.num_shards {
975            let server = self.run_server(index, shard).await?;
976            validator.add_server(server);
977        }
978        if let ExportersSetup::Local(ref exporters) = self.block_exporters {
979            for block_exporter in 0..exporters.len() {
980                let exporter = self.run_exporter(index, block_exporter as u32).await?;
981                validator.add_block_exporter(exporter);
982            }
983        }
984
985        self.running_validators.insert(index, validator);
986        Ok(())
987    }
988
989    /// Terminates all the processes of a given validator.
990    pub async fn stop_validator(&mut self, index: usize) -> Result<()> {
991        if let Some(mut validator) = self.running_validators.remove(&index) {
992            if let Err(error) = validator.terminate().await {
993                error!("Failed to stop validator {index}: {error}");
994                return Err(error);
995            }
996        }
997        Ok(())
998    }
999
1000    /// Returns a [`linera_rpc::Client`] to interact directly with a `validator`.
1001    pub fn validator_client(&mut self, validator: usize) -> Result<linera_rpc::Client> {
1002        let node_provider = linera_rpc::NodeProvider::new(linera_rpc::NodeOptions {
1003            send_timeout: Duration::from_secs(1),
1004            recv_timeout: Duration::from_secs(1),
1005            retry_delay: Duration::ZERO,
1006            max_retries: 0,
1007            ..Default::default()
1008        });
1009
1010        Ok(node_provider.make_node(&self.validator_address(validator))?)
1011    }
1012
1013    /// Returns the address to connect to a validator's proxy.
1014    /// In local networks, the zeroth proxy _is_ the validator ingress.
1015    pub fn validator_address(&self, validator: usize) -> String {
1016        let port = self.proxy_public_port(validator, 0);
1017        let schema = self.network.external.schema();
1018
1019        format!("{schema}:localhost:{port}")
1020    }
1021}
1022
1023#[cfg(with_testing)]
1024impl LocalNet {
1025    /// Returns the validating key and an account key of the validator.
1026    pub fn validator_keys(&self, validator: usize) -> Option<&(String, String)> {
1027        self.validator_keys.get(&validator)
1028    }
1029
1030    pub async fn generate_validator_config(&mut self, validator: usize) -> Result<()> {
1031        let stdout = self
1032            .command_for_binary("linera-server")
1033            .await?
1034            .arg("generate")
1035            .arg("--validators")
1036            .arg(&self.configuration_string(validator)?)
1037            .spawn_and_wait_for_stdout()
1038            .await?;
1039        let keys = stdout
1040            .trim()
1041            .split(',')
1042            .map(str::to_string)
1043            .collect::<Vec<_>>();
1044        self.validator_keys
1045            .insert(validator, (keys[0].clone(), keys[1].clone()));
1046        Ok(())
1047    }
1048
1049    pub async fn terminate_server(&mut self, validator: usize, shard: usize) -> Result<()> {
1050        self.running_validators
1051            .get_mut(&validator)
1052            .context("server not found")?
1053            .terminate_server(shard)
1054            .await?;
1055        Ok(())
1056    }
1057
1058    pub fn remove_validator(&mut self, validator: usize) -> Result<()> {
1059        self.running_validators
1060            .remove(&validator)
1061            .context("validator not found")?;
1062        Ok(())
1063    }
1064
1065    pub async fn start_server(&mut self, validator: usize, shard: usize) -> Result<()> {
1066        let server = self.run_server(validator, shard).await?;
1067        self.running_validators
1068            .get_mut(&validator)
1069            .context("could not find validator")?
1070            .add_server(server);
1071        Ok(())
1072    }
1073}