linera_service/cli_wrappers/
local_net.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4#[cfg(with_testing)]
5use std::sync::LazyLock;
6use std::{
7    collections::BTreeMap,
8    env,
9    num::NonZeroU16,
10    path::{Path, PathBuf},
11    sync::Arc,
12    time::Duration,
13};
14
15use anyhow::{anyhow, bail, ensure, Context, Result};
16#[cfg(with_testing)]
17use async_lock::RwLock;
18use async_trait::async_trait;
19use linera_base::{
20    command::{resolve_binary, CommandExt},
21    data_types::Amount,
22};
23use linera_client::client_options::ResourceControlPolicyConfig;
24use linera_core::node::ValidatorNodeProvider;
25use linera_rpc::config::{CrossChainConfig, ExporterServiceConfig, TlsConfig};
26#[cfg(all(feature = "storage-service", with_testing))]
27use linera_storage_service::common::storage_service_test_endpoint;
28#[cfg(all(feature = "rocksdb", feature = "scylladb", with_testing))]
29use linera_views::rocks_db::{RocksDbDatabase, RocksDbSpawnMode};
30#[cfg(all(feature = "scylladb", with_testing))]
31use linera_views::{scylla_db::ScyllaDbDatabase, store::TestKeyValueDatabase as _};
32use tempfile::{tempdir, TempDir};
33use tokio::process::{Child, Command};
34use tonic::transport::{channel::ClientTlsConfig, Endpoint};
35use tonic_health::pb::{
36    health_check_response::ServingStatus, health_client::HealthClient, HealthCheckRequest,
37};
38use tracing::{error, info, warn};
39
40use crate::{
41    cli_wrappers::{
42        ClientWrapper, LineraNet, LineraNetConfig, Network, NetworkConfig, OnClientDrop,
43    },
44    config::{BlockExporterConfig, Destination, DestinationConfig},
45    storage::{InnerStorageConfig, StorageConfig},
46    util::ChildExt,
47};
48
/// Maximum allowed number of shards over all validators.
///
/// `LocalNetConfig::instantiate` compares `num_initial_validators * num_shards` against
/// this limit and fails when the product is larger.
const MAX_NUMBER_SHARDS: usize = 1000;
51
/// Inbox-processing mode.
///
/// NOTE(review): this enum is not used in the visible part of this file; the exact
/// meaning of each variant is defined by its consumers — confirm there.
pub enum ProcessInbox {
    /// Presumably: do not process the inbox — TODO confirm against callers.
    Skip,
    /// Presumably: process the inbox automatically — TODO confirm against callers.
    Automatic,
}
56
/// Shared counter holding the next port returned by [`get_node_port`]; starts at 7080.
#[cfg(with_testing)]
static PORT_PROVIDER: LazyLock<RwLock<u16>> = LazyLock::new(|| RwLock::new(7080));
59
/// Returns the base port from which the local-network port helpers derive their ports.
///
/// The value is read from the `TEST_OFFSET_PORT` environment variable. When the variable
/// is unset, is not valid Unicode, or does not parse as a `usize`, 9000 is returned.
fn test_offset_port() -> usize {
    const DEFAULT_OFFSET: usize = 9000;
    match std::env::var("TEST_OFFSET_PORT") {
        Ok(raw) => raw.parse::<usize>().unwrap_or(DEFAULT_OFFSET),
        Err(_) => DEFAULT_OFFSET,
    }
}
67
/// Hands out the next node-service port and advances the shared counter by one.
///
/// The write lock on the counter is held until the function returns.
///
/// # Panics
///
/// Panics if `port_selector::is_free` reports the returned port as not free.
#[cfg(with_testing)]
pub async fn get_node_port() -> u16 {
    let mut next_port = PORT_PROVIDER.write().await;
    let port_ret = *next_port;
    *next_port = port_ret + 1;
    info!("get_node_port returning port_ret={}", port_ret);
    assert!(port_selector::is_free(port_ret));
    port_ret
}
78
/// Builds the storage configuration used by tests for the selected `database`.
///
/// Each arm is only functional when the matching Cargo feature is enabled; otherwise the
/// arm panics with a message naming the missing feature.
///
/// # Errors
///
/// Propagates failures from the `new_test_config` calls of the ScyllaDB and RocksDB
/// backends.
///
/// # Panics
///
/// Panics if the feature required by `database` is disabled, or if
/// `storage_service_test_endpoint` fails for `Database::Service`.
#[cfg(with_testing)]
async fn make_testing_config(database: Database) -> Result<InnerStorageConfig> {
    match database {
        Database::Service => {
            #[cfg(feature = "storage-service")]
            {
                let endpoint = storage_service_test_endpoint()
                    .expect("Reading LINERA_STORAGE_SERVICE environment variable");
                Ok(InnerStorageConfig::Service { endpoint })
            }
            #[cfg(not(feature = "storage-service"))]
            panic!("Database::Service is selected without the feature storage_service");
        }
        Database::DynamoDb => {
            #[cfg(feature = "dynamodb")]
            {
                // Tests always request the local DynamoDB flavor.
                let use_dynamodb_local = true;
                Ok(InnerStorageConfig::DynamoDb { use_dynamodb_local })
            }
            #[cfg(not(feature = "dynamodb"))]
            panic!("Database::DynamoDb is selected without the feature dynamodb");
        }
        Database::ScyllaDb => {
            #[cfg(feature = "scylladb")]
            {
                // Only the URI of the test configuration is kept.
                let config = ScyllaDbDatabase::new_test_config().await?;
                Ok(InnerStorageConfig::ScyllaDb {
                    uri: config.inner_config.uri,
                })
            }
            #[cfg(not(feature = "scylladb"))]
            panic!("Database::ScyllaDb is selected without the feature scylladb");
        }
        Database::DualRocksDbScyllaDb => {
            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
            {
                // Combine the RocksDB path and the ScyllaDB URI of the two test configurations.
                let rocksdb_config = RocksDbDatabase::new_test_config().await?;
                let scylla_config = ScyllaDbDatabase::new_test_config().await?;
                let spawn_mode = RocksDbSpawnMode::get_spawn_mode_from_runtime();
                Ok(InnerStorageConfig::DualRocksDbScyllaDb {
                    path_with_guard: rocksdb_config.inner_config.path_with_guard,
                    spawn_mode,
                    uri: scylla_config.inner_config.uri,
                })
            }
            #[cfg(not(all(feature = "rocksdb", feature = "scylladb")))]
            panic!("Database::DualRocksDbScyllaDb is selected without the features rocksdb and scylladb");
        }
    }
}
129
/// Describes how the [`InnerStorageConfig`] of a local network is obtained.
pub enum InnerStorageConfigBuilder {
    /// Derive a test configuration from the selected [`Database`] (testing builds only).
    #[cfg(with_testing)]
    TestConfig,
    /// Use the given, already built storage configuration as is.
    ExistingConfig {
        storage_config: InnerStorageConfig,
    },
}
137
impl InnerStorageConfigBuilder {
    /// Produces the storage configuration.
    ///
    /// `database` is only consulted for the `TestConfig` variant; `ExistingConfig` returns
    /// its stored configuration unchanged. Outside of testing builds the parameter is
    /// therefore unused, which is why the unused-variable lint is expected there.
    #[cfg_attr(not(with_testing), expect(unused_variables))]
    pub async fn build(self, database: Database) -> Result<InnerStorageConfig> {
        match self {
            #[cfg(with_testing)]
            InnerStorageConfigBuilder::TestConfig => make_testing_config(database).await,
            InnerStorageConfigBuilder::ExistingConfig { storage_config } => Ok(storage_config),
        }
    }
}
148
/// The directory used for a run.
///
/// It is either a path whose lifetime is controlled by an external user, or a temporary
/// directory owned (through a shared `Arc`) by this value and its clones.
#[derive(Clone)]
pub enum PathProvider {
    /// A caller-supplied path.
    ExternalPath { path_buf: PathBuf },
    /// A temporary directory shared between clones of this provider.
    TemporaryDirectory { tmp_dir: Arc<TempDir> },
}
156
157impl PathProvider {
158    pub fn path(&self) -> &Path {
159        match self {
160            PathProvider::ExternalPath { path_buf } => path_buf.as_path(),
161            PathProvider::TemporaryDirectory { tmp_dir } => tmp_dir.path(),
162        }
163    }
164
165    pub fn create_temporary_directory() -> Result<Self> {
166        let tmp_dir = Arc::new(tempdir()?);
167        Ok(PathProvider::TemporaryDirectory { tmp_dir })
168    }
169
170    pub fn from_path_option(path: &Option<String>) -> anyhow::Result<Self> {
171        Ok(match path {
172            None => {
173                let tmp_dir = Arc::new(tempfile::tempdir()?);
174                PathProvider::TemporaryDirectory { tmp_dir }
175            }
176            Some(path) => {
177                let path = Path::new(path);
178                let path_buf = path.to_path_buf();
179                PathProvider::ExternalPath { path_buf }
180            }
181        })
182    }
183}
184
/// The information needed to start a [`LocalNet`].
pub struct LocalNetConfig {
    /// Database engine; only consulted when `storage_config_builder` derives a test config.
    pub database: Database,
    /// Internal and external network protocols.
    pub network: NetworkConfig,
    /// Optional PRNG seed forwarded to the spawned tools and clients.
    pub testing_prng_seed: Option<u64>,
    /// Common namespace from which the per-validator storage namespaces are derived.
    pub namespace: String,
    /// Forwarded to `create_genesis_config` when the network is instantiated.
    pub num_other_initial_chains: u32,
    /// Forwarded to `create_genesis_config` when the network is instantiated.
    pub initial_amount: Amount,
    /// Number of validators configured at start-up; must be greater than zero.
    pub num_initial_validators: usize,
    /// Number of shards per validator.
    pub num_shards: usize,
    /// Number of proxies per validator.
    pub num_proxies: usize,
    /// Forwarded to `create_genesis_config` when the network is instantiated.
    pub policy_config: ResourceControlPolicyConfig,
    /// Cross-chain configuration handed to the [`LocalNet`].
    pub cross_chain_config: CrossChainConfig,
    /// How the storage configuration is obtained.
    pub storage_config_builder: InnerStorageConfigBuilder,
    /// Directory in which the network's files are written and its commands are run.
    pub path_provider: PathProvider,
    /// Block exporter setup.
    pub block_exporters: ExportersSetup,
}
202
/// The setup for the block exporters.
#[derive(Clone, PartialEq)]
pub enum ExportersSetup {
    /// Block exporters are meant to be started and managed by the testing framework.
    Local(Vec<BlockExporterConfig>),
    /// Block exporters are already started and we just need to connect to them.
    Remote(Vec<ExporterServiceConfig>),
}
211
212impl ExportersSetup {
213    pub fn new(
214        with_block_exporter: bool,
215        block_exporter_address: String,
216        block_exporter_port: NonZeroU16,
217    ) -> ExportersSetup {
218        if with_block_exporter {
219            let exporter_config =
220                ExporterServiceConfig::new(block_exporter_address, block_exporter_port.into());
221            ExportersSetup::Remote(vec![exporter_config])
222        } else {
223            ExportersSetup::Local(vec![])
224        }
225    }
226}
227
/// A set of Linera validators running locally as native processes.
pub struct LocalNet {
    /// Internal and external network protocols.
    network: NetworkConfig,
    /// Optional PRNG seed; incremented each time it is handed to a client or a tool.
    testing_prng_seed: Option<u64>,
    /// Identifier given to the next client created by `make_client`.
    next_client_id: usize,
    /// Number of validators configured at start-up.
    num_initial_validators: usize,
    /// Number of proxies per validator.
    num_proxies: usize,
    /// Number of shards per validator.
    num_shards: usize,
    /// Validator index -> (validator key, account key), as printed by `linera-server generate`.
    validator_keys: BTreeMap<usize, (String, String)>,
    /// Validator index -> child processes of that validator.
    running_validators: BTreeMap<usize, Validator>,
    /// Validator index -> storage configuration recorded once its storage was initialized.
    initialized_validator_storages: BTreeMap<usize, StorageConfig>,
    /// Prefix of the per-validator storage namespaces.
    common_namespace: String,
    /// Storage configuration shared by all validators (each gets its own namespace).
    common_storage_config: InnerStorageConfig,
    /// Cross-chain configuration supplied by the [`LocalNetConfig`].
    cross_chain_config: CrossChainConfig,
    /// Working directory of the spawned commands and location of the generated files.
    path_provider: PathProvider,
    /// Block exporter setup.
    block_exporters: ExportersSetup,
}
245
/// The name of the environment variable that allows specifying additional arguments to be passed
/// to the binary when starting a server.
///
/// The value is split on whitespace and each piece is passed as a separate argument.
const SERVER_ENV: &str = "LINERA_SERVER_PARAMS";
249
/// Description of the database engine to use inside a local Linera network.
#[derive(Copy, Clone, Eq, PartialEq)]
pub enum Database {
    /// The storage service (requires the `storage-service` feature for test configs).
    Service,
    /// DynamoDB (requires the `dynamodb` feature for test configs).
    DynamoDb,
    /// ScyllaDB (requires the `scylladb` feature for test configs).
    ScyllaDb,
    /// RocksDB combined with ScyllaDB (requires both features for test configs).
    DualRocksDbScyllaDb,
}
258
/// The processes of a running validator.
struct Validator {
    /// Proxy processes.
    proxies: Vec<Child>,
    /// Server (shard) processes.
    servers: Vec<Child>,
    /// Block exporter processes.
    exporters: Vec<Child>,
}
265
266impl Validator {
267    fn new() -> Self {
268        Self {
269            proxies: vec![],
270            servers: vec![],
271            exporters: vec![],
272        }
273    }
274
275    async fn terminate(&mut self) -> Result<()> {
276        for proxy in &mut self.proxies {
277            proxy.kill().await.context("terminating validator proxy")?;
278        }
279        for server in &mut self.servers {
280            server
281                .kill()
282                .await
283                .context("terminating validator server")?;
284        }
285        Ok(())
286    }
287
288    fn add_proxy(&mut self, proxy: Child) {
289        self.proxies.push(proxy)
290    }
291
292    fn add_server(&mut self, server: Child) {
293        self.servers.push(server)
294    }
295
296    #[cfg(with_testing)]
297    async fn terminate_server(&mut self, index: usize) -> Result<()> {
298        let mut server = self.servers.remove(index);
299        server
300            .kill()
301            .await
302            .context("terminating validator server")?;
303        Ok(())
304    }
305
306    fn add_block_exporter(&mut self, exporter: Child) {
307        self.exporters.push(exporter);
308    }
309
310    fn ensure_is_running(&mut self) -> Result<()> {
311        for proxy in &mut self.proxies {
312            proxy.ensure_is_running()?;
313        }
314        for child in &mut self.servers {
315            child.ensure_is_running()?;
316        }
317        for exporter in &mut self.exporters {
318            exporter.ensure_is_running()?;
319        }
320        Ok(())
321    }
322}
323
#[cfg(with_testing)]
impl LocalNetConfig {
    /// Creates the default test configuration: 4 validators with 4 shards and 1 proxy
    /// each, a temporary working directory, a random test namespace and no block exporters.
    ///
    /// `network` is used as the external protocol; the internal protocol is the same
    /// network with TLS dropped.
    ///
    /// # Panics
    ///
    /// Panics if the temporary directory cannot be created.
    pub fn new_test(database: Database, network: Network) -> Self {
        let path_provider = PathProvider::create_temporary_directory().unwrap();
        Self {
            database,
            network: NetworkConfig {
                internal: network.drop_tls(),
                external: network,
            },
            testing_prng_seed: Some(37),
            namespace: linera_views::random::generate_test_namespace(),
            num_other_initial_chains: 2,
            initial_amount: Amount::from_tokens(1_000_000),
            num_initial_validators: 4,
            num_shards: 4,
            num_proxies: 1,
            policy_config: ResourceControlPolicyConfig::Testnet,
            cross_chain_config: CrossChainConfig::default(),
            storage_config_builder: InnerStorageConfigBuilder::TestConfig,
            path_provider,
            block_exporters: ExportersSetup::Local(vec![]),
        }
    }
}
353
354#[async_trait]
355impl LineraNetConfig for LocalNetConfig {
356    type Net = LocalNet;
357
358    async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
359        let storage_config = self.storage_config_builder.build(self.database).await?;
360        let mut net = LocalNet::new(
361            self.network,
362            self.testing_prng_seed,
363            self.namespace,
364            self.num_initial_validators,
365            self.num_proxies,
366            self.num_shards,
367            storage_config,
368            self.cross_chain_config,
369            self.path_provider,
370            self.block_exporters,
371        );
372        let client = net.make_client().await;
373        ensure!(
374            self.num_initial_validators > 0,
375            "There should be at least one initial validator"
376        );
377        let total_number_shards = self.num_initial_validators * self.num_shards;
378        ensure!(
379            total_number_shards <= MAX_NUMBER_SHARDS,
380            "Total number of shards ({}) exceeds maximum allowed ({})",
381            self.num_shards,
382            MAX_NUMBER_SHARDS
383        );
384        net.generate_initial_validator_config().await?;
385        client
386            .create_genesis_config(
387                self.num_other_initial_chains,
388                self.initial_amount,
389                self.policy_config,
390                Some(vec!["localhost".to_owned()]),
391            )
392            .await?;
393        net.run().await?;
394        Ok((net, client))
395    }
396}
397
398#[async_trait]
399impl LineraNet for LocalNet {
400    async fn ensure_is_running(&mut self) -> Result<()> {
401        for validator in self.running_validators.values_mut() {
402            validator.ensure_is_running().context("in local network")?;
403        }
404        Ok(())
405    }
406
407    async fn make_client(&mut self) -> ClientWrapper {
408        let client = ClientWrapper::new(
409            self.path_provider.clone(),
410            self.network.external,
411            self.testing_prng_seed,
412            self.next_client_id,
413            OnClientDrop::LeakChains,
414        );
415        if let Some(seed) = self.testing_prng_seed {
416            self.testing_prng_seed = Some(seed + 1);
417        }
418        self.next_client_id += 1;
419        client
420    }
421
422    async fn terminate(&mut self) -> Result<()> {
423        for validator in self.running_validators.values_mut() {
424            validator.terminate().await.context("in local network")?
425        }
426        Ok(())
427    }
428}
429
430impl LocalNet {
431    #[expect(clippy::too_many_arguments)]
432    fn new(
433        network: NetworkConfig,
434        testing_prng_seed: Option<u64>,
435        common_namespace: String,
436        num_initial_validators: usize,
437        num_proxies: usize,
438        num_shards: usize,
439        common_storage_config: InnerStorageConfig,
440        cross_chain_config: CrossChainConfig,
441        path_provider: PathProvider,
442        block_exporters: ExportersSetup,
443    ) -> Self {
444        Self {
445            network,
446            testing_prng_seed,
447            next_client_id: 0,
448            num_initial_validators,
449            num_proxies,
450            num_shards,
451            validator_keys: BTreeMap::new(),
452            running_validators: BTreeMap::new(),
453            initialized_validator_storages: BTreeMap::new(),
454            common_namespace,
455            common_storage_config,
456            cross_chain_config,
457            path_provider,
458            block_exporters,
459        }
460    }
461
462    async fn command_for_binary(&self, name: &'static str) -> Result<Command> {
463        let path = resolve_binary(name, env!("CARGO_PKG_NAME")).await?;
464        let mut command = Command::new(path);
465        command.current_dir(self.path_provider.path());
466        Ok(command)
467    }
468
469    #[cfg(with_testing)]
470    pub fn genesis_config(&self) -> Result<linera_client::config::GenesisConfig> {
471        let path = self.path_provider.path();
472        crate::util::read_json(path.join("genesis.json"))
473    }
474
475    fn shard_port(&self, validator: usize, shard: usize) -> usize {
476        test_offset_port() + validator * self.num_shards + shard + 1
477    }
478
479    fn proxy_internal_port(&self, validator: usize, proxy_id: usize) -> usize {
480        test_offset_port() + 1000 + validator * self.num_proxies + proxy_id + 1
481    }
482
483    fn shard_metrics_port(&self, validator: usize, shard: usize) -> usize {
484        test_offset_port() + 2000 + validator * self.num_shards + shard + 1
485    }
486
487    fn proxy_metrics_port(&self, validator: usize, proxy_id: usize) -> usize {
488        test_offset_port() + 3000 + validator * self.num_proxies + proxy_id + 1
489    }
490
491    fn block_exporter_port(&self, validator: usize, exporter_id: usize) -> usize {
492        test_offset_port() + 3000 + validator * self.num_shards + exporter_id + 1
493    }
494
495    pub fn proxy_public_port(&self, validator: usize, proxy_id: usize) -> usize {
496        test_offset_port() + 4000 + validator * self.num_proxies + proxy_id + 1
497    }
498
499    pub fn first_public_port() -> usize {
500        test_offset_port() + 4000 + 1
501    }
502
503    fn block_exporter_metrics_port(exporter_id: usize) -> usize {
504        test_offset_port() + 4000 + exporter_id + 1
505    }
506
    /// Writes the TOML configuration of validator `server_number` and returns its path.
    ///
    /// The file `validator_{n}.toml` is created in the network's directory and contains
    /// the validator's host, port and protocols, followed by one `[[proxies]]` entry per
    /// proxy, one `[[shards]]` entry per shard and one `[[block_exporters]]` entry per
    /// configured exporter. For locally managed exporters, an
    /// `exporter_config_{n}:{j}.toml` file is written alongside as a side effect.
    ///
    /// # Errors
    ///
    /// Fails if a file cannot be written or if the resulting path is not valid Unicode.
    fn configuration_string(&self, server_number: usize) -> Result<String> {
        let n = server_number;
        let path = self
            .path_provider
            .path()
            .join(format!("validator_{n}.toml"));
        // The validator's advertised port is the public port of its first proxy.
        let port = self.proxy_public_port(n, 0);
        let external_protocol = self.network.external.toml();
        let internal_protocol = self.network.internal.toml();
        let external_host = self.network.external.localhost();
        let internal_host = self.network.internal.localhost();
        let mut content = format!(
            r#"
                server_config_path = "server_{n}.json"
                host = "{external_host}"
                port = {port}
                external_protocol = {external_protocol}
                internal_protocol = {internal_protocol}
            "#
        );

        for k in 0..self.num_proxies {
            let public_port = self.proxy_public_port(n, k);
            let internal_port = self.proxy_internal_port(n, k);
            let metrics_port = self.proxy_metrics_port(n, k);
            // In the local network, the validator ingress is
            // the proxy - so the `public_port` is the validator
            // port.
            content.push_str(&format!(
                r#"
                [[proxies]]
                host = "{internal_host}"
                public_port = {public_port}
                private_port = {internal_port}
                metrics_port = {metrics_port}
                "#
            ));
        }

        for k in 0..self.num_shards {
            let shard_port = self.shard_port(n, k);
            let shard_metrics_port = self.shard_metrics_port(n, k);
            content.push_str(&format!(
                r#"

                [[shards]]
                host = "{internal_host}"
                port = {shard_port}
                metrics_port = {shard_metrics_port}
                "#
            ));
        }

        match self.block_exporters {
            // Locally managed exporters: advertise them on localhost and also write the
            // configuration file each exporter process is later started with.
            ExportersSetup::Local(ref exporters) => {
                for (j, exporter) in exporters.iter().enumerate() {
                    let host = Network::Grpc.localhost();
                    let port = self.block_exporter_port(n, j);
                    let config_content = format!(
                        r#"

                        [[block_exporters]]
                        host = "{host}"
                        port = {port}
                        "#
                    );

                    content.push_str(&config_content);
                    let exporter_config = self.generate_block_exporter_config(
                        n,
                        j as u32,
                        &exporter.destination_config,
                    );
                    let config_path = self
                        .path_provider
                        .path()
                        .join(format!("exporter_config_{n}:{j}.toml"));

                    fs_err::write(&config_path, &exporter_config)?;
                }
            }
            // Already running exporters: only advertise their host and port.
            ExportersSetup::Remote(ref exporters) => {
                for exporter in exporters {
                    let host = exporter.host.clone();
                    let port = exporter.port;
                    let config_content = format!(
                        r#"

                        [[block_exporters]]
                        host = "{host}"
                        port = {port}
                        "#
                    );

                    content.push_str(&config_content);
                }
            }
        }

        fs_err::write(&path, content)?;
        path.into_os_string().into_string().map_err(|error| {
            anyhow!(
                "could not parse OS string into string: {}",
                error.to_string_lossy()
            )
        })
    }
614
    /// Renders the TOML configuration of block exporter `exporter_id` of validator
    /// `validator`.
    ///
    /// The result contains the exporter id, its metrics port, its service host and port,
    /// an optional `[destination_config]` table enabling the committee destination, and
    /// one `[[destination_config.destinations]]` entry per configured destination.
    fn generate_block_exporter_config(
        &self,
        validator: usize,
        exporter_id: u32,
        destination_config: &DestinationConfig,
    ) -> String {
        let n = validator;
        let host = Network::Grpc.localhost();
        let port = self.block_exporter_port(n, exporter_id as usize);
        let metrics_port = Self::block_exporter_metrics_port(exporter_id as usize);
        let mut config = format!(
            r#"
            id = {exporter_id}

            metrics_port = {metrics_port}

            [service_config]
            host = "{host}"
            port = {port}

            "#
        );

        let DestinationConfig {
            destinations,
            committee_destination,
        } = destination_config;

        // The table header is only emitted when the committee destination is enabled.
        if *committee_destination {
            let destination_string_to_push = r#"

            [destination_config]
            committee_destination = true
            "#
            .to_string();

            config.push_str(&destination_string_to_push);
        }

        // One array-of-tables entry per destination; the `kind` key names the variant.
        for destination in destinations {
            let destination_string_to_push = match destination {
                Destination::Indexer {
                    tls,
                    endpoint,
                    port,
                } => {
                    let tls = match tls {
                        TlsConfig::ClearText => "ClearText",
                        TlsConfig::Tls => "Tls",
                    };
                    format!(
                        r#"
                        [[destination_config.destinations]]
                        tls = "{tls}"
                        endpoint = "{endpoint}"
                        port = {port}
                        kind = "Indexer"
                        "#
                    )
                }
                Destination::Validator { endpoint, port } => {
                    format!(
                        r#"
                        [[destination_config.destinations]]
                        endpoint = "{endpoint}"
                        port = {port}
                        kind = "Validator"
                        "#
                    )
                }
                Destination::Logging { file_name } => {
                    format!(
                        r#"
                        [[destination_config.destinations]]
                        file_name = "{file_name}"
                        kind = "Logging"
                        "#
                    )
                }
            };

            config.push_str(&destination_string_to_push);
        }

        config
    }
701
702    async fn generate_initial_validator_config(&mut self) -> Result<()> {
703        let mut command = self.command_for_binary("linera-server").await?;
704        command.arg("generate");
705        if let Some(seed) = self.testing_prng_seed {
706            command.arg("--testing-prng-seed").arg(seed.to_string());
707            self.testing_prng_seed = Some(seed + 1);
708        }
709        command.arg("--validators");
710        for i in 0..self.num_initial_validators {
711            command.arg(&self.configuration_string(i)?);
712        }
713        let output = command
714            .args(["--committee", "committee.json"])
715            .spawn_and_wait_for_stdout()
716            .await?;
717        self.validator_keys = output
718            .split_whitespace()
719            .map(str::to_string)
720            .map(|keys| keys.split(',').map(str::to_string).collect::<Vec<_>>())
721            .enumerate()
722            .map(|(i, keys)| {
723                let validator_key = keys[0].to_string();
724                let account_key = keys[1].to_string();
725                (i, (validator_key, account_key))
726            })
727            .collect();
728        Ok(())
729    }
730
731    async fn run_proxy(&mut self, validator: usize, proxy_id: usize) -> Result<Child> {
732        let storage = self
733            .initialized_validator_storages
734            .get(&validator)
735            .expect("initialized storage");
736        let child = self
737            .command_for_binary("linera-proxy")
738            .await?
739            .arg(format!("server_{}.json", validator))
740            .args(["--storage", &storage.to_string()])
741            .args(["--id", &proxy_id.to_string()])
742            .spawn_into()?;
743
744        let port = self.proxy_public_port(validator, proxy_id);
745        let nickname = format!("validator proxy {validator}");
746        match self.network.external {
747            Network::Grpc => {
748                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
749                let nickname = format!("validator proxy {validator}");
750                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
751            }
752            Network::Grpcs => {
753                let nickname = format!("validator proxy {validator}");
754                Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
755            }
756            Network::Tcp => {
757                Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
758            }
759            Network::Udp => {
760                Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
761            }
762        }
763        Ok(child)
764    }
765
766    async fn run_exporter(&mut self, validator: usize, exporter_id: u32) -> Result<Child> {
767        let config_path = format!("exporter_config_{validator}:{exporter_id}.toml");
768        let storage = self
769            .initialized_validator_storages
770            .get(&validator)
771            .expect("initialized storage");
772
773        tracing::debug!(config=?config_path, storage=?storage.to_string(), "starting block exporter");
774
775        let child = self
776            .command_for_binary("linera-exporter")
777            .await?
778            .args(["--config-path", &config_path])
779            .args(["--storage", &storage.to_string()])
780            .spawn_into()?;
781
782        match self.network.internal {
783            Network::Grpc => {
784                let port = self.block_exporter_port(validator, exporter_id as usize);
785                let nickname = format!("block exporter {validator}:{exporter_id}");
786                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
787            }
788            Network::Grpcs => {
789                let port = self.block_exporter_port(validator, exporter_id as usize);
790                let nickname = format!("block exporter  {validator}:{exporter_id}");
791                Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
792            }
793            Network::Tcp | Network::Udp => {
794                unreachable!("Only allowed options are grpc and grpcs")
795            }
796        }
797
798        tracing::info!("block exporter started {validator}:{exporter_id}");
799
800        Ok(child)
801    }
802
803    pub async fn ensure_grpc_server_has_started(
804        nickname: &str,
805        port: usize,
806        scheme: &str,
807    ) -> Result<()> {
808        let endpoint = match scheme {
809            "http" => Endpoint::new(format!("http://localhost:{port}"))
810                .context("endpoint should always parse")?,
811            "https" => {
812                use linera_rpc::CERT_PEM;
813                let certificate = tonic::transport::Certificate::from_pem(CERT_PEM);
814                let tls_config = ClientTlsConfig::new().ca_certificate(certificate);
815                Endpoint::new(format!("https://localhost:{port}"))
816                    .context("endpoint should always parse")?
817                    .tls_config(tls_config)?
818            }
819            _ => bail!("Only supported scheme are http and https"),
820        };
821        let connection = endpoint.connect_lazy();
822        let mut client = HealthClient::new(connection);
823        linera_base::time::timer::sleep(Duration::from_millis(100)).await;
824        for i in 0..10 {
825            linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
826            let result = client.check(HealthCheckRequest::default()).await;
827            if result.is_ok() && result.unwrap().get_ref().status() == ServingStatus::Serving {
828                info!(?port, "Successfully started {nickname}");
829                return Ok(());
830            } else {
831                warn!("Waiting for {nickname} to start");
832            }
833        }
834        bail!("Failed to start {nickname}");
835    }
836
837    async fn ensure_simple_server_has_started(
838        nickname: &str,
839        port: usize,
840        protocol: &str,
841    ) -> Result<()> {
842        use linera_core::node::ValidatorNode as _;
843
844        let options = linera_rpc::NodeOptions {
845            send_timeout: Duration::from_secs(5),
846            recv_timeout: Duration::from_secs(5),
847            retry_delay: Duration::from_secs(1),
848            max_retries: 1,
849        };
850        let provider = linera_rpc::simple::SimpleNodeProvider::new(options);
851        let address = format!("{protocol}:127.0.0.1:{port}");
852        // All "simple" services (i.e. proxy and "server") are based on `RpcMessage` and
853        // support `VersionInfoQuery`.
854        let node = provider.make_node(&address)?;
855        linera_base::time::timer::sleep(Duration::from_millis(100)).await;
856        for i in 0..10 {
857            linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
858            let result = node.get_version_info().await;
859            if result.is_ok() {
860                info!("Successfully started {nickname}");
861                return Ok(());
862            } else {
863                warn!("Waiting for {nickname} to start");
864            }
865        }
866        bail!("Failed to start {nickname}");
867    }
868
869    async fn initialize_storage(&mut self, validator: usize) -> Result<()> {
870        let namespace = format!("{}_server_{}_db", self.common_namespace, validator);
871        let inner_storage_config = self.common_storage_config.clone();
872        let storage = StorageConfig {
873            inner_storage_config,
874            namespace,
875        };
876        let mut command = self.command_for_binary("linera").await?;
877        if let Ok(var) = env::var(SERVER_ENV) {
878            command.args(var.split_whitespace());
879        }
880        command.args(["storage", "initialize"]);
881        command
882            .args(["--storage", &storage.to_string()])
883            .args(["--genesis", "genesis.json"])
884            .spawn_and_wait_for_stdout()
885            .await?;
886
887        self.initialized_validator_storages
888            .insert(validator, storage);
889        Ok(())
890    }
891
892    async fn run_server(&mut self, validator: usize, shard: usize) -> Result<Child> {
893        let mut storage = self
894            .initialized_validator_storages
895            .get(&validator)
896            .expect("initialized storage")
897            .clone();
898
899        // For the storage backends with a local directory, make sure that we don't reuse
900        // the same directory for all the shards.
901        storage.maybe_append_shard_path(shard)?;
902
903        let mut command = self.command_for_binary("linera-server").await?;
904        if let Ok(var) = env::var(SERVER_ENV) {
905            command.args(var.split_whitespace());
906        }
907        command
908            .arg("run")
909            .args(["--storage", &storage.to_string()])
910            .args(["--server", &format!("server_{}.json", validator)])
911            .args(["--shard", &shard.to_string()])
912            .args(self.cross_chain_config.to_args());
913        let child = command.spawn_into()?;
914
915        let port = self.shard_port(validator, shard);
916        let nickname = format!("validator server {validator}:{shard}");
917        match self.network.internal {
918            Network::Grpc => {
919                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
920            }
921            Network::Grpcs => {
922                Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
923            }
924            Network::Tcp => {
925                Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
926            }
927            Network::Udp => {
928                Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
929            }
930        }
931        Ok(child)
932    }
933
934    async fn run(&mut self) -> Result<()> {
935        for validator in 0..self.num_initial_validators {
936            self.start_validator(validator).await?;
937        }
938        Ok(())
939    }
940
941    /// Start a validator.
942    pub async fn start_validator(&mut self, index: usize) -> Result<()> {
943        self.initialize_storage(index).await?;
944        self.restart_validator(index).await
945    }
946
947    /// Restart a validator. This is similar to `start_validator` except that the
948    /// database was already initialized once.
949    pub async fn restart_validator(&mut self, index: usize) -> Result<()> {
950        let mut validator = Validator::new();
951        for k in 0..self.num_proxies {
952            let proxy = self.run_proxy(index, k).await?;
953            validator.add_proxy(proxy);
954        }
955        for shard in 0..self.num_shards {
956            let server = self.run_server(index, shard).await?;
957            validator.add_server(server);
958        }
959        if let ExportersSetup::Local(ref exporters) = self.block_exporters {
960            for block_exporter in 0..exporters.len() {
961                let exporter = self.run_exporter(index, block_exporter as u32).await?;
962                validator.add_block_exporter(exporter);
963            }
964        }
965
966        self.running_validators.insert(index, validator);
967        Ok(())
968    }
969
970    /// Terminates all the processes of a given validator.
971    pub async fn stop_validator(&mut self, index: usize) -> Result<()> {
972        if let Some(mut validator) = self.running_validators.remove(&index) {
973            if let Err(error) = validator.terminate().await {
974                error!("Failed to stop validator {index}: {error}");
975                return Err(error);
976            }
977        }
978        Ok(())
979    }
980
981    /// Returns a [`linera_rpc::Client`] to interact directly with a `validator`.
982    pub fn validator_client(&mut self, validator: usize) -> Result<linera_rpc::Client> {
983        let node_provider = linera_rpc::NodeProvider::new(linera_rpc::NodeOptions {
984            send_timeout: Duration::from_secs(1),
985            recv_timeout: Duration::from_secs(1),
986            retry_delay: Duration::ZERO,
987            max_retries: 0,
988        });
989
990        Ok(node_provider.make_node(&self.validator_address(validator))?)
991    }
992
993    /// Returns the address to connect to a validator's proxy.
994    /// In local networks, the zeroth proxy _is_ the validator ingress.
995    pub fn validator_address(&self, validator: usize) -> String {
996        let port = self.proxy_public_port(validator, 0);
997        let schema = self.network.external.schema();
998
999        format!("{schema}:localhost:{port}")
1000    }
1001}
1002
#[cfg(with_testing)]
impl LocalNet {
    /// Returns the validating key and an account key of the validator, or `None` if no
    /// keys have been recorded for it.
    pub fn validator_keys(&self, validator: usize) -> Option<&(String, String)> {
        self.validator_keys.get(&validator)
    }

    /// Runs `linera-server generate --validators <config>` for the given validator and
    /// records the first two comma-separated fields of its output as the validator's keys.
    ///
    /// # Errors
    ///
    /// Returns an error if the command fails or if its output does not contain at least
    /// two comma-separated fields.
    pub async fn generate_validator_config(&mut self, validator: usize) -> Result<()> {
        let stdout = self
            .command_for_binary("linera-server")
            .await?
            .arg("generate")
            .arg("--validators")
            .arg(&self.configuration_string(validator)?)
            .spawn_and_wait_for_stdout()
            .await?;
        // The output comes from an external process: validate its shape instead of
        // indexing blindly, which would panic on unexpected output.
        let mut keys = stdout.trim().split(',');
        let (Some(first_key), Some(second_key)) = (keys.next(), keys.next()) else {
            bail!(
                "expected two comma-separated keys from `linera-server generate`, \
                 got {stdout:?}"
            );
        };
        self.validator_keys
            .insert(validator, (first_key.to_string(), second_key.to_string()));
        Ok(())
    }

    /// Terminates the server process of one shard of a running validator.
    ///
    /// # Errors
    ///
    /// Returns an error if the validator is not in `running_validators` or if terminating
    /// the shard's server fails.
    pub async fn terminate_server(&mut self, validator: usize, shard: usize) -> Result<()> {
        self.running_validators
            .get_mut(&validator)
            .context("server not found")?
            .terminate_server(shard)
            .await?;
        Ok(())
    }

    /// Removes a validator from the set of running validators.
    ///
    /// The entry is dropped without calling `terminate` on it here.
    ///
    /// # Errors
    ///
    /// Returns an error if the validator is not in `running_validators`.
    pub fn remove_validator(&mut self, validator: usize) -> Result<()> {
        self.running_validators
            .remove(&validator)
            .context("validator not found")?;
        Ok(())
    }

    /// Starts the server of one shard and attaches it to an already running validator.
    ///
    /// # Errors
    ///
    /// Returns an error if the server fails to start or if the validator is not in
    /// `running_validators`.
    pub async fn start_server(&mut self, validator: usize, shard: usize) -> Result<()> {
        let server = self.run_server(validator, shard).await?;
        self.running_validators
            .get_mut(&validator)
            .context("could not find validator")?
            .add_server(server);
        Ok(())
    }
}