linera_service/cli_wrappers/local_kubernetes_net.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use anyhow::{anyhow, bail, ensure, Result};
use async_trait::async_trait;
use futures::{future, lock::Mutex};
use k8s_openapi::api::core::v1::Pod;
use kube::{api::ListParams, Api, Client};
use linera_base::{
    command::{resolve_binary, CommandExt},
    data_types::Amount,
};
use linera_client::client_options::ResourceControlPolicyConfig;
use tempfile::{tempdir, TempDir};
use tokio::process::Command;
#[cfg(with_testing)]
use {linera_base::command::current_binary_parent, tokio::sync::OnceCell};

use crate::cli_wrappers::{
    docker::{BuildArg, DockerImage},
    helmfile::HelmFile,
    kind::KindCluster,
    kubectl::KubectlInstance,
    local_net::PathProvider,
    util::get_github_root,
    ClientWrapper, LineraNet, LineraNetConfig, Network, OnClientDrop,
};

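/// The shared local Kubernetes network used by tests, created lazily on first use together
/// with the client that set up its genesis configuration.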
#[cfg(with_testing)]
static SHARED_LOCAL_KUBERNETES_TESTING_NET: OnceCell<(
    Arc<Mutex<LocalKubernetesNet>>,
    ClientWrapper,
)> = OnceCell::const_new();

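/// The Cargo build profile used for the binaries that go into the validator Docker image.
///
/// A minimal sketch (not exercised in this file) of how values round-trip through
/// `FromStr`/`Display`; it is illustrative only:
///
/// ```ignore
/// let mode: BuildMode = "release".parse().unwrap(); // case-insensitive, via `clap::ValueEnum`
/// assert_eq!(mode.to_string(), "Release"); // `Display` reuses the `Debug` representation
/// ```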
#[derive(Clone, clap::Parser, clap::ValueEnum, Debug, Default)]
pub enum BuildMode {
    Debug,
    #[default]
    Release,
}

impl std::str::FromStr for BuildMode {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        clap::ValueEnum::from_str(s, true)
    }
}

impl std::fmt::Display for BuildMode {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

/// The information needed to start a [`LocalKubernetesNet`].
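///
/// A hand-rolled sketch (not used in this file) mirroring the defaults that
/// `SharedLocalKubernetesNetTestingConfig::new` chooses below; every field value here is
/// illustrative rather than prescriptive:
///
/// ```ignore
/// let config = LocalKubernetesNetConfig {
///     network: Network::Grpc,
///     testing_prng_seed: Some(37),
///     num_other_initial_chains: 2,
///     initial_amount: Amount::from_tokens(2000),
///     num_initial_validators: 4,
///     num_shards: 4,
///     binaries: BuildArg::Build,
///     no_build: false,
///     docker_image_name: String::from("linera:latest"),
///     build_mode: BuildMode::Release,
///     policy_config: ResourceControlPolicyConfig::Testnet,
///     dual_store: false,
/// };
/// ```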
pub struct LocalKubernetesNetConfig {
    pub network: Network,
    pub testing_prng_seed: Option<u64>,
    pub num_other_initial_chains: u32,
    pub initial_amount: Amount,
    pub num_initial_validators: usize,
    pub num_shards: usize,
    pub binaries: BuildArg,
    pub no_build: bool,
    pub docker_image_name: String,
    pub build_mode: BuildMode,
    pub policy_config: ResourceControlPolicyConfig,
    pub dual_store: bool,
}

/// A wrapper around [`LocalKubernetesNetConfig`] to create a shared local Kubernetes network
/// or reuse an existing one.
#[cfg(with_testing)]
pub struct SharedLocalKubernetesNetTestingConfig(LocalKubernetesNetConfig);

/// A set of Linera validators running locally in Kubernetes, on `kind` clusters.
#[derive(Clone)]
pub struct LocalKubernetesNet {
    network: Network,
    testing_prng_seed: Option<u64>,
    next_client_id: usize,
    tmp_dir: Arc<TempDir>,
    binaries: BuildArg,
    no_build: bool,
    docker_image_name: String,
    build_mode: BuildMode,
    kubectl_instance: Arc<Mutex<KubectlInstance>>,
    kind_clusters: Vec<KindCluster>,
    num_initial_validators: usize,
    num_shards: usize,
    dual_store: bool,
}

#[cfg(with_testing)]
impl SharedLocalKubernetesNetTestingConfig {
    // The second argument can be used locally to point tests at specific prebuilt binaries.
    pub fn new(network: Network, mut binaries: BuildArg) -> Self {
        if std::env::var("LINERA_TRY_RELEASE_BINARIES").unwrap_or_default() == "true"
            && matches!(binaries, BuildArg::Build)
        {
            // Under `cargo test`, the current binary is built in debug mode,
            let current_binary_parent =
                current_binary_parent().expect("Fetching current binaries path should not fail");
            // but the binaries shipped to the cluster should be release builds.
            let binaries_dir = current_binary_parent
                .parent()
                .expect("Getting parent should not fail")
                .join("release");
            if binaries_dir.exists() {
                // If a release build directory exists, use those binaries.
                binaries = BuildArg::Directory(binaries_dir);
            }
        }
        Self(LocalKubernetesNetConfig {
            network,
            testing_prng_seed: Some(37),
            num_other_initial_chains: 2,
            initial_amount: Amount::from_tokens(2000),
            num_initial_validators: 4,
            num_shards: 4,
            binaries,
            no_build: false,
            docker_image_name: String::from("linera:latest"),
            build_mode: BuildMode::Release,
            policy_config: ResourceControlPolicyConfig::Testnet,
            dual_store: false,
        })
    }
}
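
// Illustrative invocation (not part of the original file): opting the kind clusters into
// release binaries while the test binary itself stays in debug mode. The exact test filter
// is an assumption.
//
//     LINERA_TRY_RELEASE_BINARIES=true cargo test <some_kubernetes_test_filter>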

#[async_trait]
impl LineraNetConfig for LocalKubernetesNetConfig {
    type Net = LocalKubernetesNet;

    async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
        ensure!(
            self.num_initial_validators > 0,
            "There should be at least one initial validator"
        );

        // Create one `kind` cluster per initial validator, in parallel.
        let clusters = future::join_all((0..self.num_initial_validators).map(|_| async {
            KindCluster::create()
                .await
                .expect("Creating kind cluster should not fail")
        }))
        .await;

        let mut net = LocalKubernetesNet::new(
            self.network,
            self.testing_prng_seed,
            self.binaries,
            self.no_build,
            self.docker_image_name,
            self.build_mode,
            KubectlInstance::new(Vec::new()),
            clusters,
            self.num_initial_validators,
            self.num_shards,
            self.dual_store,
        )?;

        // Generate the validator configurations and the genesis configuration before
        // deploying anything to the clusters.
        let client = net.make_client().await;
        net.generate_initial_validator_config().await.unwrap();
        client
            .create_genesis_config(
                self.num_other_initial_chains,
                self.initial_amount,
                self.policy_config,
                Some(vec!["localhost".to_owned()]),
            )
            .await
            .unwrap();
        net.run().await.unwrap();

        Ok((net, client))
    }
}
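
// A minimal end-to-end sketch (not part of the original file) of how a
// `LocalKubernetesNetConfig` is meant to be driven; it assumes Docker, `kind`, `kubectl`,
// and `helmfile` are available locally, and it is compiled only for tests.
#[cfg(test)]
#[allow(dead_code)]
async fn example_instantiate_and_shutdown(config: LocalKubernetesNetConfig) -> Result<()> {
    // `instantiate` builds the image, creates the clusters, and returns a running network
    // plus a client that owns the initial chains.
    let (mut net, _client) = config.instantiate().await?;
    // ... exercise the network through `_client` here ...
    net.terminate().await?;
    Ok(())
}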

#[cfg(with_testing)]
#[async_trait]
impl LineraNetConfig for SharedLocalKubernetesNetTestingConfig {
    type Net = Arc<Mutex<LocalKubernetesNet>>;

    async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
        let (net, initial_client) = SHARED_LOCAL_KUBERNETES_TESTING_NET
            .get_or_init(|| async {
                let (net, initial_client) = self
                    .0
                    .instantiate()
                    .await
                    .expect("Instantiating LocalKubernetesNetConfig should not fail");
                (Arc::new(Mutex::new(net)), initial_client)
            })
            .await;

        let mut net = net.clone();
        let client = net.make_client().await;
        // The tests assume a wallet with two chains holding 10 tokens each, as a freshly
        // created genesis config would provide, so open them from the initial client.
        client.wallet_init(None).await.unwrap();
        for _ in 0..2 {
            initial_client
                .open_and_assign(&client, Amount::from_tokens(10))
                .await
                .unwrap();
        }

        Ok((net, client))
    }
}
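
// Illustrative test-side usage (not part of the original file); `Network::Grpc` and
// `BuildArg::Build` are assumptions chosen for the sketch.
//
//     let config = SharedLocalKubernetesNetTestingConfig::new(Network::Grpc, BuildArg::Build);
//     let (_net, client) = config.instantiate().await?;
//     // `client` now owns two freshly opened chains holding 10 tokens each.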

#[async_trait]
impl LineraNet for Arc<Mutex<LocalKubernetesNet>> {
    async fn ensure_is_running(&mut self) -> Result<()> {
        let self_clone = self.clone();
        let mut self_lock = self_clone.lock().await;

        self_lock.ensure_is_running().await
    }

    async fn make_client(&mut self) -> ClientWrapper {
        let self_clone = self.clone();
        let mut self_lock = self_clone.lock().await;

        self_lock.make_client().await
    }

    async fn terminate(&mut self) -> Result<()> {
        // The network is shared between tests, so terminating it here would break other users;
        // callers are responsible for deleting the kind clusters themselves if they want to.
        Ok(())
    }
}

#[async_trait]
impl LineraNet for LocalKubernetesNet {
    async fn ensure_is_running(&mut self) -> Result<()> {
        let client = Client::try_default().await?;
        let pods: Api<Pod> = Api::namespaced(client, "default");

        let list_params = ListParams::default().labels("app=proxy");
        for pod in pods.list(&list_params).await? {
            if let Some(status) = pod.status {
                if let Some(phase) = status.phase {
                    if phase != "Running" {
                        bail!(
                            "Validator {} is not Running",
                            pod.metadata
                                .name
                                .expect("Fetching pod name should not fail")
                        );
                    }
                }
            }
        }

        let list_params = ListParams::default().labels("app=shards");
        for pod in pods.list(&list_params).await? {
            if let Some(status) = pod.status {
                if let Some(phase) = status.phase {
                    if phase != "Running" {
                        bail!(
                            "Shard {} is not Running",
                            pod.metadata
                                .name
                                .expect("Fetching pod name should not fail")
                        );
                    }
                }
            }
        }

        Ok(())
    }

    async fn make_client(&mut self) -> ClientWrapper {
        let path_provider = PathProvider::TemporaryDirectory {
            tmp_dir: self.tmp_dir.clone(),
        };
        let client = ClientWrapper::new(
            path_provider,
            self.network,
            self.testing_prng_seed,
            self.next_client_id,
            OnClientDrop::LeakChains,
        );
        // Bump the seed so that every client gets a distinct, deterministic PRNG.
        if let Some(seed) = self.testing_prng_seed {
            self.testing_prng_seed = Some(seed + 1);
        }
        self.next_client_id += 1;
        client
    }

    async fn terminate(&mut self) -> Result<()> {
        // Kill the port-forwarding children and delete the kind clusters, collecting all
        // errors instead of stopping at the first one.
        let mut kubectl_instance = self.kubectl_instance.lock().await;
        let mut errors = Vec::new();
        for port_forward_child in &mut kubectl_instance.port_forward_children {
            if let Err(e) = port_forward_child.kill().await {
                errors.push(e.into());
            }
        }

        for kind_cluster in &mut self.kind_clusters {
            if let Err(e) = kind_cluster.delete().await {
                errors.push(e);
            }
        }

        if errors.is_empty() {
            Ok(())
        } else {
            let err_str = if errors.len() > 1 {
                "Multiple errors"
            } else {
                "One error"
            };

            Err(errors
                .into_iter()
                .fold(anyhow!("{err_str} occurred"), |acc, e: anyhow::Error| {
                    acc.context(e)
                }))
        }
    }
}

impl LocalKubernetesNet {
    #[expect(clippy::too_many_arguments)]
    fn new(
        network: Network,
        testing_prng_seed: Option<u64>,
        binaries: BuildArg,
        no_build: bool,
        docker_image_name: String,
        build_mode: BuildMode,
        kubectl_instance: KubectlInstance,
        kind_clusters: Vec<KindCluster>,
        num_initial_validators: usize,
        num_shards: usize,
        dual_store: bool,
    ) -> Result<Self> {
        Ok(Self {
            network,
            testing_prng_seed,
            next_client_id: 0,
            tmp_dir: Arc::new(tempdir()?),
            binaries,
            no_build,
            docker_image_name,
            build_mode,
            kubectl_instance: Arc::new(Mutex::new(kubectl_instance)),
            kind_clusters,
            num_initial_validators,
            num_shards,
            dual_store,
        })
    }

    async fn command_for_binary(&self, name: &'static str) -> Result<Command> {
        let path = resolve_binary(name, env!("CARGO_PKG_NAME")).await?;
        let mut command = Command::new(path);
        command.current_dir(self.tmp_dir.path());
        Ok(command)
    }

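    // For reference (not in the original source): with `server_number == 0` and a single
    // shard, the generated `validator_0.toml` comes out roughly as follows.
    //
    //     server_config_path = "server_0.json"
    //     host = "127.0.0.1"
    //     port = 19100
    //     [external_protocol]
    //     Grpc = "ClearText"
    //     [internal_protocol]
    //     Grpc = "ClearText"
    //
    //     [[proxies]]
    //     host = "proxy-0.default.svc.cluster.local"
    //     public_port = 19100
    //     private_port = 20100
    //     metrics_port = 21100
    //
    //     [[shards]]
    //     host = "shards-0.shards.default.svc.cluster.local"
    //     port = 19100
    //     metrics_port = 21100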
    fn configuration_string(&self, server_number: usize) -> Result<String> {
        let n = server_number;
        let path = self.tmp_dir.path().join(format!("validator_{n}.toml"));
        let port = 19100 + server_number;
        let internal_port = 20100;
        let metrics_port = 21100;
        let mut content = format!(
            r#"
                server_config_path = "server_{n}.json"
                host = "127.0.0.1"
                port = {port}
                [external_protocol]
                Grpc = "ClearText"
                [internal_protocol]
                Grpc = "ClearText"

                [[proxies]]
                host = "proxy-0.default.svc.cluster.local"
                public_port = {port}
                private_port = {internal_port}
                metrics_port = {metrics_port}
            "#
        );

        for k in 0..self.num_shards {
            let shard_port = 19100;
            let shard_metrics_port = 21100;
            content.push_str(&format!(
                r#"

                [[shards]]
                host = "shards-{k}.shards.default.svc.cluster.local"
                port = {shard_port}
                metrics_port = {shard_metrics_port}
                "#
            ));
        }
        fs_err::write(&path, content)?;
        path.into_os_string().into_string().map_err(|error| {
            anyhow!(
                "could not parse OS string into string: {}",
                error.to_string_lossy()
            )
        })
    }

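    // For reference (not in the original source): with two validators and a fixed PRNG seed,
    // the spawned command is roughly the following (the validator files are passed as full
    // paths inside the temporary directory).
    //
    //     linera-server generate \
    //         --testing-prng-seed 37 \
    //         --validators validator_0.toml validator_1.toml \
    //         --committee committee.json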
    async fn generate_initial_validator_config(&mut self) -> Result<()> {
        let mut command = self.command_for_binary("linera-server").await?;
        command.arg("generate");
        if let Some(seed) = self.testing_prng_seed {
            command.arg("--testing-prng-seed").arg(seed.to_string());
            self.testing_prng_seed = Some(seed + 1);
        }
        command.arg("--validators");
        for i in 0..self.num_initial_validators {
            command.arg(&self.configuration_string(i)?);
        }
        command
            .args(["--committee", "committee.json"])
            .spawn_and_wait_for_stdout()
            .await?;
        Ok(())
    }

    async fn run(&mut self) -> Result<()> {
        let github_root = get_github_root().await?;
        // Build the Docker image, unless the caller asked to reuse an existing one.
        let docker_image_name = if self.no_build {
            self.docker_image_name.clone()
        } else {
            DockerImage::build(
                &self.docker_image_name,
                &self.binaries,
                &github_root,
                &self.build_mode,
                self.dual_store,
            )
            .await?;
            self.docker_image_name.clone()
        };

        // Copy the genesis configuration into the validator chart's working directory.
        let base_dir = github_root
            .join("kubernetes")
            .join("linera-validator")
            .join("working");
        fs_err::copy(
            self.tmp_dir.path().join("genesis.json"),
            base_dir.join("genesis.json"),
        )?;

        let kubectl_instance_clone = self.kubectl_instance.clone();
        let tmp_dir_path_clone = self.tmp_dir.path().to_path_buf();
        let num_shards = self.num_shards;

        // Deploy all validators concurrently, one kind cluster per validator.
        let mut validators_initialization_futures = Vec::new();
        for (i, kind_cluster) in self.kind_clusters.iter().cloned().enumerate() {
            let base_dir = base_dir.clone();
            let github_root = github_root.clone();

            let kubectl_instance = kubectl_instance_clone.clone();
            let tmp_dir_path = tmp_dir_path_clone.clone();

            let docker_image_name = docker_image_name.clone();
            let dual_store = self.dual_store;
            let future = async move {
                let cluster_id = kind_cluster.id();
                kind_cluster.load_docker_image(&docker_image_name).await?;

                let server_config_filename = format!("server_{}.json", i);
                fs_err::copy(
                    tmp_dir_path.join(&server_config_filename),
                    base_dir.join(&server_config_filename),
                )?;

                HelmFile::sync(
                    i,
                    &github_root,
                    num_shards,
                    cluster_id,
                    docker_image_name,
                    dual_store,
                )
                .await?;

                // Forward the proxy port so the local client can reach this validator.
                let mut kubectl_instance = kubectl_instance.lock().await;
                let proxy_service = "svc/proxy";

                let local_port = 19100 + i;
                kubectl_instance.port_forward(
                    proxy_service,
                    &format!("{local_port}:{local_port}"),
                    cluster_id,
                )?;

                Result::<(), anyhow::Error>::Ok(())
            };

            validators_initialization_futures.push(future);
        }

        future::join_all(validators_initialization_futures)
            .await
            .into_iter()
            .collect()
    }
}