linera_service/cli/
net_up_utils.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{num::NonZeroU16, str::FromStr};
5
6use colored::Colorize as _;
7use linera_base::{data_types::Amount, listen_for_shutdown_signals, time::Duration};
8use linera_client::client_options::ResourceControlPolicyConfig;
9use linera_rpc::config::{CrossChainConfig, ExporterServiceConfig};
10#[cfg(feature = "storage-service")]
11use linera_storage_service::{
12    child::{StorageService, StorageServiceGuard},
13    common::get_service_storage_binary,
14};
15use tokio_util::sync::CancellationToken;
16use tracing::info;
17#[cfg(feature = "kubernetes")]
18use {
19    crate::cli_wrappers::local_kubernetes_net::{BuildMode, LocalKubernetesNetConfig},
20    std::path::PathBuf,
21};
22
23use crate::{
24    cli_wrappers::{
25        local_net::{
26            Database, ExportersSetup, InnerStorageConfigBuilder, LocalNetConfig, PathProvider,
27        },
28        ClientWrapper, FaucetService, LineraNet, LineraNetConfig, Network, NetworkConfig,
29    },
30    storage::{InnerStorageConfig, StorageConfig},
31};
32
33struct StorageConfigProvider {
34    /// The storage config.
35    config: StorageConfig,
36    #[cfg(feature = "storage-service")]
37    _service_guard: Option<StorageServiceGuard>,
38}
39
40impl StorageConfigProvider {
41    pub async fn new(storage: &Option<String>) -> anyhow::Result<StorageConfigProvider> {
42        match storage {
43            #[cfg(feature = "storage-service")]
44            None => {
45                let service_endpoint = linera_base::port::get_free_endpoint().await?;
46                let binary = get_service_storage_binary().await?.display().to_string();
47                let service = StorageService::new(&service_endpoint, binary);
48                let _service_guard = service.run().await?;
49                let _service_guard = Some(_service_guard);
50                let inner_storage_config = InnerStorageConfig::Service {
51                    endpoint: service_endpoint,
52                };
53                let namespace = "table_default".to_string();
54                let config = StorageConfig {
55                    inner_storage_config,
56                    namespace,
57                };
58                Ok(StorageConfigProvider {
59                    config,
60                    _service_guard,
61                })
62            }
63            #[cfg(not(feature = "storage-service"))]
64            None => {
65                panic!("When storage is not selected, the storage-service needs to be enabled");
66            }
67            #[cfg(feature = "storage-service")]
68            Some(storage) => {
69                let config = StorageConfig::from_str(storage)?;
70                Ok(StorageConfigProvider {
71                    config,
72                    _service_guard: None,
73                })
74            }
75            #[cfg(not(feature = "storage-service"))]
76            Some(storage) => {
77                let config = StorageConfig::from_str(storage)?;
78                Ok(StorageConfigProvider { config })
79            }
80        }
81    }
82
83    pub fn inner_storage_config(&self) -> &InnerStorageConfig {
84        &self.config.inner_storage_config
85    }
86
87    pub fn namespace(&self) -> &str {
88        &self.config.namespace
89    }
90
91    pub fn database(&self) -> anyhow::Result<Database> {
92        match self.config.inner_storage_config {
93            InnerStorageConfig::Memory { .. } => anyhow::bail!("Not possible to work with memory"),
94            #[cfg(feature = "rocksdb")]
95            InnerStorageConfig::RocksDb { .. } => {
96                anyhow::bail!("Not possible to work with RocksDB")
97            }
98            #[cfg(feature = "storage-service")]
99            InnerStorageConfig::Service { .. } => Ok(Database::Service),
100            #[cfg(feature = "dynamodb")]
101            InnerStorageConfig::DynamoDb { .. } => Ok(Database::DynamoDb),
102            #[cfg(feature = "scylladb")]
103            InnerStorageConfig::ScyllaDb { .. } => Ok(Database::ScyllaDb),
104            #[cfg(all(feature = "rocksdb", feature = "scylladb"))]
105            InnerStorageConfig::DualRocksDbScyllaDb { .. } => Ok(Database::DualRocksDbScyllaDb),
106        }
107    }
108}
109
/// Brings up a local test network through `LocalKubernetesNetConfig`, prints the
/// wallet usage instructions, optionally starts a faucet, and then blocks until a
/// shutdown signal is received, at which point everything is terminated.
///
/// # Panics
///
/// Panics if `num_initial_validators` or `num_shards` is zero, or if `faucet_chain`
/// is set while `with_faucet` is `false`.
///
/// # Errors
///
/// Propagates any error from instantiating the network, starting the faucet, or
/// shutting down.
#[expect(clippy::too_many_arguments)]
#[cfg(feature = "kubernetes")]
pub async fn handle_net_up_kubernetes(
    num_other_initial_chains: u32,
    initial_amount: u128,
    num_initial_validators: usize,
    num_shards: usize,
    testing_prng_seed: Option<u64>,
    binaries: &Option<Option<PathBuf>>,
    no_build: bool,
    docker_image_name: String,
    build_mode: BuildMode,
    policy_config: ResourceControlPolicyConfig,
    with_faucet: bool,
    faucet_chain: Option<u32>,
    faucet_port: NonZeroU16,
    faucet_amount: Amount,
    dual_store: bool,
) -> anyhow::Result<()> {
    assert!(
        num_initial_validators >= 1,
        "The local test network must have at least one validator."
    );
    assert!(
        num_shards >= 1,
        "The local test network must have at least one shard per validator."
    );
    // A faucet chain index only makes sense when a faucet is actually requested.
    assert!(
        with_faucet || faucet_chain.is_none(),
        "--faucet-chain must be provided only with --with-faucet"
    );

    // The token is cancelled by the signal listener task spawned here.
    let shutdown_token = CancellationToken::new();
    tokio::spawn(listen_for_shutdown_signals(shutdown_token.clone()));

    let (mut net, client) = LocalKubernetesNetConfig {
        network: Network::Grpc,
        testing_prng_seed,
        num_other_initial_chains,
        initial_amount: Amount::from_tokens(initial_amount),
        num_initial_validators,
        num_shards,
        binaries: binaries.clone().into(),
        no_build,
        docker_image_name,
        build_mode,
        policy_config,
        dual_store,
    }
    .instantiate()
    .await?;

    let faucet_service = print_messages_and_create_faucet(
        client,
        with_faucet,
        faucet_chain,
        faucet_port,
        faucet_amount,
        num_other_initial_chains,
    )
    .await?;

    wait_for_shutdown(shutdown_token, &mut net, faucet_service).await
}
173
174#[expect(clippy::too_many_arguments)]
175pub async fn handle_net_up_service(
176    num_other_initial_chains: u32,
177    initial_amount: u128,
178    num_initial_validators: usize,
179    num_shards: usize,
180    testing_prng_seed: Option<u64>,
181    policy_config: ResourceControlPolicyConfig,
182    cross_chain_config: CrossChainConfig,
183    with_block_exporter: bool,
184    block_exporter_address: String,
185    block_exporter_port: NonZeroU16,
186    path: &Option<String>,
187    storage: &Option<String>,
188    external_protocol: String,
189    with_faucet: bool,
190    faucet_chain: Option<u32>,
191    faucet_port: NonZeroU16,
192    faucet_amount: Amount,
193) -> anyhow::Result<()> {
194    assert!(
195        num_initial_validators >= 1,
196        "The local test network must have at least one validator."
197    );
198    assert!(
199        num_shards >= 1,
200        "The local test network must have at least one shard per validator."
201    );
202
203    let shutdown_notifier = CancellationToken::new();
204    tokio::spawn(listen_for_shutdown_signals(shutdown_notifier.clone()));
205
206    let storage = StorageConfigProvider::new(storage).await?;
207    let storage_config = storage.inner_storage_config().clone();
208    let namespace = storage.namespace().to_string();
209    let database = storage.database()?;
210    let storage_config_builder = InnerStorageConfigBuilder::ExistingConfig { storage_config };
211    let external = match external_protocol.as_str() {
212        "grpc" => Network::Grpc,
213        "grpcs" => Network::Grpcs,
214        _ => panic!("Only allowed options are grpc and grpcs"),
215    };
216    let internal = Network::Grpc;
217    let network = NetworkConfig { external, internal };
218    let path_provider = PathProvider::from_path_option(path)?;
219    let num_proxies = 1; // Local networks currently support exactly 1 proxy.
220    let block_exporters = if with_block_exporter {
221        let exporter_config = ExporterServiceConfig {
222            host: block_exporter_address,
223            port: block_exporter_port.into(),
224        };
225        ExportersSetup::Remote(vec![exporter_config])
226    } else {
227        ExportersSetup::Local(vec![])
228    };
229    let config = LocalNetConfig {
230        network,
231        database,
232        testing_prng_seed,
233        namespace,
234        num_other_initial_chains,
235        initial_amount: Amount::from_tokens(initial_amount),
236        num_initial_validators,
237        num_shards,
238        num_proxies,
239        policy_config,
240        cross_chain_config,
241        storage_config_builder,
242        path_provider,
243        block_exporters,
244    };
245    let (mut net, client) = config.instantiate().await?;
246    let faucet_service = print_messages_and_create_faucet(
247        client,
248        with_faucet,
249        faucet_chain,
250        faucet_port,
251        faucet_amount,
252        num_other_initial_chains,
253    )
254    .await?;
255
256    wait_for_shutdown(shutdown_notifier, &mut net, faucet_service).await
257}
258
259async fn wait_for_shutdown(
260    shutdown_notifier: CancellationToken,
261    net: &mut impl LineraNet,
262    faucet_service: Option<FaucetService>,
263) -> anyhow::Result<()> {
264    shutdown_notifier.cancelled().await;
265    eprintln!();
266    if let Some(service) = faucet_service {
267        eprintln!("Terminating the faucet service");
268        service.terminate().await?;
269    }
270    eprintln!("Terminating the local test network");
271    net.terminate().await?;
272    eprintln!("Done.");
273
274    Ok(())
275}
276
277async fn print_messages_and_create_faucet(
278    client: ClientWrapper,
279    with_faucet: bool,
280    faucet_chain: Option<u32>,
281    faucet_port: NonZeroU16,
282    faucet_amount: Amount,
283    num_other_initial_chains: u32,
284) -> Result<Option<FaucetService>, anyhow::Error> {
285    // Make time to (hopefully) display the message after the tracing logs.
286    linera_base::time::timer::sleep(Duration::from_secs(1)).await;
287
288    // Create the wallet for the initial "root" chains.
289    info!("Local test network successfully started.");
290
291    eprintln!(
292        "To use the admin wallet of this test network, you may set \
293         the environment variables LINERA_WALLET, LINERA_KEYSTORE, \
294         and LINERA_STORAGE as follows.\n"
295    );
296    println!(
297        "{}",
298        format!(
299            "export LINERA_WALLET=\"{}\"",
300            client.wallet_path().display()
301        )
302        .bold()
303    );
304    println!(
305        "{}",
306        format!(
307            "export LINERA_KEYSTORE=\"{}\"",
308            client.keystore_path().display()
309        )
310        .bold()
311    );
312    println!(
313        "{}",
314        format!("export LINERA_STORAGE=\"{}\"\n", client.storage_path()).bold()
315    );
316
317    let wallet = client.load_wallet()?;
318    let chains = wallet.chain_ids();
319
320    // Run the faucet,
321    let faucet_service = if with_faucet {
322        let faucet_chain_idx = faucet_chain.unwrap_or(0);
323        assert!(
324            num_other_initial_chains > faucet_chain_idx,
325            "num_other_initial_chains must be strictly greater than the faucet chain index if \
326            with_faucet is true"
327        );
328        // This picks a lexicographically faucet_chain_idx-th non-admin chain.
329        let faucet_chain = chains
330            .into_iter()
331            .filter(|chain_id| *chain_id != wallet.genesis_admin_chain())
332            .nth(faucet_chain_idx as usize)
333            .unwrap(); // we checked that there are enough chains above, so this should be safe
334        let service = client
335            .run_faucet(Some(faucet_port.into()), faucet_chain, faucet_amount)
336            .await?;
337        Some(service)
338    } else {
339        None
340    };
341
342    eprintln!(
343        "\nREADY!\nPress ^C to terminate the local test network and clean the temporary directory."
344    );
345
346    Ok(faucet_service)
347}