linera_service/cli_wrappers/
local_net.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4#[cfg(with_testing)]
5use std::sync::LazyLock;
6use std::{
7    collections::BTreeMap,
8    env,
9    path::{Path, PathBuf},
10    sync::Arc,
11    time::Duration,
12};
13
14use anyhow::{anyhow, bail, ensure, Context, Result};
15#[cfg(with_testing)]
16use async_lock::RwLock;
17use async_trait::async_trait;
18use linera_base::{
19    command::{resolve_binary, CommandExt},
20    data_types::Amount,
21};
22use linera_client::client_options::ResourceControlPolicyConfig;
23use linera_core::node::ValidatorNodeProvider;
24use linera_rpc::config::CrossChainConfig;
25#[cfg(all(feature = "storage-service", with_testing))]
26use linera_storage_service::common::storage_service_test_endpoint;
27#[cfg(all(feature = "rocksdb", feature = "scylladb", with_testing))]
28use linera_views::rocks_db::{RocksDbSpawnMode, RocksDbStore};
29#[cfg(all(feature = "scylladb", with_testing))]
30use linera_views::{scylla_db::ScyllaDbStore, store::TestKeyValueStore as _};
31use tempfile::{tempdir, TempDir};
32use tokio::process::{Child, Command};
33use tonic::transport::{channel::ClientTlsConfig, Endpoint};
34use tonic_health::pb::{
35    health_check_response::ServingStatus, health_client::HealthClient, HealthCheckRequest,
36};
37use tracing::{error, info, warn};
38
39use crate::{
40    cli_wrappers::{
41        ClientWrapper, LineraNet, LineraNetConfig, Network, NetworkConfig, OnClientDrop,
42    },
43    storage::{StorageConfig, StorageConfigNamespace},
44    util::ChildExt,
45};
46
47pub enum ProcessInbox {
48    Skip,
49    Automatic,
50}
51
52#[cfg(with_testing)]
53static PORT_PROVIDER: LazyLock<RwLock<u16>> = LazyLock::new(|| RwLock::new(7080));
54
55/// Provides a port for the node service. Increment the port numbers.
56#[cfg(with_testing)]
57pub async fn get_node_port() -> u16 {
58    let mut port = PORT_PROVIDER.write().await;
59    let port_ret = *port;
60    *port += 1;
61    info!("get_node_port returning port_ret={}", port_ret);
62    assert!(port_selector::is_free(port_ret));
63    port_ret
64}
65
66#[cfg(with_testing)]
67async fn make_testing_config(database: Database) -> Result<StorageConfig> {
68    match database {
69        Database::Service => {
70            #[cfg(feature = "storage-service")]
71            {
72                let endpoint = storage_service_test_endpoint()
73                    .expect("Reading LINERA_STORAGE_SERVICE environment variable");
74                Ok(StorageConfig::Service { endpoint })
75            }
76            #[cfg(not(feature = "storage-service"))]
77            panic!("Database::Service is selected without the feature storage_service");
78        }
79        Database::DynamoDb => {
80            #[cfg(feature = "dynamodb")]
81            {
82                let use_dynamodb_local = true;
83                Ok(StorageConfig::DynamoDb { use_dynamodb_local })
84            }
85            #[cfg(not(feature = "dynamodb"))]
86            panic!("Database::DynamoDb is selected without the feature dynamodb");
87        }
88        Database::ScyllaDb => {
89            #[cfg(feature = "scylladb")]
90            {
91                let config = ScyllaDbStore::new_test_config().await?;
92                Ok(StorageConfig::ScyllaDb {
93                    uri: config.inner_config.uri,
94                })
95            }
96            #[cfg(not(feature = "scylladb"))]
97            panic!("Database::ScyllaDb is selected without the feature scylladb");
98        }
99        Database::DualRocksDbScyllaDb => {
100            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
101            {
102                let rocksdb_config = RocksDbStore::new_test_config().await?;
103                let scylla_config = ScyllaDbStore::new_test_config().await?;
104                let spawn_mode = RocksDbSpawnMode::get_spawn_mode_from_runtime();
105                Ok(StorageConfig::DualRocksDbScyllaDb {
106                    path_with_guard: rocksdb_config.inner_config.path_with_guard,
107                    spawn_mode,
108                    uri: scylla_config.inner_config.uri,
109                })
110            }
111            #[cfg(not(all(feature = "rocksdb", feature = "scylladb")))]
112            panic!("Database::DualRocksDbScyllaDb is selected without the features rocksdb and scylladb");
113        }
114    }
115}
116
117pub enum StorageConfigBuilder {
118    #[cfg(with_testing)]
119    TestConfig,
120    ExistingConfig {
121        storage_config: StorageConfig,
122    },
123}
124
125impl StorageConfigBuilder {
126    #[allow(unused_variables)]
127    pub async fn build(self, database: Database) -> Result<StorageConfig> {
128        match self {
129            #[cfg(with_testing)]
130            StorageConfigBuilder::TestConfig => make_testing_config(database).await,
131            StorageConfigBuilder::ExistingConfig { storage_config } => Ok(storage_config),
132        }
133    }
134}
135
136/// Path used for the run can come from a path whose lifetime is controlled
137/// by an external user or as a temporary directory
138#[derive(Clone)]
139pub enum PathProvider {
140    ExternalPath { path_buf: PathBuf },
141    TemporaryDirectory { tmp_dir: Arc<TempDir> },
142}
143
144impl PathProvider {
145    pub fn path(&self) -> &Path {
146        match self {
147            PathProvider::ExternalPath { path_buf } => path_buf.as_path(),
148            PathProvider::TemporaryDirectory { tmp_dir } => tmp_dir.path(),
149        }
150    }
151
152    pub fn create_temporary_directory() -> Result<Self> {
153        let tmp_dir = Arc::new(tempdir()?);
154        Ok(PathProvider::TemporaryDirectory { tmp_dir })
155    }
156
157    pub fn new(path: &Option<String>) -> anyhow::Result<Self> {
158        Ok(match path {
159            None => {
160                let tmp_dir = Arc::new(tempfile::tempdir()?);
161                PathProvider::TemporaryDirectory { tmp_dir }
162            }
163            Some(path) => {
164                let path = Path::new(path);
165                let path_buf = path.to_path_buf();
166                PathProvider::ExternalPath { path_buf }
167            }
168        })
169    }
170}
171
172/// The information needed to start a [`LocalNet`].
173pub struct LocalNetConfig {
174    pub database: Database,
175    pub network: NetworkConfig,
176    pub testing_prng_seed: Option<u64>,
177    pub namespace: String,
178    pub num_other_initial_chains: u32,
179    pub initial_amount: Amount,
180    pub num_initial_validators: usize,
181    pub num_shards: usize,
182    pub num_proxies: usize,
183    pub policy_config: ResourceControlPolicyConfig,
184    pub cross_chain_config: CrossChainConfig,
185    pub storage_config_builder: StorageConfigBuilder,
186    pub path_provider: PathProvider,
187    pub num_block_exporters: u32,
188}
189
190/// A set of Linera validators running locally as native processes.
191pub struct LocalNet {
192    network: NetworkConfig,
193    testing_prng_seed: Option<u64>,
194    next_client_id: usize,
195    num_initial_validators: usize,
196    num_proxies: usize,
197    num_shards: usize,
198    validator_keys: BTreeMap<usize, (String, String)>,
199    running_validators: BTreeMap<usize, Validator>,
200    initialized_validator_storages: BTreeMap<usize, StorageConfigNamespace>,
201    common_namespace: String,
202    common_storage_config: StorageConfig,
203    cross_chain_config: CrossChainConfig,
204    path_provider: PathProvider,
205    num_block_exporters: u32,
206}
207
208/// The name of the environment variable that allows specifying additional arguments to be passed
209/// to the binary when starting a server.
210const SERVER_ENV: &str = "LINERA_SERVER_PARAMS";
211
212/// Description of the database engine to use inside a local Linera network.
213#[derive(Copy, Clone, Eq, PartialEq)]
214pub enum Database {
215    Service,
216    DynamoDb,
217    ScyllaDb,
218    DualRocksDbScyllaDb,
219}
220
221/// The processes of a running validator.
222struct Validator {
223    proxy: Child,
224    servers: Vec<Child>,
225    exporters: Vec<Child>,
226}
227
228impl Validator {
229    fn new(proxy: Child) -> Self {
230        Self {
231            proxy,
232            servers: vec![],
233            exporters: vec![],
234        }
235    }
236
237    async fn terminate(&mut self) -> Result<()> {
238        self.proxy
239            .kill()
240            .await
241            .context("terminating validator proxy")?;
242        for server in &mut self.servers {
243            server
244                .kill()
245                .await
246                .context("terminating validator server")?;
247        }
248        Ok(())
249    }
250
251    fn add_server(&mut self, server: Child) {
252        self.servers.push(server)
253    }
254
255    fn add_block_exporter(&mut self, exporter: Child) {
256        self.exporters.push(exporter);
257    }
258
259    #[cfg(with_testing)]
260    async fn terminate_server(&mut self, index: usize) -> Result<()> {
261        let mut server = self.servers.remove(index);
262        server
263            .kill()
264            .await
265            .context("terminating validator server")?;
266        Ok(())
267    }
268
269    fn ensure_is_running(&mut self) -> Result<()> {
270        self.proxy.ensure_is_running()?;
271        for child in &mut self.servers {
272            child.ensure_is_running()?;
273        }
274
275        for exporter in &mut self.exporters {
276            exporter.ensure_is_running()?;
277        }
278
279        Ok(())
280    }
281}
282
283#[cfg(with_testing)]
284impl LocalNetConfig {
285    pub fn new_test(database: Database, network: Network) -> Self {
286        let num_shards = 4;
287        let num_proxies = 1;
288        let storage_config_builder = StorageConfigBuilder::TestConfig;
289        let path_provider = PathProvider::create_temporary_directory().unwrap();
290        let internal = network.drop_tls();
291        let external = network;
292        let network = NetworkConfig { internal, external };
293        let cross_chain_config = CrossChainConfig::default();
294        Self {
295            database,
296            network,
297            num_other_initial_chains: 2,
298            initial_amount: Amount::from_tokens(1_000_000),
299            policy_config: ResourceControlPolicyConfig::Testnet,
300            cross_chain_config,
301            testing_prng_seed: Some(37),
302            namespace: linera_views::random::generate_test_namespace(),
303            num_initial_validators: 4,
304            num_shards,
305            num_proxies,
306            storage_config_builder,
307            path_provider,
308            num_block_exporters: 0,
309        }
310    }
311}
312
313#[async_trait]
314impl LineraNetConfig for LocalNetConfig {
315    type Net = LocalNet;
316
317    async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
318        let storage_config = self.storage_config_builder.build(self.database).await?;
319        let mut net = LocalNet::new(
320            self.network,
321            self.testing_prng_seed,
322            self.namespace,
323            self.num_initial_validators,
324            self.num_shards,
325            self.num_proxies,
326            storage_config,
327            self.cross_chain_config,
328            self.path_provider,
329            self.num_block_exporters,
330        )?;
331        let client = net.make_client().await;
332        ensure!(
333            self.num_initial_validators > 0,
334            "There should be at least one initial validator"
335        );
336        net.generate_initial_validator_config().await?;
337        client
338            .create_genesis_config(
339                self.num_other_initial_chains,
340                self.initial_amount,
341                self.policy_config,
342                Some(vec!["localhost".to_owned()]),
343            )
344            .await?;
345        net.run().await?;
346        Ok((net, client))
347    }
348}
349
350#[async_trait]
351impl LineraNet for LocalNet {
352    async fn ensure_is_running(&mut self) -> Result<()> {
353        for validator in self.running_validators.values_mut() {
354            validator.ensure_is_running().context("in local network")?;
355        }
356        Ok(())
357    }
358
359    async fn make_client(&mut self) -> ClientWrapper {
360        let client = ClientWrapper::new(
361            self.path_provider.clone(),
362            self.network.external,
363            self.testing_prng_seed,
364            self.next_client_id,
365            OnClientDrop::LeakChains,
366        );
367        if let Some(seed) = self.testing_prng_seed {
368            self.testing_prng_seed = Some(seed + 1);
369        }
370        self.next_client_id += 1;
371        client
372    }
373
374    async fn terminate(&mut self) -> Result<()> {
375        for validator in self.running_validators.values_mut() {
376            validator.terminate().await.context("in local network")?
377        }
378        Ok(())
379    }
380}
381
382impl LocalNet {
383    #[expect(clippy::too_many_arguments)]
384    fn new(
385        network: NetworkConfig,
386        testing_prng_seed: Option<u64>,
387        common_namespace: String,
388        num_initial_validators: usize,
389        num_proxies: usize,
390        num_shards: usize,
391        common_storage_config: StorageConfig,
392        cross_chain_config: CrossChainConfig,
393        path_provider: PathProvider,
394        num_block_exporters: u32,
395    ) -> Result<Self> {
396        Ok(Self {
397            network,
398            testing_prng_seed,
399            next_client_id: 0,
400            num_initial_validators,
401            num_proxies,
402            num_shards,
403            validator_keys: BTreeMap::new(),
404            running_validators: BTreeMap::new(),
405            initialized_validator_storages: BTreeMap::new(),
406            common_namespace,
407            common_storage_config,
408            cross_chain_config,
409            path_provider,
410            num_block_exporters,
411        })
412    }
413
414    async fn command_for_binary(&self, name: &'static str) -> Result<Command> {
415        let path = resolve_binary(name, env!("CARGO_PKG_NAME")).await?;
416        let mut command = Command::new(path);
417        command.current_dir(self.path_provider.path());
418        Ok(command)
419    }
420
421    #[cfg(with_testing)]
422    pub fn genesis_config(&self) -> Result<linera_client::config::GenesisConfig> {
423        let path = self.path_provider.path();
424        crate::util::read_json(path.join("genesis.json"))
425    }
426
427    pub fn proxy_public_port(validator: usize, proxy_id: usize) -> usize {
428        13000 + validator * 100 + proxy_id + 1
429    }
430
431    pub fn proxy_internal_port(validator: usize, proxy_id: usize) -> usize {
432        10000 + validator * 100 + proxy_id + 1
433    }
434
435    fn shard_port(validator: usize, shard: usize) -> usize {
436        9000 + validator * 100 + shard + 1
437    }
438
439    fn proxy_metrics_port(validator: usize, proxy_id: usize) -> usize {
440        12000 + validator * 100 + proxy_id + 1
441    }
442
443    fn shard_metrics_port(validator: usize, shard: usize) -> usize {
444        11000 + validator * 100 + shard + 1
445    }
446
447    fn block_exporter_port(validator: usize, exporter_id: usize) -> usize {
448        12000 + validator * 100 + exporter_id + 1
449    }
450
451    fn configuration_string(&self, server_number: usize) -> Result<String> {
452        let n = server_number;
453        let path = self
454            .path_provider
455            .path()
456            .join(format!("validator_{n}.toml"));
457        let port = Self::proxy_public_port(n, 0);
458        let external_protocol = self.network.external.toml();
459        let internal_protocol = self.network.internal.toml();
460        let external_host = self.network.external.localhost();
461        let internal_host = self.network.internal.localhost();
462        let mut content = format!(
463            r#"
464                server_config_path = "server_{n}.json"
465                host = "{external_host}"
466                port = {port}
467                external_protocol = {external_protocol}
468                internal_protocol = {internal_protocol}
469            "#
470        );
471
472        for k in 0..self.num_proxies {
473            let internal_port = Self::proxy_internal_port(n, k);
474            let metrics_port = Self::proxy_metrics_port(n, k);
475            // In the local network, the validator ingress is
476            // the proxy - so the `public_port` is the validator
477            // port.
478            content.push_str(&format!(
479                r#"
480                [[proxies]]
481                host = "{internal_host}"
482                public_port = {port}
483                private_port = {internal_port}
484                metrics_host = "{external_host}"
485                metrics_port = {metrics_port}
486                "#
487            ));
488        }
489
490        for k in 0..self.num_shards {
491            let shard_port = Self::shard_port(n, k);
492            let shard_metrics_port = Self::shard_metrics_port(n, k);
493            content.push_str(&format!(
494                r#"
495
496                [[shards]]
497                host = "{internal_host}"
498                port = {shard_port}
499                metrics_port = {shard_metrics_port}
500                "#
501            ));
502        }
503
504        for j in 0..self.num_block_exporters {
505            let host = Network::Grpc.localhost();
506            let port = Self::block_exporter_port(n, j as usize);
507            let config_content = format!(
508                r#"
509
510                [[block_exporters]]
511                host = "{host}"
512                port = {port}
513                "#
514            );
515
516            content.push_str(&config_content);
517            let exporter_config = self.generate_block_exporter_config(n, j);
518            let config_path = self
519                .path_provider
520                .path()
521                .join(format!("exporter_config_{n}:{j}.toml"));
522
523            fs_err::write(&config_path, &exporter_config)?;
524        }
525
526        fs_err::write(&path, content)?;
527        path.into_os_string().into_string().map_err(|error| {
528            anyhow!(
529                "could not parse OS string into string: {}",
530                error.to_string_lossy()
531            )
532        })
533    }
534
535    fn generate_block_exporter_config(&self, validator: usize, exporter_id: u32) -> String {
536        let n = validator;
537        let host = Network::Grpc.localhost();
538        let port = Self::block_exporter_port(n, exporter_id as usize);
539        let config = format!(
540            r#"
541            id = {exporter_id}
542
543            [service_config]
544            host = "{host}"
545            port = {port}
546            "#
547        );
548
549        config
550    }
551
552    async fn generate_initial_validator_config(&mut self) -> Result<()> {
553        let mut command = self.command_for_binary("linera-server").await?;
554        command.arg("generate");
555        if let Some(seed) = self.testing_prng_seed {
556            command.arg("--testing-prng-seed").arg(seed.to_string());
557            self.testing_prng_seed = Some(seed + 1);
558        }
559        command.arg("--validators");
560        for i in 0..self.num_initial_validators {
561            command.arg(&self.configuration_string(i)?);
562        }
563        let output = command
564            .args(["--committee", "committee.json"])
565            .spawn_and_wait_for_stdout()
566            .await?;
567        self.validator_keys = output
568            .split_whitespace()
569            .map(str::to_string)
570            .map(|keys| keys.split(',').map(str::to_string).collect::<Vec<_>>())
571            .enumerate()
572            .map(|(i, keys)| {
573                let validator_key = keys[0].to_string();
574                let account_key = keys[1].to_string();
575                (i, (validator_key, account_key))
576            })
577            .collect();
578        Ok(())
579    }
580
581    async fn run_proxy(&mut self, validator: usize) -> Result<Child> {
582        let storage = self
583            .initialized_validator_storages
584            .get(&validator)
585            .expect("initialized storage");
586        let child = self
587            .command_for_binary("linera-proxy")
588            .await?
589            .arg(format!("server_{}.json", validator))
590            .args(["--storage", &storage.to_string()])
591            .args(["--genesis", "genesis.json"])
592            .spawn_into()?;
593
594        let port = Self::proxy_public_port(validator, 0);
595        let nickname = format!("validator proxy {validator}");
596        match self.network.external {
597            Network::Grpc => {
598                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
599                let nickname = format!("validator proxy {validator}");
600                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
601            }
602            Network::Grpcs => {
603                let nickname = format!("validator proxy {validator}");
604                Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
605            }
606            Network::Tcp => {
607                Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
608            }
609            Network::Udp => {
610                Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
611            }
612        }
613        Ok(child)
614    }
615
616    async fn run_exporter(&mut self, validator: usize, exporter_id: u32) -> Result<Child> {
617        let config_path = format!("exporter_config_{validator}:{exporter_id}.toml");
618        let storage = self
619            .initialized_validator_storages
620            .get(&validator)
621            .expect("initialized storage");
622
623        let child = self
624            .command_for_binary("linera-exporter")
625            .await?
626            .arg(config_path)
627            .args(["--storage", &storage.to_string()])
628            .args(["--genesis", "genesis.json"])
629            .spawn_into()?;
630
631        match self.network.internal {
632            Network::Grpc => {
633                let port = Self::block_exporter_port(validator, exporter_id as usize);
634                let nickname = format!("block exporter {validator}:{exporter_id}");
635                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
636            }
637            Network::Grpcs => {
638                let port = Self::block_exporter_port(validator, exporter_id as usize);
639                let nickname = format!("block exporter  {validator}:{exporter_id}");
640                Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
641            }
642            Network::Tcp | Network::Udp => {
643                unreachable!("Only allowed options are grpc and grpcs")
644            }
645        }
646
647        Ok(child)
648    }
649
650    pub async fn ensure_grpc_server_has_started(
651        nickname: &str,
652        port: usize,
653        scheme: &str,
654    ) -> Result<()> {
655        let endpoint = match scheme {
656            "http" => Endpoint::new(format!("http://localhost:{port}"))
657                .context("endpoint should always parse")?,
658            "https" => {
659                use linera_rpc::CERT_PEM;
660                let certificate = tonic::transport::Certificate::from_pem(CERT_PEM);
661                let tls_config = ClientTlsConfig::new().ca_certificate(certificate);
662                Endpoint::new(format!("https://localhost:{port}"))
663                    .context("endpoint should always parse")?
664                    .tls_config(tls_config)?
665            }
666            _ => bail!("Only supported scheme are http and https"),
667        };
668        let connection = endpoint.connect_lazy();
669        let mut client = HealthClient::new(connection);
670        linera_base::time::timer::sleep(Duration::from_millis(100)).await;
671        for i in 0..10 {
672            linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
673            let result = client.check(HealthCheckRequest::default()).await;
674            if result.is_ok() && result.unwrap().get_ref().status() == ServingStatus::Serving {
675                info!("Successfully started {nickname}");
676                return Ok(());
677            } else {
678                warn!("Waiting for {nickname} to start");
679            }
680        }
681        bail!("Failed to start {nickname}");
682    }
683
684    pub async fn ensure_simple_server_has_started(
685        nickname: &str,
686        port: usize,
687        protocol: &str,
688    ) -> Result<()> {
689        use linera_core::node::ValidatorNode as _;
690
691        let options = linera_rpc::NodeOptions {
692            send_timeout: Duration::from_secs(5),
693            recv_timeout: Duration::from_secs(5),
694            retry_delay: Duration::from_secs(1),
695            max_retries: 1,
696        };
697        let provider = linera_rpc::simple::SimpleNodeProvider::new(options);
698        let address = format!("{protocol}:127.0.0.1:{port}");
699        // All "simple" services (i.e. proxy and "server") are based on `RpcMessage` and
700        // support `VersionInfoQuery`.
701        let node = provider.make_node(&address)?;
702        linera_base::time::timer::sleep(Duration::from_millis(100)).await;
703        for i in 0..10 {
704            linera_base::time::timer::sleep(Duration::from_millis(i * 500)).await;
705            let result = node.get_version_info().await;
706            if result.is_ok() {
707                info!("Successfully started {nickname}");
708                return Ok(());
709            } else {
710                warn!("Waiting for {nickname} to start");
711            }
712        }
713        bail!("Failed to start {nickname}");
714    }
715
716    async fn initialize_storage(&mut self, validator: usize) -> Result<()> {
717        let namespace = format!("{}_server_{}_db", self.common_namespace, validator);
718        let storage_config = self.common_storage_config.clone();
719        let storage = StorageConfigNamespace {
720            storage_config,
721            namespace,
722        };
723
724        let mut command = self.command_for_binary("linera-server").await?;
725        if let Ok(var) = env::var(SERVER_ENV) {
726            command.args(var.split_whitespace());
727        }
728        command.arg("initialize");
729        command
730            .args(["--storage", &storage.to_string()])
731            .args(["--genesis", "genesis.json"])
732            .spawn_and_wait_for_stdout()
733            .await?;
734
735        self.initialized_validator_storages
736            .insert(validator, storage);
737        Ok(())
738    }
739
740    async fn run_server(&mut self, validator: usize, shard: usize) -> Result<Child> {
741        let mut storage = self
742            .initialized_validator_storages
743            .get(&validator)
744            .expect("initialized storage")
745            .clone();
746
747        // For the storage backends with a local directory, make sure that we don't reuse
748        // the same directory for all the shards.
749        storage.storage_config.maybe_append_shard_path(shard)?;
750
751        let mut command = self.command_for_binary("linera-server").await?;
752        if let Ok(var) = env::var(SERVER_ENV) {
753            command.args(var.split_whitespace());
754        }
755        command
756            .arg("run")
757            .args(["--storage", &storage.to_string()])
758            .args(["--server", &format!("server_{}.json", validator)])
759            .args(["--shard", &shard.to_string()])
760            .args(["--genesis", "genesis.json"])
761            .args(self.cross_chain_config.to_args());
762        let child = command.spawn_into()?;
763
764        let port = Self::shard_port(validator, shard);
765        let nickname = format!("validator server {validator}:{shard}");
766        match self.network.internal {
767            Network::Grpc => {
768                Self::ensure_grpc_server_has_started(&nickname, port, "http").await?;
769            }
770            Network::Grpcs => {
771                Self::ensure_grpc_server_has_started(&nickname, port, "https").await?;
772            }
773            Network::Tcp => {
774                Self::ensure_simple_server_has_started(&nickname, port, "tcp").await?;
775            }
776            Network::Udp => {
777                Self::ensure_simple_server_has_started(&nickname, port, "udp").await?;
778            }
779        }
780        Ok(child)
781    }
782
783    async fn run(&mut self) -> Result<()> {
784        for validator in 0..self.num_initial_validators {
785            self.start_validator(validator).await?;
786        }
787        Ok(())
788    }
789
790    /// Start a validator.
791    pub async fn start_validator(&mut self, index: usize) -> Result<()> {
792        self.initialize_storage(index).await?;
793        self.restart_validator(index).await
794    }
795
796    /// Restart a validator. This is similar to `start_validator` except that the
797    /// database was already initialized once.
798    pub async fn restart_validator(&mut self, index: usize) -> Result<()> {
799        let proxy = self.run_proxy(index).await?;
800        let mut validator = Validator::new(proxy);
801        for shard in 0..self.num_shards {
802            let server = self.run_server(index, shard).await?;
803            validator.add_server(server);
804        }
805        for block_exporter in 0..self.num_block_exporters {
806            let exporter = self.run_exporter(index, block_exporter).await?;
807            validator.add_block_exporter(exporter);
808        }
809        self.running_validators.insert(index, validator);
810        Ok(())
811    }
812
813    /// Terminates all the processes of a given validator.
814    pub async fn stop_validator(&mut self, index: usize) -> Result<()> {
815        if let Some(mut validator) = self.running_validators.remove(&index) {
816            if let Err(error) = validator.terminate().await {
817                error!("Failed to stop validator {index}: {error}");
818                return Err(error);
819            }
820        }
821        Ok(())
822    }
823
824    /// Returns a [`linera_rpc::Client`] to interact directly with a `validator`.
825    pub async fn validator_client(&mut self, validator: usize) -> Result<linera_rpc::Client> {
826        let node_provider = linera_rpc::NodeProvider::new(linera_rpc::NodeOptions {
827            send_timeout: Duration::from_secs(1),
828            recv_timeout: Duration::from_secs(1),
829            retry_delay: Duration::ZERO,
830            max_retries: 0,
831        });
832
833        Ok(node_provider.make_node(&self.validator_address(validator))?)
834    }
835
836    /// Returns the address to connect to a validator's proxy.
837    /// In local networks, the zeroth proxy _is_ the validator ingress.
838    pub fn validator_address(&self, validator: usize) -> String {
839        let port = Self::proxy_public_port(validator, 0);
840        let schema = self.network.external.schema();
841
842        format!("{schema}:localhost:{port}")
843    }
844}
845
846#[cfg(with_testing)]
847impl LocalNet {
848    /// Returns the validating key and an account key of the validator.
849    pub fn validator_keys(&self, validator: usize) -> Option<&(String, String)> {
850        self.validator_keys.get(&validator)
851    }
852
853    pub async fn generate_validator_config(&mut self, validator: usize) -> Result<()> {
854        let stdout = self
855            .command_for_binary("linera-server")
856            .await?
857            .arg("generate")
858            .arg("--validators")
859            .arg(&self.configuration_string(validator)?)
860            .spawn_and_wait_for_stdout()
861            .await?;
862        let keys = stdout
863            .trim()
864            .split(',')
865            .map(str::to_string)
866            .collect::<Vec<_>>();
867        self.validator_keys
868            .insert(validator, (keys[0].clone(), keys[1].clone()));
869        Ok(())
870    }
871
872    pub async fn terminate_server(&mut self, validator: usize, shard: usize) -> Result<()> {
873        self.running_validators
874            .get_mut(&validator)
875            .context("server not found")?
876            .terminate_server(shard)
877            .await?;
878        Ok(())
879    }
880
881    pub fn remove_validator(&mut self, validator: usize) -> Result<()> {
882        self.running_validators
883            .remove(&validator)
884            .context("validator not found")?;
885        Ok(())
886    }
887
888    pub async fn start_server(&mut self, validator: usize, shard: usize) -> Result<()> {
889        let server = self.run_server(validator, shard).await?;
890        self.running_validators
891            .get_mut(&validator)
892            .context("could not find validator")?
893            .add_server(server);
894        Ok(())
895    }
896}