linera_service/cli_wrappers/
local_net.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4#[cfg(with_testing)]
5use std::sync::LazyLock;
6use std::{
7    collections::BTreeMap,
8    env,
9    num::NonZeroU16,
10    path::{Path, PathBuf},
11    sync::Arc,
12    time::Duration,
13};
14
15use anyhow::{anyhow, bail, ensure, Context, Result};
16#[cfg(with_testing)]
17use async_lock::RwLock;
18use async_trait::async_trait;
19use linera_base::{
20    command::{resolve_binary, CommandExt},
21    data_types::Amount,
22};
23use linera_client::client_options::ResourceControlPolicyConfig;
24use linera_core::node::ValidatorNodeProvider;
25use linera_rpc::config::{CrossChainConfig, ExporterServiceConfig, TlsConfig};
26#[cfg(all(feature = "storage-service", with_testing))]
27use linera_storage_service::common::storage_service_test_endpoint;
28#[cfg(all(feature = "rocksdb", feature = "scylladb", with_testing))]
29use linera_views::rocks_db::{RocksDbDatabase, RocksDbSpawnMode};
30#[cfg(all(feature = "scylladb", with_testing))]
31use linera_views::{scylla_db::ScyllaDbDatabase, store::TestKeyValueDatabase as _};
32use tempfile::{tempdir, TempDir};
33use tokio::process::{Child, Command};
34use tonic::transport::{channel::ClientTlsConfig, Endpoint};
35use tonic_health::pb::{
36    health_check_response::ServingStatus, health_client::HealthClient, HealthCheckRequest,
37};
38use tracing::{error, info, warn};
39
40use crate::{
41    cli_wrappers::{
42        ClientWrapper, LineraNet, LineraNetConfig, Network, NetworkConfig, OnClientDrop,
43    },
44    config::{BlockExporterConfig, Destination, DestinationConfig},
45    storage::{InnerStorageConfig, StorageConfig},
46    util::ChildExt,
47};
48
/// Maximum allowed number of shards over all validators, i.e. the upper bound on
/// `num_initial_validators * num_shards` that is checked when a [`LocalNetConfig`]
/// is instantiated.
const MAX_NUMBER_SHARDS: usize = 1000;

/// Base port from which [`LocalNet::proxy_public_port`], [`LocalNet::first_public_port`]
/// and the block exporter metrics ports are derived.
pub const FIRST_PUBLIC_PORT: usize = 13000;
53
/// Inbox-processing mode.
///
/// NOTE(review): this type is not used in the visible part of this file; the meaning of
/// the variants is presumably "do not process the inbox" vs. "process it automatically" —
/// confirm against the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessInbox {
    Skip,
    Automatic,
}
58
/// The next port to hand out from [`get_node_port`]; the counter starts at 7080.
#[cfg(with_testing)]
static PORT_PROVIDER: LazyLock<RwLock<u16>> = LazyLock::new(|| RwLock::new(7080));

/// Returns a port for the node service and advances the shared counter, so that
/// consecutive calls return consecutive port numbers.
///
/// # Panics
///
/// Panics if the returned port is not free.
#[cfg(with_testing)]
pub async fn get_node_port() -> u16 {
    // The write guard is held until the end of the function, as callers are serialized
    // on the counter.
    let mut next_port = PORT_PROVIDER.write().await;
    let assigned = *next_port;
    *next_port = assigned + 1;
    info!("get_node_port returning port_ret={}", assigned);
    assert!(port_selector::is_free(assigned));
    assigned
}
72
/// Builds the [`InnerStorageConfig`] to use in tests for the selected `database`.
///
/// Every arm is only functional if the matching Cargo feature(s) are enabled.
///
/// # Panics
///
/// Panics if the feature(s) required by `database` are disabled, or (for
/// [`Database::Service`]) if the test endpoint of the storage service cannot be read.
#[cfg(with_testing)]
async fn make_testing_config(database: Database) -> Result<InnerStorageConfig> {
    match database {
        Database::Service => {
            #[cfg(feature = "storage-service")]
            {
                let endpoint = storage_service_test_endpoint()
                    .expect("Reading LINERA_STORAGE_SERVICE environment variable");
                Ok(InnerStorageConfig::Service { endpoint })
            }
            #[cfg(not(feature = "storage-service"))]
            panic!("Database::Service is selected without the feature storage_service");
        }
        Database::DynamoDb => {
            #[cfg(feature = "dynamodb")]
            {
                // Test configurations always use the local flavor of DynamoDB.
                Ok(InnerStorageConfig::DynamoDb {
                    use_dynamodb_local: true,
                })
            }
            #[cfg(not(feature = "dynamodb"))]
            panic!("Database::DynamoDb is selected without the feature dynamodb");
        }
        Database::ScyllaDb => {
            #[cfg(feature = "scylladb")]
            {
                let scylla = ScyllaDbDatabase::new_test_config().await?;
                let uri = scylla.inner_config.uri;
                Ok(InnerStorageConfig::ScyllaDb { uri })
            }
            #[cfg(not(feature = "scylladb"))]
            panic!("Database::ScyllaDb is selected without the feature scylladb");
        }
        Database::DualRocksDbScyllaDb => {
            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
            {
                let rocksdb = RocksDbDatabase::new_test_config().await?;
                let scylla = ScyllaDbDatabase::new_test_config().await?;
                let spawn_mode = RocksDbSpawnMode::get_spawn_mode_from_runtime();
                let path_with_guard = rocksdb.inner_config.path_with_guard;
                let uri = scylla.inner_config.uri;
                Ok(InnerStorageConfig::DualRocksDbScyllaDb {
                    path_with_guard,
                    spawn_mode,
                    uri,
                })
            }
            #[cfg(not(all(feature = "rocksdb", feature = "scylladb")))]
            panic!("Database::DualRocksDbScyllaDb is selected without the features rocksdb and scylladb");
        }
    }
}
123
124pub enum InnerStorageConfigBuilder {
125    #[cfg(with_testing)]
126    TestConfig,
127    ExistingConfig {
128        storage_config: InnerStorageConfig,
129    },
130}
131
132impl InnerStorageConfigBuilder {
133    #[cfg_attr(not(with_testing), expect(unused_variables))]
134    pub async fn build(self, database: Database) -> Result<InnerStorageConfig> {
135        match self {
136            #[cfg(with_testing)]
137            InnerStorageConfigBuilder::TestConfig => make_testing_config(database).await,
138            InnerStorageConfigBuilder::ExistingConfig { storage_config } => Ok(storage_config),
139        }
140    }
141}
142
143/// Path used for the run can come from a path whose lifetime is controlled
144/// by an external user or as a temporary directory
145#[derive(Clone)]
146pub enum PathProvider {
147    ExternalPath { path_buf: PathBuf },
148    TemporaryDirectory { tmp_dir: Arc<TempDir> },
149}
150
151impl PathProvider {
152    pub fn path(&self) -> &Path {
153        match self {
154            PathProvider::ExternalPath { path_buf } => path_buf.as_path(),
155            PathProvider::TemporaryDirectory { tmp_dir } => tmp_dir.path(),
156        }
157    }
158
159    pub fn create_temporary_directory() -> Result<Self> {
160        let tmp_dir = Arc::new(tempdir()?);
161        Ok(PathProvider::TemporaryDirectory { tmp_dir })
162    }
163
164    pub fn from_path_option(path: &Option<String>) -> anyhow::Result<Self> {
165        Ok(match path {
166            None => {
167                let tmp_dir = Arc::new(tempfile::tempdir()?);
168                PathProvider::TemporaryDirectory { tmp_dir }
169            }
170            Some(path) => {
171                let path = Path::new(path);
172                let path_buf = path.to_path_buf();
173                PathProvider::ExternalPath { path_buf }
174            }
175        })
176    }
177}
178
/// The information needed to start a [`LocalNet`].
pub struct LocalNetConfig {
    /// The database engine; used by `storage_config_builder` to build a test configuration.
    pub database: Database,
    /// The internal and external network protocols.
    pub network: NetworkConfig,
    /// Optional seed forwarded to the spawned commands and clients.
    pub testing_prng_seed: Option<u64>,
    /// Common namespace from which the per-validator storage namespaces are derived.
    pub namespace: String,
    /// Forwarded to the client when creating the genesis configuration.
    pub num_other_initial_chains: u32,
    /// Forwarded to the client when creating the genesis configuration.
    pub initial_amount: Amount,
    /// Number of validators to configure initially; must be positive.
    pub num_initial_validators: usize,
    /// Number of shards per validator; `num_initial_validators * num_shards` must not
    /// exceed `MAX_NUMBER_SHARDS`.
    pub num_shards: usize,
    /// Number of proxies per validator.
    pub num_proxies: usize,
    /// Forwarded to the client when creating the genesis configuration.
    pub policy_config: ResourceControlPolicyConfig,
    /// Stored in the resulting [`LocalNet`].
    pub cross_chain_config: CrossChainConfig,
    /// How the storage configuration shared by the validators is obtained.
    pub storage_config_builder: InnerStorageConfigBuilder,
    /// The directory in which the network keeps its files.
    pub path_provider: PathProvider,
    /// The block exporters to start locally or to connect to.
    pub block_exporters: ExportersSetup,
}
196
197/// The setup for the block exporters.
198#[derive(Clone, PartialEq)]
199pub enum ExportersSetup {
200    // Block exporters are meant to be started and managed by the testing framework.
201    Local(Vec<BlockExporterConfig>),
202    // Block exporters are already started and we just need to connect to them.
203    Remote(Vec<ExporterServiceConfig>),
204}
205
206impl ExportersSetup {
207    pub fn new(
208        with_block_exporter: bool,
209        block_exporter_address: String,
210        block_exporter_port: NonZeroU16,
211    ) -> ExportersSetup {
212        if with_block_exporter {
213            let exporter_config =
214                ExporterServiceConfig::new(block_exporter_address, block_exporter_port.into());
215            ExportersSetup::Remote(vec![exporter_config])
216        } else {
217            ExportersSetup::Local(vec![])
218        }
219    }
220}
221
/// A set of Linera validators running locally as native processes.
pub struct LocalNet {
    /// The internal and external network protocols.
    network: NetworkConfig,
    /// Seed handed to spawned commands and clients; incremented after each use.
    testing_prng_seed: Option<u64>,
    /// Identifier given to the next client; incremented by `make_client`.
    next_client_id: usize,
    /// Number of validators for which an initial configuration is generated.
    num_initial_validators: usize,
    /// Number of proxies per validator.
    num_proxies: usize,
    /// Number of shards per validator.
    num_shards: usize,
    /// For each validator index, the pair `(validator key, account key)` parsed from the
    /// output of `linera-server generate`.
    validator_keys: BTreeMap<usize, (String, String)>,
    /// The processes of the validators, by validator index.
    running_validators: BTreeMap<usize, Validator>,
    /// The storage configuration of each validator whose storage has been initialized.
    initialized_validator_storages: BTreeMap<usize, StorageConfig>,
    /// Prefix of the per-validator storage namespaces.
    common_namespace: String,
    /// Storage configuration cloned for every validator.
    common_storage_config: InnerStorageConfig,
    /// NOTE(review): not read in the visible part of this file — presumably passed to the
    /// servers; confirm.
    cross_chain_config: CrossChainConfig,
    /// The working directory of the spawned binaries and the location of generated files.
    path_provider: PathProvider,
    /// The block exporters to start locally or to connect to.
    block_exporters: ExportersSetup,
}
239
/// The name of the environment variable that allows specifying additional arguments to be passed
/// to the binary when starting a server.
///
/// The value is split on whitespace and the resulting arguments are added before the
/// subcommand; it is also applied when initializing a validator's storage.
const SERVER_ENV: &str = "LINERA_SERVER_PARAMS";
243
/// Description of the database engine to use inside a local Linera network.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum Database {
    /// The storage service.
    Service,
    /// DynamoDB.
    DynamoDb,
    /// ScyllaDB.
    ScyllaDb,
    /// RocksDB combined with ScyllaDB.
    DualRocksDbScyllaDb,
}
252
253/// The processes of a running validator.
254struct Validator {
255    proxies: Vec<Child>,
256    servers: Vec<Child>,
257    exporters: Vec<Child>,
258}
259
260impl Validator {
261    fn new() -> Self {
262        Self {
263            proxies: vec![],
264            servers: vec![],
265            exporters: vec![],
266        }
267    }
268
269    async fn terminate(&mut self) -> Result<()> {
270        for proxy in &mut self.proxies {
271            proxy.kill().await.context("terminating validator proxy")?;
272        }
273        for server in &mut self.servers {
274            server
275                .kill()
276                .await
277                .context("terminating validator server")?;
278        }
279        Ok(())
280    }
281
282    fn add_proxy(&mut self, proxy: Child) {
283        self.proxies.push(proxy)
284    }
285
286    fn add_server(&mut self, server: Child) {
287        self.servers.push(server)
288    }
289
290    #[cfg(with_testing)]
291    async fn terminate_server(&mut self, index: usize) -> Result<()> {
292        let mut server = self.servers.remove(index);
293        server
294            .kill()
295            .await
296            .context("terminating validator server")?;
297        Ok(())
298    }
299
300    fn add_block_exporter(&mut self, exporter: Child) {
301        self.exporters.push(exporter);
302    }
303
304    fn ensure_is_running(&mut self) -> Result<()> {
305        for proxy in &mut self.proxies {
306            proxy.ensure_is_running()?;
307        }
308        for child in &mut self.servers {
309            child.ensure_is_running()?;
310        }
311        for exporter in &mut self.exporters {
312            exporter.ensure_is_running()?;
313        }
314        Ok(())
315    }
316}
317
#[cfg(with_testing)]
impl LocalNetConfig {
    /// Creates the configuration used by tests: 4 validators with 4 shards and 1 proxy
    /// each, a temporary working directory, a generated test namespace and no block
    /// exporter.
    ///
    /// The internal network uses the same protocol as `network`, with TLS dropped.
    ///
    /// # Panics
    ///
    /// Panics if the temporary directory cannot be created.
    pub fn new_test(database: Database, network: Network) -> Self {
        let path_provider = PathProvider::create_temporary_directory().unwrap();
        let network = NetworkConfig {
            internal: network.drop_tls(),
            external: network,
        };
        Self {
            database,
            network,
            testing_prng_seed: Some(37),
            namespace: linera_views::random::generate_test_namespace(),
            num_other_initial_chains: 2,
            initial_amount: Amount::from_tokens(1_000_000),
            num_initial_validators: 4,
            num_shards: 4,
            num_proxies: 1,
            policy_config: ResourceControlPolicyConfig::Testnet,
            cross_chain_config: CrossChainConfig::default(),
            storage_config_builder: InnerStorageConfigBuilder::TestConfig,
            path_provider,
            block_exporters: ExportersSetup::Local(vec![]),
        }
    }
}
347
348#[async_trait]
349impl LineraNetConfig for LocalNetConfig {
350    type Net = LocalNet;
351
352    async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
353        let storage_config = self.storage_config_builder.build(self.database).await?;
354        let mut net = LocalNet::new(
355            self.network,
356            self.testing_prng_seed,
357            self.namespace,
358            self.num_initial_validators,
359            self.num_proxies,
360            self.num_shards,
361            storage_config,
362            self.cross_chain_config,
363            self.path_provider,
364            self.block_exporters,
365        );
366        let client = net.make_client().await;
367        ensure!(
368            self.num_initial_validators > 0,
369            "There should be at least one initial validator"
370        );
371        let total_number_shards = self.num_initial_validators * self.num_shards;
372        ensure!(
373            total_number_shards <= MAX_NUMBER_SHARDS,
374            "Total number of shards ({}) exceeds maximum allowed ({})",
375            self.num_shards,
376            MAX_NUMBER_SHARDS
377        );
378        net.generate_initial_validator_config().await?;
379        client
380            .create_genesis_config(
381                self.num_other_initial_chains,
382                self.initial_amount,
383                self.policy_config,
384                Some(vec!["localhost".to_owned()]),
385            )
386            .await?;
387        net.run().await?;
388        Ok((net, client))
389    }
390}
391
392#[async_trait]
393impl LineraNet for LocalNet {
394    async fn ensure_is_running(&mut self) -> Result<()> {
395        for validator in self.running_validators.values_mut() {
396            validator.ensure_is_running().context("in local network")?;
397        }
398        Ok(())
399    }
400
401    async fn make_client(&mut self) -> ClientWrapper {
402        let client = ClientWrapper::new(
403            self.path_provider.clone(),
404            self.network.external,
405            self.testing_prng_seed,
406            self.next_client_id,
407            OnClientDrop::LeakChains,
408        );
409        if let Some(seed) = self.testing_prng_seed {
410            self.testing_prng_seed = Some(seed + 1);
411        }
412        self.next_client_id += 1;
413        client
414    }
415
416    async fn terminate(&mut self) -> Result<()> {
417        for validator in self.running_validators.values_mut() {
418            validator.terminate().await.context("in local network")?
419        }
420        Ok(())
421    }
422}
423
424impl LocalNet {
425    #[expect(clippy::too_many_arguments)]
426    fn new(
427        network: NetworkConfig,
428        testing_prng_seed: Option<u64>,
429        common_namespace: String,
430        num_initial_validators: usize,
431        num_proxies: usize,
432        num_shards: usize,
433        common_storage_config: InnerStorageConfig,
434        cross_chain_config: CrossChainConfig,
435        path_provider: PathProvider,
436        block_exporters: ExportersSetup,
437    ) -> Self {
438        Self {
439            network,
440            testing_prng_seed,
441            next_client_id: 0,
442            num_initial_validators,
443            num_proxies,
444            num_shards,
445            validator_keys: BTreeMap::new(),
446            running_validators: BTreeMap::new(),
447            initialized_validator_storages: BTreeMap::new(),
448            common_namespace,
449            common_storage_config,
450            cross_chain_config,
451            path_provider,
452            block_exporters,
453        }
454    }
455
456    async fn command_for_binary(&self, name: &'static str) -> Result<Command> {
457        let path = resolve_binary(name, env!("CARGO_PKG_NAME")).await?;
458        let mut command = Command::new(path);
459        command.current_dir(self.path_provider.path());
460        Ok(command)
461    }
462
463    #[cfg(with_testing)]
464    pub fn genesis_config(&self) -> Result<linera_client::config::GenesisConfig> {
465        let path = self.path_provider.path();
466        crate::util::read_json(path.join("genesis.json"))
467    }
468
469    fn shard_port(&self, validator: usize, shard: usize) -> usize {
470        9000 + validator * self.num_shards + shard + 1
471    }
472
473    fn proxy_internal_port(&self, validator: usize, proxy_id: usize) -> usize {
474        10000 + validator * self.num_proxies + proxy_id + 1
475    }
476
477    fn shard_metrics_port(&self, validator: usize, shard: usize) -> usize {
478        11000 + validator * self.num_shards + shard + 1
479    }
480
481    fn proxy_metrics_port(&self, validator: usize, proxy_id: usize) -> usize {
482        12000 + validator * self.num_proxies + proxy_id + 1
483    }
484
485    fn block_exporter_port(&self, validator: usize, exporter_id: usize) -> usize {
486        12000 + validator * self.num_shards + exporter_id + 1
487    }
488
489    pub fn proxy_public_port(&self, validator: usize, proxy_id: usize) -> usize {
490        FIRST_PUBLIC_PORT + validator * self.num_proxies + proxy_id + 1
491    }
492
493    pub fn first_public_port() -> usize {
494        FIRST_PUBLIC_PORT + 1
495    }
496
497    fn block_exporter_metrics_port(exporter_id: usize) -> usize {
498        FIRST_PUBLIC_PORT + exporter_id + 1
499    }
500
501    fn configuration_string(&self, server_number: usize) -> Result<String> {
502        let n = server_number;
503        let path = self
504            .path_provider
505            .path()
506            .join(format!("validator_{n}.toml"));
507        let port = self.proxy_public_port(n, 0);
508        let external_protocol = self.network.external.toml();
509        let internal_protocol = self.network.internal.toml();
510        let external_host = self.network.external.localhost();
511        let internal_host = self.network.internal.localhost();
512        let mut content = format!(
513            r#"
514                server_config_path = "server_{n}.json"
515                host = "{external_host}"
516                port = {port}
517                external_protocol = {external_protocol}
518                internal_protocol = {internal_protocol}
519            "#
520        );
521
522        for k in 0..self.num_proxies {
523            let public_port = self.proxy_public_port(n, k);
524            let internal_port = self.proxy_internal_port(n, k);
525            let metrics_port = self.proxy_metrics_port(n, k);
526            // In the local network, the validator ingress is
527            // the proxy - so the `public_port` is the validator
528            // port.
529            content.push_str(&format!(
530                r#"
531                [[proxies]]
532                host = "{internal_host}"
533                public_port = {public_port}
534                private_port = {internal_port}
535                metrics_port = {metrics_port}
536                "#
537            ));
538        }
539
540        for k in 0..self.num_shards {
541            let shard_port = self.shard_port(n, k);
542            let shard_metrics_port = self.shard_metrics_port(n, k);
543            content.push_str(&format!(
544                r#"
545
546                [[shards]]
547                host = "{internal_host}"
548                port = {shard_port}
549                metrics_port = {shard_metrics_port}
550                "#
551            ));
552        }
553
554        match self.block_exporters {
555            ExportersSetup::Local(ref exporters) => {
556                for (j, exporter) in exporters.iter().enumerate() {
557                    let host = Network::Grpc.localhost();
558                    let port = self.block_exporter_port(n, j);
559                    let config_content = format!(
560                        r#"
561
562                        [[block_exporters]]
563                        host = "{host}"
564                        port = {port}
565                        "#
566                    );
567
568                    content.push_str(&config_content);
569                    let exporter_config = self.generate_block_exporter_config(
570                        n,
571                        j as u32,
572                        &exporter.destination_config,
573                    );
574                    let config_path = self
575                        .path_provider
576                        .path()
577                        .join(format!("exporter_config_{n}:{j}.toml"));
578
579                    fs_err::write(&config_path, &exporter_config)?;
580                }
581            }
582            ExportersSetup::Remote(ref exporters) => {
583                for exporter in exporters {
584                    let host = exporter.host.clone();
585                    let port = exporter.port;
586                    let config_content = format!(
587                        r#"
588
589                        [[block_exporters]]
590                        host = "{host}"
591                        port = {port}
592                        "#
593                    );
594
595                    content.push_str(&config_content);
596                }
597            }
598        }
599
600        fs_err::write(&path, content)?;
601        path.into_os_string().into_string().map_err(|error| {
602            anyhow!(
603                "could not parse OS string into string: {}",
604                error.to_string_lossy()
605            )
606        })
607    }
608
609    fn generate_block_exporter_config(
610        &self,
611        validator: usize,
612        exporter_id: u32,
613        destination_config: &DestinationConfig,
614    ) -> String {
615        let n = validator;
616        let host = Network::Grpc.localhost();
617        let port = self.block_exporter_port(n, exporter_id as usize);
618        let metrics_port = Self::block_exporter_metrics_port(exporter_id as usize);
619        let mut config = format!(
620            r#"
621            id = {exporter_id}
622
623            metrics_port = {metrics_port}
624
625            [service_config]
626            host = "{host}"
627            port = {port}
628
629            "#
630        );
631
632        let DestinationConfig {
633            destinations,
634            committee_destination,
635        } = destination_config;
636
637        if *committee_destination {
638            let destination_string_to_push = r#"
639
640            [destination_config]
641            committee_destination = true
642            "#
643            .to_string();
644
645            config.push_str(&destination_string_to_push);
646        }
647
648        for destination in destinations {
649            let destination_string_to_push = match destination {
650                Destination::Indexer {
651                    tls,
652                    endpoint,
653                    port,
654                } => {
655                    let tls = match tls {
656                        TlsConfig::ClearText => "ClearText",
657                        TlsConfig::Tls => "Tls",
658                    };
659                    format!(
660                        r#"
661                        [[destination_config.destinations]]
662                        tls = "{tls}"
663                        endpoint = "{endpoint}"
664                        port = {port}
665                        kind = "Indexer"
666                        "#
667                    )
668                }
669                Destination::Validator { endpoint, port } => {
670                    format!(
671                        r#"
672                        [[destination_config.destinations]]
673                        endpoint = "{endpoint}"
674                        port = {port}
675                        kind = "Validator"
676                        "#
677                    )
678                }
679                Destination::Logging { file_name } => {
680                    format!(
681                        r#"
682                        [[destination_config.destinations]]
683                        file_name = "{file_name}"
684                        kind = "Logging"
685                        "#
686                    )
687                }
688            };
689
690            config.push_str(&destination_string_to_push);
691        }
692
693        config
694    }
695
696    async fn generate_initial_validator_config(&mut self) -> Result<()> {
697        let mut command = self.command_for_binary("linera-server").await?;
698        command.arg("generate");
699        if let Some(seed) = self.testing_prng_seed {
700            command.arg("--testing-prng-seed").arg(seed.to_string());
701            self.testing_prng_seed = Some(seed + 1);
702        }
703        command.arg("--validators");
704        for i in 0..self.num_initial_validators {
705            command.arg(&self.configuration_string(i)?);
706        }
707        let output = command
708            .args(["--committee", "committee.json"])
709            .spawn_and_wait_for_stdout()
710            .await?;
711        self.validator_keys = output
712            .split_whitespace()
713            .map(str::to_string)
714            .map(|keys| keys.split(',').map(str::to_string).collect::<Vec<_>>())
715            .enumerate()
716            .map(|(i, keys)| {
717                let validator_key = keys[0].to_string();
718                let account_key = keys[1].to_string();
719                (i, (validator_key, account_key))
720            })
721            .collect();
722        Ok(())
723    }
724
725    async fn run_proxy(&mut self, validator: usize, proxy_id: usize) -> Result<Child> {
726        let storage = self
727            .initialized_validator_storages
728            .get(&validator)
729            .expect("initialized storage");
730        let child = self
731            .command_for_binary("linera-proxy")
732            .await?
733            .arg(format!("server_{}.json", validator))
734            .args(["--storage", &storage.to_string()])
735            .args(["--id", &proxy_id.to_string()])
736            .spawn_into()?;
737
738        let port = self.proxy_public_port(validator, proxy_id);
739        let nickname = format!("validator proxy {validator}");
740        match self.network.external {
741            Network::Grpc => {
742                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
743                let nickname = format!("validator proxy {validator}");
744                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
745            }
746            Network::Grpcs => {
747                let nickname = format!("validator proxy {validator}");
748                Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
749            }
750            Network::Tcp => {
751                Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
752            }
753            Network::Udp => {
754                Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
755            }
756        }
757        Ok(child)
758    }
759
760    async fn run_exporter(&mut self, validator: usize, exporter_id: u32) -> Result<Child> {
761        let config_path = format!("exporter_config_{validator}:{exporter_id}.toml");
762        let storage = self
763            .initialized_validator_storages
764            .get(&validator)
765            .expect("initialized storage");
766
767        tracing::debug!(config=?config_path, storage=?storage.to_string(), "starting block exporter");
768
769        let child = self
770            .command_for_binary("linera-exporter")
771            .await?
772            .args(["--config-path", &config_path])
773            .args(["--storage", &storage.to_string()])
774            .spawn_into()?;
775
776        match self.network.internal {
777            Network::Grpc => {
778                let port = self.block_exporter_port(validator, exporter_id as usize);
779                let nickname = format!("block exporter {validator}:{exporter_id}");
780                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
781            }
782            Network::Grpcs => {
783                let port = self.block_exporter_port(validator, exporter_id as usize);
784                let nickname = format!("block exporter  {validator}:{exporter_id}");
785                Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
786            }
787            Network::Tcp | Network::Udp => {
788                unreachable!("Only allowed options are grpc and grpcs")
789            }
790        }
791
792        tracing::info!("block exporter started {validator}:{exporter_id}");
793
794        Ok(child)
795    }
796
797    pub async fn ensure_grpc_server_has_started(
798        nickname: &str,
799        port: usize,
800        scheme: &str,
801    ) -> Result<()> {
802        let endpoint = match scheme {
803            "http" => Endpoint::new(format!("http://localhost:{port}"))
804                .context("endpoint should always parse")?,
805            "https" => {
806                use linera_rpc::CERT_PEM;
807                let certificate = tonic::transport::Certificate::from_pem(CERT_PEM);
808                let tls_config = ClientTlsConfig::new().ca_certificate(certificate);
809                Endpoint::new(format!("https://localhost:{port}"))
810                    .context("endpoint should always parse")?
811                    .tls_config(tls_config)?
812            }
813            _ => bail!("Only supported scheme are http and https"),
814        };
815        let connection = endpoint.connect_lazy();
816        let mut client = HealthClient::new(connection);
817        linera_base::time::timer::sleep(Duration::from_millis(100)).await;
818        for i in 0..10 {
819            linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
820            let result = client.check(HealthCheckRequest::default()).await;
821            if result.is_ok() && result.unwrap().get_ref().status() == ServingStatus::Serving {
822                info!(?port, "Successfully started {nickname}");
823                return Ok(());
824            } else {
825                warn!("Waiting for {nickname} to start");
826            }
827        }
828        bail!("Failed to start {nickname}");
829    }
830
831    async fn ensure_simple_server_has_started(
832        nickname: &str,
833        port: usize,
834        protocol: &str,
835    ) -> Result<()> {
836        use linera_core::node::ValidatorNode as _;
837
838        let options = linera_rpc::NodeOptions {
839            send_timeout: Duration::from_secs(5),
840            recv_timeout: Duration::from_secs(5),
841            retry_delay: Duration::from_secs(1),
842            max_retries: 1,
843        };
844        let provider = linera_rpc::simple::SimpleNodeProvider::new(options);
845        let address = format!("{protocol}:127.0.0.1:{port}");
846        // All "simple" services (i.e. proxy and "server") are based on `RpcMessage` and
847        // support `VersionInfoQuery`.
848        let node = provider.make_node(&address)?;
849        linera_base::time::timer::sleep(Duration::from_millis(100)).await;
850        for i in 0..10 {
851            linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
852            let result = node.get_version_info().await;
853            if result.is_ok() {
854                info!("Successfully started {nickname}");
855                return Ok(());
856            } else {
857                warn!("Waiting for {nickname} to start");
858            }
859        }
860        bail!("Failed to start {nickname}");
861    }
862
863    async fn initialize_storage(&mut self, validator: usize) -> Result<()> {
864        let namespace = format!("{}_server_{}_db", self.common_namespace, validator);
865        let inner_storage_config = self.common_storage_config.clone();
866        let storage = StorageConfig {
867            inner_storage_config,
868            namespace,
869        };
870        let mut command = self.command_for_binary("linera").await?;
871        if let Ok(var) = env::var(SERVER_ENV) {
872            command.args(var.split_whitespace());
873        }
874        command.args(["storage", "initialize"]);
875        command
876            .args(["--storage", &storage.to_string()])
877            .args(["--genesis", "genesis.json"])
878            .spawn_and_wait_for_stdout()
879            .await?;
880
881        self.initialized_validator_storages
882            .insert(validator, storage);
883        Ok(())
884    }
885
886    async fn run_server(&mut self, validator: usize, shard: usize) -> Result<Child> {
887        let mut storage = self
888            .initialized_validator_storages
889            .get(&validator)
890            .expect("initialized storage")
891            .clone();
892
893        // For the storage backends with a local directory, make sure that we don't reuse
894        // the same directory for all the shards.
895        storage.maybe_append_shard_path(shard)?;
896
897        let mut command = self.command_for_binary("linera-server").await?;
898        if let Ok(var) = env::var(SERVER_ENV) {
899            command.args(var.split_whitespace());
900        }
901        command
902            .arg("run")
903            .args(["--storage", &storage.to_string()])
904            .args(["--server", &format!("server_{}.json", validator)])
905            .args(["--shard", &shard.to_string()])
906            .args(self.cross_chain_config.to_args());
907        let child = command.spawn_into()?;
908
909        let port = self.shard_port(validator, shard);
910        let nickname = format!("validator server {validator}:{shard}");
911        match self.network.internal {
912            Network::Grpc => {
913                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
914            }
915            Network::Grpcs => {
916                Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
917            }
918            Network::Tcp => {
919                Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
920            }
921            Network::Udp => {
922                Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
923            }
924        }
925        Ok(child)
926    }
927
928    async fn run(&mut self) -> Result<()> {
929        for validator in 0..self.num_initial_validators {
930            self.start_validator(validator).await?;
931        }
932        Ok(())
933    }
934
935    /// Start a validator.
936    pub async fn start_validator(&mut self, index: usize) -> Result<()> {
937        self.initialize_storage(index).await?;
938        self.restart_validator(index).await
939    }
940
941    /// Restart a validator. This is similar to `start_validator` except that the
942    /// database was already initialized once.
943    pub async fn restart_validator(&mut self, index: usize) -> Result<()> {
944        let mut validator = Validator::new();
945        for k in 0..self.num_proxies {
946            let proxy = self.run_proxy(index, k).await?;
947            validator.add_proxy(proxy);
948        }
949        for shard in 0..self.num_shards {
950            let server = self.run_server(index, shard).await?;
951            validator.add_server(server);
952        }
953        if let ExportersSetup::Local(ref exporters) = self.block_exporters {
954            for block_exporter in 0..exporters.len() {
955                let exporter = self.run_exporter(index, block_exporter as u32).await?;
956                validator.add_block_exporter(exporter);
957            }
958        }
959
960        self.running_validators.insert(index, validator);
961        Ok(())
962    }
963
964    /// Terminates all the processes of a given validator.
965    pub async fn stop_validator(&mut self, index: usize) -> Result<()> {
966        if let Some(mut validator) = self.running_validators.remove(&index) {
967            if let Err(error) = validator.terminate().await {
968                error!("Failed to stop validator {index}: {error}");
969                return Err(error);
970            }
971        }
972        Ok(())
973    }
974
975    /// Returns a [`linera_rpc::Client`] to interact directly with a `validator`.
976    pub fn validator_client(&mut self, validator: usize) -> Result<linera_rpc::Client> {
977        let node_provider = linera_rpc::NodeProvider::new(linera_rpc::NodeOptions {
978            send_timeout: Duration::from_secs(1),
979            recv_timeout: Duration::from_secs(1),
980            retry_delay: Duration::ZERO,
981            max_retries: 0,
982        });
983
984        Ok(node_provider.make_node(&self.validator_address(validator))?)
985    }
986
987    /// Returns the address to connect to a validator's proxy.
988    /// In local networks, the zeroth proxy _is_ the validator ingress.
989    pub fn validator_address(&self, validator: usize) -> String {
990        let port = self.proxy_public_port(validator, 0);
991        let schema = self.network.external.schema();
992
993        format!("{schema}:localhost:{port}")
994    }
995}
996
#[cfg(with_testing)]
impl LocalNet {
    /// Returns the validating key and an account key of the validator, if they
    /// were recorded by `generate_validator_config`.
    pub fn validator_keys(&self, validator: usize) -> Option<&(String, String)> {
        self.validator_keys.get(&validator)
    }

    /// Runs `linera-server generate` for `validator` and records the keys it prints.
    ///
    /// The trimmed standard output of the command is split on commas; the first
    /// two fields are stored as the key pair returned by `validator_keys`.
    ///
    /// # Errors
    ///
    /// Returns an error if the command fails or if its output contains fewer
    /// than two comma-separated fields.
    pub async fn generate_validator_config(&mut self, validator: usize) -> Result<()> {
        let stdout = self
            .command_for_binary("linera-server")
            .await?
            .arg("generate")
            .arg("--validators")
            .arg(&self.configuration_string(validator)?)
            .spawn_and_wait_for_stdout()
            .await?;
        // The output comes from an external process: validate its shape rather
        // than indexing into it, which would panic on unexpected output.
        let mut keys = stdout.trim().split(',').map(str::to_string);
        let (Some(first_key), Some(second_key)) = (keys.next(), keys.next()) else {
            bail!(
                "expected two comma-separated keys from `linera-server generate`, \
                 got {stdout:?}"
            );
        };
        self.validator_keys
            .insert(validator, (first_key, second_key));
        Ok(())
    }

    /// Terminates the server of the given `shard` of a running validator.
    ///
    /// Returns an error if `validator` is not currently running or if the
    /// termination itself fails.
    pub async fn terminate_server(&mut self, validator: usize, shard: usize) -> Result<()> {
        self.running_validators
            .get_mut(&validator)
            .context("server not found")?
            .terminate_server(shard)
            .await?;
        Ok(())
    }

    /// Removes a validator from the set of running validators, dropping its
    /// handle without explicitly terminating it.
    ///
    /// Returns an error if `validator` is not currently running.
    pub fn remove_validator(&mut self, validator: usize) -> Result<()> {
        self.running_validators
            .remove(&validator)
            .context("validator not found")?;
        Ok(())
    }

    /// Starts the server of the given `shard` and attaches it to a running validator.
    ///
    /// Returns an error if `validator` is not currently running or if the
    /// server cannot be started.
    pub async fn start_server(&mut self, validator: usize, shard: usize) -> Result<()> {
        // Check before spawning anything, so that no server process is started
        // (and then immediately dropped) for a validator that is not tracked.
        ensure!(
            self.running_validators.contains_key(&validator),
            "could not find validator"
        );
        let server = self.run_server(validator, shard).await?;
        self.running_validators
            .get_mut(&validator)
            .context("could not find validator")?
            .add_server(server);
        Ok(())
    }
}