Skip to main content

linera_service/cli_wrappers/
local_net.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4#[cfg(with_testing)]
5use std::sync::LazyLock;
6use std::{
7    collections::BTreeMap,
8    env,
9    num::NonZeroU16,
10    path::{Path, PathBuf},
11    sync::Arc,
12    time::Duration,
13};
14
15use anyhow::{anyhow, bail, ensure, Context, Result};
16#[cfg(with_testing)]
17use async_lock::RwLock;
18use async_trait::async_trait;
19use linera_base::{
20    command::{resolve_binary, CommandExt},
21    data_types::Amount,
22};
23use linera_client::client_options::ResourceControlPolicyConfig;
24use linera_core::node::ValidatorNodeProvider;
25use linera_exporter::config::{BlockExporterConfig, Destination, DestinationConfig};
26use linera_rpc::config::{CrossChainConfig, ExporterServiceConfig, TlsConfig};
27#[cfg(all(feature = "storage-service", with_testing))]
28use linera_storage_service::common::storage_service_test_endpoint;
29#[cfg(all(feature = "rocksdb", feature = "scylladb", with_testing))]
30use linera_views::rocks_db::{RocksDbDatabase, RocksDbSpawnMode};
31#[cfg(all(feature = "scylladb", with_testing))]
32use linera_views::{scylla_db::ScyllaDbDatabase, store::TestKeyValueDatabase as _};
33use tempfile::{tempdir, TempDir};
34use tokio::process::{Child, Command};
35use tonic::transport::{channel::ClientTlsConfig, Endpoint};
36use tonic_health::pb::{
37    health_check_response::ServingStatus, health_client::HealthClient, HealthCheckRequest,
38};
39use tracing::{error, info, warn};
40
41use crate::{
42    cli_wrappers::{
43        ClientWrapper, LineraNet, LineraNetConfig, Network, NetworkConfig, OnClientDrop,
44    },
45    storage::{InnerStorageConfig, StorageConfig},
46    util::ChildExt,
47};
48
/// Maximum allowed number of shards over all validators.
///
/// Instantiating a `LocalNetConfig` fails when `num_initial_validators * num_shards`
/// exceeds this value.
// NOTE(review): the port helpers on `LocalNet` place their port ranges 1000 apart, which is
// presumably the reason for this limit -- confirm before changing either.
const MAX_NUMBER_SHARDS: usize = 1000;
51
/// Selects between skipping and automatic inbox processing.
// NOTE(review): this enum is not used in the visible part of this file; presumably `Skip`
// leaves the inbox untouched and `Automatic` processes it -- confirm against the callers.
pub enum ProcessInbox {
    Skip,
    Automatic,
}
56
/// The next port that `get_node_port` will hand out. Starts at 7080 and is only
/// available in testing builds.
#[cfg(with_testing)]
static PORT_PROVIDER: LazyLock<RwLock<u16>> = LazyLock::new(|| RwLock::new(7080));
59
/// Returns the base offset from which the ports of a local network are derived.
///
/// The value is read from the `TEST_OFFSET_PORT` environment variable. If the variable is
/// unset, or its value does not parse as a `usize`, the default of 9000 is used.
fn test_offset_port() -> usize {
    match std::env::var("TEST_OFFSET_PORT") {
        Ok(raw) => raw.parse::<usize>().unwrap_or(9000),
        Err(_) => 9000,
    }
}
67
/// Hands out a port for a node service.
///
/// Every call returns the current value of the global port counter and advances the
/// counter by one, so successive calls yield consecutive ports.
///
/// # Panics
///
/// Panics if the returned port is not free according to `port_selector::is_free`.
#[cfg(with_testing)]
pub async fn get_node_port() -> u16 {
    let port_ret = {
        // The write lock is only needed while the counter is read and bumped.
        let mut next_port = PORT_PROVIDER.write().await;
        let current = *next_port;
        *next_port += 1;
        current
    };
    info!("get_node_port returning port_ret={}", port_ret);
    assert!(port_selector::is_free(port_ret));
    port_ret
}
78
/// Builds the storage configuration used by tests for the selected `database`.
///
/// Each backend is only available when the matching Cargo feature is enabled.
///
/// # Panics
///
/// Panics if `database` requires a feature that is not enabled, or if the storage
/// service test endpoint cannot be obtained.
#[cfg(with_testing)]
async fn make_testing_config(database: Database) -> Result<InnerStorageConfig> {
    match database {
        Database::Service => {
            #[cfg(feature = "storage-service")]
            {
                let storage = InnerStorageConfig::Service {
                    endpoint: storage_service_test_endpoint()
                        .expect("Reading LINERA_STORAGE_SERVICE environment variable"),
                };
                Ok(storage)
            }
            #[cfg(not(feature = "storage-service"))]
            panic!("Database::Service is selected without the feature storage_service");
        }
        Database::ScyllaDb => {
            #[cfg(feature = "scylladb")]
            {
                let scylla = ScyllaDbDatabase::new_test_config().await?;
                let uri = scylla.inner_config.uri;
                Ok(InnerStorageConfig::ScyllaDb { uri })
            }
            #[cfg(not(feature = "scylladb"))]
            panic!("Database::ScyllaDb is selected without the feature scylladb");
        }
        Database::DualRocksDbScyllaDb => {
            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
            {
                let rocksdb = RocksDbDatabase::new_test_config().await?;
                let scylla = ScyllaDbDatabase::new_test_config().await?;
                let storage = InnerStorageConfig::DualRocksDbScyllaDb {
                    spawn_mode: RocksDbSpawnMode::get_spawn_mode_from_runtime(),
                    path_with_guard: rocksdb.inner_config.path_with_guard,
                    uri: scylla.inner_config.uri,
                };
                Ok(storage)
            }
            #[cfg(not(all(feature = "rocksdb", feature = "scylladb")))]
            panic!("Database::DualRocksDbScyllaDb is selected without the features rocksdb and scylladb");
        }
    }
}
120
121pub enum InnerStorageConfigBuilder {
122    #[cfg(with_testing)]
123    TestConfig,
124    ExistingConfig {
125        storage_config: InnerStorageConfig,
126    },
127}
128
129impl InnerStorageConfigBuilder {
130    #[cfg_attr(not(with_testing), expect(unused_variables))]
131    pub async fn build(self, database: Database) -> Result<InnerStorageConfig> {
132        match self {
133            #[cfg(with_testing)]
134            InnerStorageConfigBuilder::TestConfig => make_testing_config(database).await,
135            InnerStorageConfigBuilder::ExistingConfig { storage_config } => Ok(storage_config),
136        }
137    }
138}
139
140/// Path used for the run can come from a path whose lifetime is controlled
141/// by an external user or as a temporary directory
142#[derive(Clone)]
143pub enum PathProvider {
144    ExternalPath { path_buf: PathBuf },
145    TemporaryDirectory { tmp_dir: Arc<TempDir> },
146}
147
148impl PathProvider {
149    pub fn path(&self) -> &Path {
150        match self {
151            PathProvider::ExternalPath { path_buf } => path_buf.as_path(),
152            PathProvider::TemporaryDirectory { tmp_dir } => tmp_dir.path(),
153        }
154    }
155
156    pub fn create_temporary_directory() -> Result<Self> {
157        let tmp_dir = Arc::new(tempdir()?);
158        Ok(PathProvider::TemporaryDirectory { tmp_dir })
159    }
160
161    pub fn from_path_option(path: &Option<String>) -> anyhow::Result<Self> {
162        Ok(match path {
163            None => {
164                let tmp_dir = Arc::new(tempfile::tempdir()?);
165                PathProvider::TemporaryDirectory { tmp_dir }
166            }
167            Some(path) => {
168                let path = Path::new(path);
169                let path_buf = path.to_path_buf();
170                PathProvider::ExternalPath { path_buf }
171            }
172        })
173    }
174}
175
/// The information needed to start a [`LocalNet`].
pub struct LocalNetConfig {
    /// Database backend; used to build the storage configuration when
    /// `storage_config_builder` is the test variant.
    pub database: Database,
    /// Internal and external network protocols of the validators.
    pub network: NetworkConfig,
    /// Optional PRNG seed handed to the spawned tools; it is incremented after each use.
    pub testing_prng_seed: Option<u64>,
    /// Prefix of the per-validator storage namespaces.
    pub namespace: String,
    /// Passed to the genesis configuration creation.
    pub num_other_initial_chains: u32,
    /// Passed to the genesis configuration creation.
    pub initial_amount: Amount,
    /// Number of validators to start; must be greater than zero.
    pub num_initial_validators: usize,
    /// Number of shards per validator.
    pub num_shards: usize,
    /// Number of proxies per validator.
    pub num_proxies: usize,
    /// Passed to the genesis configuration creation.
    pub policy_config: ResourceControlPolicyConfig,
    /// Passed to the genesis configuration creation; `None` is replaced by a list
    /// containing only `localhost`.
    pub http_request_allow_list: Option<Vec<String>>,
    // NOTE(review): stored on `LocalNet`, but its use is not visible in this part of the file.
    pub cross_chain_config: CrossChainConfig,
    /// Source of the storage configuration.
    pub storage_config_builder: InnerStorageConfigBuilder,
    /// Directory in which binaries are run and configuration files are written.
    pub path_provider: PathProvider,
    /// Block exporters to start locally or to connect to.
    pub block_exporters: ExportersSetup,
    /// Optional directory where the `linera`, `linera-proxy`, and `linera-server` binaries
    /// are located. If `None`, binaries are resolved from the current binary's directory.
    pub binary_dir: Option<PathBuf>,
}
197
198/// The setup for the block exporters.
199#[derive(Clone, PartialEq)]
200pub enum ExportersSetup {
201    // Block exporters are meant to be started and managed by the testing framework.
202    Local(Vec<BlockExporterConfig>),
203    // Block exporters are already started and we just need to connect to them.
204    Remote(Vec<ExporterServiceConfig>),
205}
206
207impl ExportersSetup {
208    pub fn new(
209        with_block_exporter: bool,
210        block_exporter_address: String,
211        block_exporter_port: NonZeroU16,
212    ) -> ExportersSetup {
213        if with_block_exporter {
214            let exporter_config =
215                ExporterServiceConfig::new(block_exporter_address, block_exporter_port.into());
216            ExportersSetup::Remote(vec![exporter_config])
217        } else {
218            ExportersSetup::Local(vec![])
219        }
220    }
221}
222
/// A set of Linera validators running locally as native processes.
pub struct LocalNet {
    /// Internal and external network protocols of the validators.
    network: NetworkConfig,
    /// Optional PRNG seed handed to spawned tools; incremented after each use.
    testing_prng_seed: Option<u64>,
    /// ID given to the next client created by `make_client`.
    next_client_id: usize,
    num_initial_validators: usize,
    /// Number of proxies per validator.
    num_proxies: usize,
    /// Number of shards per validator.
    num_shards: usize,
    /// Validator index mapped to the pair (validator key, account key) parsed from the
    /// output of `linera-server generate`.
    validator_keys: BTreeMap<usize, (String, String)>,
    /// Validator index mapped to the processes of that validator.
    running_validators: BTreeMap<usize, Validator>,
    /// Validator index mapped to its storage configuration; proxies and exporters expect
    /// an entry here before they are started.
    initialized_validator_storages: BTreeMap<usize, StorageConfig>,
    /// Prefix of the per-validator storage namespaces.
    common_namespace: String,
    /// Storage configuration cloned into every validator's storage configuration.
    common_storage_config: InnerStorageConfig,
    // NOTE(review): not read in the visible part of this file.
    cross_chain_config: CrossChainConfig,
    /// Directory in which binaries are run and configuration files are written.
    path_provider: PathProvider,
    /// Block exporters to start locally or to connect to.
    block_exporters: ExportersSetup,
    /// Directory containing the binaries; `None` means they are resolved automatically.
    binary_dir: Option<PathBuf>,
}
241
/// The name of the environment variable that allows specifying additional arguments to be passed
/// to the binary when starting a server.
///
/// The value is split on whitespace and each piece is passed as a separate argument.
const SERVER_ENV: &str = "LINERA_SERVER_PARAMS";
245
/// Description of the database engine to use inside a local Linera network.
#[derive(Copy, Clone, Eq, PartialEq)]
pub enum Database {
    /// The storage service; test configurations need the `storage-service` feature.
    Service,
    /// ScyllaDB; test configurations need the `scylladb` feature.
    ScyllaDb,
    /// RocksDB combined with ScyllaDB; test configurations need both the `rocksdb` and
    /// `scylladb` features.
    DualRocksDbScyllaDb,
}
253
254/// The processes of a running validator.
255struct Validator {
256    proxies: Vec<Child>,
257    servers: Vec<Child>,
258    exporters: Vec<Child>,
259}
260
261impl Validator {
262    fn new() -> Self {
263        Self {
264            proxies: vec![],
265            servers: vec![],
266            exporters: vec![],
267        }
268    }
269
270    async fn terminate(&mut self) -> Result<()> {
271        for proxy in &mut self.proxies {
272            proxy.kill().await.context("terminating validator proxy")?;
273        }
274        for server in &mut self.servers {
275            server
276                .kill()
277                .await
278                .context("terminating validator server")?;
279        }
280        Ok(())
281    }
282
283    fn add_proxy(&mut self, proxy: Child) {
284        self.proxies.push(proxy)
285    }
286
287    fn add_server(&mut self, server: Child) {
288        self.servers.push(server)
289    }
290
291    #[cfg(with_testing)]
292    async fn terminate_server(&mut self, index: usize) -> Result<()> {
293        let mut server = self.servers.remove(index);
294        server
295            .kill()
296            .await
297            .context("terminating validator server")?;
298        Ok(())
299    }
300
301    fn add_block_exporter(&mut self, exporter: Child) {
302        self.exporters.push(exporter);
303    }
304
305    fn ensure_is_running(&mut self) -> Result<()> {
306        for proxy in &mut self.proxies {
307            proxy.ensure_is_running()?;
308        }
309        for child in &mut self.servers {
310            child.ensure_is_running()?;
311        }
312        for exporter in &mut self.exporters {
313            exporter.ensure_is_running()?;
314        }
315        Ok(())
316    }
317}
318
#[cfg(with_testing)]
impl LocalNetConfig {
    /// Creates the default configuration used by tests: four validators with four shards
    /// and one proxy each, a temporary working directory, a fresh test namespace and no
    /// block exporters.
    ///
    /// The internal network is the given `network` with TLS dropped; the external
    /// network is `network` itself.
    ///
    /// # Panics
    ///
    /// Panics if the temporary directory cannot be created.
    pub fn new_test(database: Database, network: Network) -> Self {
        let path_provider = PathProvider::create_temporary_directory().unwrap();
        let network = NetworkConfig {
            internal: network.drop_tls(),
            external: network,
        };
        Self {
            database,
            network,
            num_other_initial_chains: 2,
            initial_amount: Amount::from_tokens(1_000_000),
            policy_config: ResourceControlPolicyConfig::Testnet,
            cross_chain_config: CrossChainConfig::default(),
            testing_prng_seed: Some(37),
            namespace: linera_views::random::generate_test_namespace(),
            num_initial_validators: 4,
            num_shards: 4,
            num_proxies: 1,
            storage_config_builder: InnerStorageConfigBuilder::TestConfig,
            path_provider,
            block_exporters: ExportersSetup::Local(Vec::new()),
            http_request_allow_list: Some(vec!["localhost".to_string()]),
            binary_dir: None,
        }
    }
}
350
351#[async_trait]
352impl LineraNetConfig for LocalNetConfig {
353    type Net = LocalNet;
354
355    async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
356        let storage_config = self.storage_config_builder.build(self.database).await?;
357        let mut net = LocalNet::new(
358            self.network,
359            self.testing_prng_seed,
360            self.namespace,
361            self.num_initial_validators,
362            self.num_proxies,
363            self.num_shards,
364            storage_config,
365            self.cross_chain_config,
366            self.path_provider,
367            self.block_exporters,
368            self.binary_dir,
369        );
370        let client = net.make_client().await;
371        ensure!(
372            self.num_initial_validators > 0,
373            "There should be at least one initial validator"
374        );
375        let total_number_shards = self.num_initial_validators * self.num_shards;
376        ensure!(
377            total_number_shards <= MAX_NUMBER_SHARDS,
378            "Total number of shards ({}) exceeds maximum allowed ({})",
379            self.num_shards,
380            MAX_NUMBER_SHARDS
381        );
382        net.generate_initial_validator_config().await?;
383        client
384            .create_genesis_config(
385                self.num_other_initial_chains,
386                self.initial_amount,
387                self.policy_config,
388                self.http_request_allow_list
389                    .clone()
390                    .or_else(|| Some(vec!["localhost".to_owned()])),
391            )
392            .await?;
393        net.run().await?;
394        Ok((net, client))
395    }
396}
397
398#[async_trait]
399impl LineraNet for LocalNet {
400    async fn ensure_is_running(&mut self) -> Result<()> {
401        for validator in self.running_validators.values_mut() {
402            validator.ensure_is_running().context("in local network")?;
403        }
404        Ok(())
405    }
406
407    async fn make_client(&mut self) -> ClientWrapper {
408        let client = ClientWrapper::new_with_extra_args(
409            self.path_provider.clone(),
410            self.network.external,
411            self.testing_prng_seed,
412            self.next_client_id,
413            OnClientDrop::LeakChains,
414            vec!["--wait-for-outgoing-messages".to_string()],
415            self.binary_dir.clone(),
416        );
417        if let Some(seed) = self.testing_prng_seed {
418            self.testing_prng_seed = Some(seed + 1);
419        }
420        self.next_client_id += 1;
421        client
422    }
423
424    async fn terminate(&mut self) -> Result<()> {
425        for validator in self.running_validators.values_mut() {
426            validator.terminate().await.context("in local network")?
427        }
428        Ok(())
429    }
430}
431
432impl LocalNet {
433    #[expect(clippy::too_many_arguments)]
434    fn new(
435        network: NetworkConfig,
436        testing_prng_seed: Option<u64>,
437        common_namespace: String,
438        num_initial_validators: usize,
439        num_proxies: usize,
440        num_shards: usize,
441        common_storage_config: InnerStorageConfig,
442        cross_chain_config: CrossChainConfig,
443        path_provider: PathProvider,
444        block_exporters: ExportersSetup,
445        binary_dir: Option<PathBuf>,
446    ) -> Self {
447        Self {
448            network,
449            testing_prng_seed,
450            next_client_id: 0,
451            num_initial_validators,
452            num_proxies,
453            num_shards,
454            validator_keys: BTreeMap::new(),
455            running_validators: BTreeMap::new(),
456            initialized_validator_storages: BTreeMap::new(),
457            common_namespace,
458            common_storage_config,
459            cross_chain_config,
460            path_provider,
461            block_exporters,
462            binary_dir,
463        }
464    }
465
466    async fn command_for_binary(&self, name: &'static str) -> Result<Command> {
467        let path = if let Some(dir) = &self.binary_dir {
468            dir.join(name)
469        } else {
470            resolve_binary(name, env!("CARGO_PKG_NAME")).await?
471        };
472        let mut command = Command::new(path);
473        command.current_dir(self.path_provider.path());
474        Ok(command)
475    }
476
477    #[cfg(with_testing)]
478    pub fn genesis_config(&self) -> Result<linera_client::config::GenesisConfig> {
479        let path = self.path_provider.path();
480        crate::util::read_json(path.join("genesis.json"))
481    }
482
483    fn shard_port(&self, validator: usize, shard: usize) -> usize {
484        test_offset_port() + validator * self.num_shards + shard + 1
485    }
486
487    fn proxy_internal_port(&self, validator: usize, proxy_id: usize) -> usize {
488        test_offset_port() + 1000 + validator * self.num_proxies + proxy_id + 1
489    }
490
491    fn shard_metrics_port(&self, validator: usize, shard: usize) -> usize {
492        test_offset_port() + 2000 + validator * self.num_shards + shard + 1
493    }
494
495    fn proxy_metrics_port(&self, validator: usize, proxy_id: usize) -> usize {
496        test_offset_port() + 3000 + validator * self.num_proxies + proxy_id + 1
497    }
498
499    fn block_exporter_port(&self, validator: usize, exporter_id: usize) -> usize {
500        test_offset_port() + 3000 + validator * self.num_shards + exporter_id + 1
501    }
502
503    pub fn proxy_public_port(&self, validator: usize, proxy_id: usize) -> usize {
504        test_offset_port() + 4000 + validator * self.num_proxies + proxy_id + 1
505    }
506
507    pub fn first_public_port() -> usize {
508        test_offset_port() + 4000 + 1
509    }
510
511    fn block_exporter_metrics_port(exporter_id: usize) -> usize {
512        test_offset_port() + 4000 + exporter_id + 1
513    }
514
515    fn configuration_string(&self, server_number: usize) -> Result<String> {
516        let n = server_number;
517        let path = self
518            .path_provider
519            .path()
520            .join(format!("validator_{n}.toml"));
521        let port = self.proxy_public_port(n, 0);
522        let external_protocol = self.network.external.toml();
523        let internal_protocol = self.network.internal.toml();
524        let external_host = self.network.external.localhost();
525        let internal_host = self.network.internal.localhost();
526        let mut content = format!(
527            r#"
528                server_config_path = "server_{n}.json"
529                host = "{external_host}"
530                port = {port}
531                external_protocol = {external_protocol}
532                internal_protocol = {internal_protocol}
533            "#
534        );
535
536        for k in 0..self.num_proxies {
537            let public_port = self.proxy_public_port(n, k);
538            let internal_port = self.proxy_internal_port(n, k);
539            let metrics_port = self.proxy_metrics_port(n, k);
540            // In the local network, the validator ingress is
541            // the proxy - so the `public_port` is the validator
542            // port.
543            content.push_str(&format!(
544                r#"
545                [[proxies]]
546                host = "{internal_host}"
547                public_port = {public_port}
548                private_port = {internal_port}
549                metrics_port = {metrics_port}
550                "#
551            ));
552        }
553
554        for k in 0..self.num_shards {
555            let shard_port = self.shard_port(n, k);
556            let shard_metrics_port = self.shard_metrics_port(n, k);
557            content.push_str(&format!(
558                r#"
559
560                [[shards]]
561                host = "{internal_host}"
562                port = {shard_port}
563                metrics_port = {shard_metrics_port}
564                "#
565            ));
566        }
567
568        match self.block_exporters {
569            ExportersSetup::Local(ref exporters) => {
570                for (j, exporter) in exporters.iter().enumerate() {
571                    let host = Network::Grpc.localhost();
572                    let port = self.block_exporter_port(n, j);
573                    let config_content = format!(
574                        r#"
575
576                        [[block_exporters]]
577                        host = "{host}"
578                        port = {port}
579                        "#
580                    );
581
582                    content.push_str(&config_content);
583                    #[expect(
584                        clippy::cast_possible_truncation,
585                        reason = "loop index over a local config list bounded well below u32::MAX"
586                    )]
587                    let exporter_index = j as u32;
588                    let exporter_config = self.generate_block_exporter_config(
589                        n,
590                        exporter_index,
591                        &exporter.destination_config,
592                    );
593                    let config_path = self
594                        .path_provider
595                        .path()
596                        .join(format!("exporter_config_{n}:{j}.toml"));
597
598                    fs_err::write(&config_path, &exporter_config)?;
599                }
600            }
601            ExportersSetup::Remote(ref exporters) => {
602                for exporter in exporters {
603                    let host = exporter.host.clone();
604                    let port = exporter.port;
605                    let config_content = format!(
606                        r#"
607
608                        [[block_exporters]]
609                        host = "{host}"
610                        port = {port}
611                        "#
612                    );
613
614                    content.push_str(&config_content);
615                }
616            }
617        }
618
619        fs_err::write(&path, content)?;
620        path.into_os_string().into_string().map_err(|error| {
621            anyhow!(
622                "could not parse OS string into string: {}",
623                error.to_string_lossy()
624            )
625        })
626    }
627
628    fn generate_block_exporter_config(
629        &self,
630        validator: usize,
631        exporter_id: u32,
632        destination_config: &DestinationConfig,
633    ) -> String {
634        let n = validator;
635        let host = Network::Grpc.localhost();
636        let port = self.block_exporter_port(n, exporter_id as usize);
637        let metrics_port = Self::block_exporter_metrics_port(exporter_id as usize);
638        let mut config = format!(
639            r#"
640            id = {exporter_id}
641
642            metrics_port = {metrics_port}
643
644            [service_config]
645            host = "{host}"
646            port = {port}
647
648            "#
649        );
650
651        let DestinationConfig {
652            destinations,
653            committee_destination,
654        } = destination_config;
655
656        if *committee_destination {
657            let destination_string_to_push = r#"
658
659            [destination_config]
660            committee_destination = true
661            "#
662            .to_string();
663
664            config.push_str(&destination_string_to_push);
665        }
666
667        for destination in destinations {
668            let destination_string_to_push = match destination {
669                Destination::Indexer {
670                    tls,
671                    endpoint,
672                    port,
673                } => {
674                    let tls = match tls {
675                        TlsConfig::ClearText => "ClearText",
676                        TlsConfig::Tls => "Tls",
677                    };
678                    format!(
679                        r#"
680                        [[destination_config.destinations]]
681                        tls = "{tls}"
682                        endpoint = "{endpoint}"
683                        port = {port}
684                        kind = "Indexer"
685                        "#
686                    )
687                }
688                Destination::Validator { endpoint, port } => {
689                    format!(
690                        r#"
691                        [[destination_config.destinations]]
692                        endpoint = "{endpoint}"
693                        port = {port}
694                        kind = "Validator"
695                        "#
696                    )
697                }
698                Destination::Logging { file_name } => {
699                    format!(
700                        r#"
701                        [[destination_config.destinations]]
702                        file_name = "{file_name}"
703                        kind = "Logging"
704                        "#
705                    )
706                }
707            };
708
709            config.push_str(&destination_string_to_push);
710        }
711
712        config
713    }
714
715    async fn generate_initial_validator_config(&mut self) -> Result<()> {
716        let mut command = self.command_for_binary("linera-server").await?;
717        command.arg("generate");
718        if let Some(seed) = self.testing_prng_seed {
719            command.arg("--testing-prng-seed").arg(seed.to_string());
720            self.testing_prng_seed = Some(seed + 1);
721        }
722        command.arg("--validators");
723        for i in 0..self.num_initial_validators {
724            command.arg(&self.configuration_string(i)?);
725        }
726        let output = command
727            .args(["--committee", "committee.json"])
728            .spawn_and_wait_for_stdout()
729            .await?;
730        self.validator_keys = output
731            .split_whitespace()
732            .map(str::to_string)
733            .map(|keys| keys.split(',').map(str::to_string).collect::<Vec<_>>())
734            .enumerate()
735            .map(|(i, keys)| {
736                let validator_key = keys[0].to_string();
737                let account_key = keys[1].to_string();
738                (i, (validator_key, account_key))
739            })
740            .collect();
741        Ok(())
742    }
743
744    async fn run_proxy(&self, validator: usize, proxy_id: usize) -> Result<Child> {
745        let storage = self
746            .initialized_validator_storages
747            .get(&validator)
748            .expect("initialized storage");
749        let child = self
750            .command_for_binary("linera-proxy")
751            .await?
752            .arg(format!("server_{validator}.json"))
753            .args(["--storage", &storage.to_string()])
754            .args(["--id", &proxy_id.to_string()])
755            .spawn_into()?;
756
757        let port = self.proxy_public_port(validator, proxy_id);
758        let nickname = format!("validator proxy {validator}");
759        match self.network.external {
760            Network::Grpc => {
761                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
762                let nickname = format!("validator proxy {validator}");
763                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
764            }
765            Network::Grpcs => {
766                let nickname = format!("validator proxy {validator}");
767                Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
768            }
769            Network::Tcp => {
770                Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
771            }
772            Network::Udp => {
773                Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
774            }
775        }
776        Ok(child)
777    }
778
779    async fn run_exporter(&self, validator: usize, exporter_id: u32) -> Result<Child> {
780        let config_path = format!("exporter_config_{validator}:{exporter_id}.toml");
781        let storage = self
782            .initialized_validator_storages
783            .get(&validator)
784            .expect("initialized storage");
785
786        tracing::debug!(config=?config_path, storage=?storage.to_string(), "starting block exporter");
787
788        let child = self
789            .command_for_binary("linera-exporter")
790            .await?
791            .args(["run", "--config-path", &config_path])
792            .args(["--storage", &storage.to_string()])
793            .spawn_into()?;
794
795        match self.network.internal {
796            Network::Grpc => {
797                let port = self.block_exporter_port(validator, exporter_id as usize);
798                let nickname = format!("block exporter {validator}:{exporter_id}");
799                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
800            }
801            Network::Grpcs => {
802                let port = self.block_exporter_port(validator, exporter_id as usize);
803                let nickname = format!("block exporter  {validator}:{exporter_id}");
804                Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
805            }
806            Network::Tcp | Network::Udp => {
807                unreachable!("Only allowed options are grpc and grpcs")
808            }
809        }
810
811        tracing::info!("block exporter started {validator}:{exporter_id}");
812
813        Ok(child)
814    }
815
816    pub async fn ensure_grpc_server_has_started(
817        nickname: &str,
818        port: usize,
819        scheme: &str,
820    ) -> Result<()> {
821        let endpoint = match scheme {
822            "http" => Endpoint::new(format!("http://localhost:{port}"))
823                .context("endpoint should always parse")?,
824            "https" => {
825                use linera_rpc::CERT_PEM;
826                let certificate = tonic::transport::Certificate::from_pem(CERT_PEM);
827                let tls_config = ClientTlsConfig::new().ca_certificate(certificate);
828                Endpoint::new(format!("https://localhost:{port}"))
829                    .context("endpoint should always parse")?
830                    .tls_config(tls_config)?
831            }
832            _ => bail!("Only supported scheme are http and https"),
833        };
834        let connection = endpoint.connect_lazy();
835        let mut client = HealthClient::new(connection);
836        linera_base::time::timer::sleep(Duration::from_millis(100)).await;
837        for i in 0..10 {
838            linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
839            let result = client.check(HealthCheckRequest::default()).await;
840            if result.is_ok() && result.unwrap().get_ref().status() == ServingStatus::Serving {
841                info!(?port, "Successfully started {nickname}");
842                return Ok(());
843            } else {
844                warn!("Waiting for {nickname} to start");
845            }
846        }
847        bail!("Failed to start {nickname}");
848    }
849
850    async fn ensure_simple_server_has_started(
851        nickname: &str,
852        port: usize,
853        protocol: &str,
854    ) -> Result<()> {
855        use linera_core::node::ValidatorNode as _;
856
857        let options = linera_rpc::NodeOptions {
858            send_timeout: Duration::from_secs(5),
859            recv_timeout: Duration::from_secs(5),
860            retry_delay: Duration::from_secs(1),
861            max_retries: 1,
862            ..Default::default()
863        };
864        let provider = linera_rpc::simple::SimpleNodeProvider::new(options);
865        let address = format!("{protocol}:127.0.0.1:{port}");
866        // All "simple" services (i.e. proxy and "server") are based on `RpcMessage` and
867        // support `VersionInfoQuery`.
868        let node = provider.make_node(&address)?;
869        linera_base::time::timer::sleep(Duration::from_millis(100)).await;
870        for i in 0..10 {
871            linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
872            let result = node.get_version_info().await;
873            if result.is_ok() {
874                info!("Successfully started {nickname}");
875                return Ok(());
876            } else {
877                warn!("Waiting for {nickname} to start");
878            }
879        }
880        bail!("Failed to start {nickname}");
881    }
882
883    async fn initialize_storage(&mut self, validator: usize) -> Result<()> {
884        let namespace = format!("{}_server_{}_db", self.common_namespace, validator);
885        let inner_storage_config = self.common_storage_config.clone();
886        let storage = StorageConfig {
887            inner_storage_config,
888            namespace,
889        };
890        let mut command = self.command_for_binary("linera").await?;
891        if let Ok(var) = env::var(SERVER_ENV) {
892            command.args(var.split_whitespace());
893        }
894        command.args(["storage", "initialize"]);
895        command
896            .args(["--storage", &storage.to_string()])
897            .args(["--genesis", "genesis.json"])
898            .spawn_and_wait_for_stdout()
899            .await?;
900
901        self.initialized_validator_storages
902            .insert(validator, storage);
903        Ok(())
904    }
905
906    async fn run_server(&self, validator: usize, shard: usize) -> Result<Child> {
907        let mut storage = self
908            .initialized_validator_storages
909            .get(&validator)
910            .expect("initialized storage")
911            .clone();
912
913        // For the storage backends with a local directory, make sure that we don't reuse
914        // the same directory for all the shards.
915        storage.maybe_append_shard_path(shard)?;
916
917        let mut command = self.command_for_binary("linera-server").await?;
918        if let Ok(var) = env::var(SERVER_ENV) {
919            command.args(var.split_whitespace());
920        }
921        command
922            .arg("run")
923            .args(["--storage", &storage.to_string()])
924            .args(["--server", &format!("server_{validator}.json")])
925            .args(["--shard", &shard.to_string()])
926            .args(self.cross_chain_config.to_args());
927        let child = command.spawn_into()?;
928
929        let port = self.shard_port(validator, shard);
930        let nickname = format!("validator server {validator}:{shard}");
931        match self.network.internal {
932            Network::Grpc => {
933                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
934            }
935            Network::Grpcs => {
936                Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
937            }
938            Network::Tcp => {
939                Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
940            }
941            Network::Udp => {
942                Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
943            }
944        }
945        Ok(child)
946    }
947
948    async fn run(&mut self) -> Result<()> {
949        for validator in 0..self.num_initial_validators {
950            self.start_validator(validator).await?;
951        }
952        Ok(())
953    }
954
955    /// Start a validator.
956    pub async fn start_validator(&mut self, index: usize) -> Result<()> {
957        self.initialize_storage(index).await?;
958        self.restart_validator(index).await
959    }
960
961    /// Restart a validator. This is similar to `start_validator` except that the
962    /// database was already initialized once.
963    pub async fn restart_validator(&mut self, index: usize) -> Result<()> {
964        let mut validator = Validator::new();
965        for k in 0..self.num_proxies {
966            let proxy = self.run_proxy(index, k).await?;
967            validator.add_proxy(proxy);
968        }
969        for shard in 0..self.num_shards {
970            let server = self.run_server(index, shard).await?;
971            validator.add_server(server);
972        }
973        if let ExportersSetup::Local(ref exporters) = self.block_exporters {
974            for block_exporter in 0..exporters.len() {
975                #[expect(
976                    clippy::cast_possible_truncation,
977                    reason = "loop index over a local exporter list bounded well below u32::MAX"
978                )]
979                let exporter_id = block_exporter as u32;
980                let exporter = self.run_exporter(index, exporter_id).await?;
981                validator.add_block_exporter(exporter);
982            }
983        }
984
985        self.running_validators.insert(index, validator);
986        Ok(())
987    }
988
989    /// Terminates all the processes of a given validator.
990    pub async fn stop_validator(&mut self, index: usize) -> Result<()> {
991        if let Some(mut validator) = self.running_validators.remove(&index) {
992            if let Err(error) = validator.terminate().await {
993                error!("Failed to stop validator {index}: {error}");
994                return Err(error);
995            }
996        }
997        Ok(())
998    }
999
1000    /// Returns a [`linera_rpc::Client`] to interact directly with a `validator`.
1001    pub fn validator_client(&mut self, validator: usize) -> Result<linera_rpc::Client> {
1002        let node_provider = linera_rpc::NodeProvider::new(linera_rpc::NodeOptions {
1003            send_timeout: Duration::from_secs(1),
1004            recv_timeout: Duration::from_secs(1),
1005            retry_delay: Duration::ZERO,
1006            max_retries: 0,
1007            ..Default::default()
1008        });
1009
1010        Ok(node_provider.make_node(&self.validator_address(validator))?)
1011    }
1012
1013    /// Returns the address to connect to a validator's proxy.
1014    /// In local networks, the zeroth proxy _is_ the validator ingress.
1015    pub fn validator_address(&self, validator: usize) -> String {
1016        let port = self.proxy_public_port(validator, 0);
1017        let schema = self.network.external.schema();
1018
1019        format!("{schema}:localhost:{port}")
1020    }
1021}
1022
#[cfg(with_testing)]
impl LocalNet {
    /// Returns the validating key and an account key of the validator, if they were
    /// recorded by [`Self::generate_validator_config`].
    pub fn validator_keys(&self, validator: usize) -> Option<&(String, String)> {
        self.validator_keys.get(&validator)
    }

    /// Runs `linera-server generate --validators <configuration>` for `validator` and
    /// records the first two comma-separated keys printed on stdout.
    ///
    /// # Errors
    ///
    /// Fails if the configuration string cannot be built, if the command fails, or if its
    /// output holds fewer than two comma-separated fields.
    pub async fn generate_validator_config(&mut self, validator: usize) -> Result<()> {
        let stdout = self
            .command_for_binary("linera-server")
            .await?
            .arg("generate")
            .arg("--validators")
            .arg(&self.configuration_string(validator)?)
            .spawn_and_wait_for_stdout()
            .await?;
        // The output comes from a subprocess: validate its shape instead of indexing, which
        // would panic on unexpected output.
        let mut keys = stdout.trim().split(',').map(str::to_string);
        let (Some(validating_key), Some(account_key)) = (keys.next(), keys.next()) else {
            bail!(
                "expected two comma-separated keys from `linera-server generate`, got {:?}",
                stdout.trim()
            );
        };
        self.validator_keys
            .insert(validator, (validating_key, account_key));
        Ok(())
    }

    /// Terminates the server process of `shard` for a running `validator`.
    ///
    /// # Errors
    ///
    /// Fails if `validator` is not running or if terminating the server fails.
    pub async fn terminate_server(&mut self, validator: usize, shard: usize) -> Result<()> {
        self.running_validators
            .get_mut(&validator)
            .context("server not found")?
            .terminate_server(shard)
            .await?;
        Ok(())
    }

    /// Removes `validator` from the set of running validators.
    ///
    /// # Errors
    ///
    /// Fails if `validator` is not running.
    pub fn remove_validator(&mut self, validator: usize) -> Result<()> {
        self.running_validators
            .remove(&validator)
            .context("validator not found")?;
        Ok(())
    }

    /// Starts the server process of `shard` and attaches it to a running `validator`.
    ///
    /// # Errors
    ///
    /// Fails if the server cannot be started or if `validator` is not running. Note that
    /// the server is started before the validator is looked up.
    pub async fn start_server(&mut self, validator: usize, shard: usize) -> Result<()> {
        let server = self.run_server(validator, shard).await?;
        self.running_validators
            .get_mut(&validator)
            .context("could not find validator")?
            .add_server(server);
        Ok(())
    }
}