linera_service/cli_wrappers/
local_kubernetes_net.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::{anyhow, bail, ensure, Result};
7use async_trait::async_trait;
8use futures::{future, lock::Mutex};
9use k8s_openapi::api::core::v1::Pod;
10use kube::{api::ListParams, Api, Client};
11use linera_base::{
12    command::{resolve_binary, CommandExt},
13    data_types::Amount,
14};
15use linera_client::client_options::ResourceControlPolicyConfig;
16use tokio::{process::Command, task::JoinSet};
17
18use crate::cli_wrappers::{
19    docker::{BuildArg, DockerImage, Dockerfile},
20    helmfile::{HelmFile, DEFAULT_BLOCK_EXPORTER_PORT},
21    kind::KindCluster,
22    kubectl::KubectlInstance,
23    local_net::PathProvider,
24    util::get_github_root,
25    ClientWrapper, LineraNet, LineraNetConfig, Network, OnClientDrop,
26};
27
/// Build mode passed to `DockerImage::build` when the network's images are built.
///
/// Parsed from strings (case-insensitively) via `FromStr` and printed via `Display`
/// using the variant name.
// NOTE(review): presumably selects the Cargo profile used inside the image — confirm
// against `DockerImage::build`.
#[derive(Clone, clap::Parser, clap::ValueEnum, Debug, Default)]
pub enum BuildMode {
    /// Debug mode.
    Debug,
    /// Release mode; this is the `Default` variant.
    #[default]
    Release,
}
34
35impl std::str::FromStr for BuildMode {
36    type Err = String;
37
38    fn from_str(s: &str) -> Result<Self, Self::Err> {
39        clap::ValueEnum::from_str(s, true)
40    }
41}
42
43impl std::fmt::Display for BuildMode {
44    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
45        write!(f, "{:?}", self)
46    }
47}
48
/// The information needed to start a [`LocalKubernetesNet`].
pub struct LocalKubernetesNetConfig {
    /// Network settings handed to the created net and its clients.
    pub network: Network,
    /// Optional PRNG seed for testing, handed to the created net.
    pub testing_prng_seed: Option<u64>,
    /// Forwarded to `create_genesis_config` when the genesis configuration is created.
    pub num_other_initial_chains: u32,
    /// Forwarded to `create_genesis_config` when the genesis configuration is created.
    pub initial_amount: Amount,
    /// Number of validators; must be greater than zero. One kind cluster is created
    /// per validator.
    pub num_initial_validators: usize,
    /// Number of `[[proxies]]` entries written into each validator configuration.
    pub num_proxies: usize,
    /// Number of `[[shards]]` entries written into each validator configuration.
    pub num_shards: usize,
    /// Build argument forwarded to `DockerImage::build`.
    pub binaries: BuildArg,
    /// When `true`, no Docker image is built and the given image names are used as-is.
    pub no_build: bool,
    /// Name of the main Docker image.
    pub docker_image_name: String,
    /// Build mode forwarded to `DockerImage::build`.
    pub build_mode: BuildMode,
    /// Forwarded to `create_genesis_config` when the genesis configuration is created.
    pub policy_config: ResourceControlPolicyConfig,
    /// Number of block exporters. The indexer and explorer images are only built and
    /// loaded when this is greater than zero.
    pub num_block_exporters: usize,
    /// Name of the indexer Docker image.
    pub indexer_image_name: String,
    /// Name of the explorer Docker image.
    pub explorer_image_name: String,
    /// Forwarded to `DockerImage::build` and `HelmFile::sync`.
    pub dual_store: bool,
    /// Provides the directory in which configuration files are written and commands run.
    pub path_provider: PathProvider,
}
69
/// A set of Linera validators running locally in `kind` Kubernetes clusters, one
/// cluster per validator.
#[derive(Clone)]
pub struct LocalKubernetesNet {
    network: Network,
    /// Seed handed to the next consumer; incremented by one after each use.
    testing_prng_seed: Option<u64>,
    /// Identifier given to the next client created by `make_client`.
    next_client_id: usize,
    binaries: BuildArg,
    /// When `true`, `run` skips building the Docker images.
    no_build: bool,
    docker_image_name: String,
    build_mode: BuildMode,
    /// Shared handle that owns the `kubectl port-forward` child processes.
    kubectl_instance: Arc<Mutex<KubectlInstance>>,
    /// The clusters hosting the validators; deleted by `terminate`.
    kind_clusters: Vec<KindCluster>,
    num_initial_validators: usize,
    num_proxies: usize,
    num_shards: usize,
    /// The indexer and explorer images are only built and loaded when this is non-zero.
    num_block_exporters: usize,
    indexer_image_name: String,
    explorer_image_name: String,
    dual_store: bool,
    /// Provides the directory in which configuration files are written and commands run.
    path_provider: PathProvider,
}
91
92#[async_trait]
93impl LineraNetConfig for LocalKubernetesNetConfig {
94    type Net = LocalKubernetesNet;
95
96    async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
97        ensure!(
98            self.num_initial_validators > 0,
99            "There should be at least one initial validator"
100        );
101
102        let clusters = future::join_all((0..self.num_initial_validators).map(|_| async {
103            KindCluster::create()
104                .await
105                .expect("Creating kind cluster should not fail")
106        }))
107        .await;
108
109        let mut net = LocalKubernetesNet::new(
110            self.network,
111            self.testing_prng_seed,
112            self.binaries,
113            self.no_build,
114            self.docker_image_name,
115            self.build_mode,
116            KubectlInstance::new(Vec::new()),
117            clusters,
118            self.num_initial_validators,
119            self.num_proxies,
120            self.num_shards,
121            self.num_block_exporters,
122            self.indexer_image_name,
123            self.explorer_image_name,
124            self.dual_store,
125            self.path_provider,
126        )?;
127
128        let client = net.make_client().await;
129        net.generate_initial_validator_config().await.unwrap();
130        client
131            .create_genesis_config(
132                self.num_other_initial_chains,
133                self.initial_amount,
134                self.policy_config,
135                Some(vec!["localhost".to_owned()]),
136            )
137            .await
138            .unwrap();
139        net.run().await.unwrap();
140
141        Ok((net, client))
142    }
143}
144
145#[async_trait]
146impl LineraNet for Arc<Mutex<LocalKubernetesNet>> {
147    async fn ensure_is_running(&mut self) -> Result<()> {
148        let self_clone = self.clone();
149        let mut self_lock = self_clone.lock().await;
150
151        self_lock.ensure_is_running().await
152    }
153
154    async fn make_client(&mut self) -> ClientWrapper {
155        let self_clone = self.clone();
156        let mut self_lock = self_clone.lock().await;
157
158        self_lock.make_client().await
159    }
160
161    async fn terminate(&mut self) -> Result<()> {
162        // Users are responsible for killing the clusters if they want to
163        Ok(())
164    }
165}
166
167#[async_trait]
168impl LineraNet for LocalKubernetesNet {
169    async fn ensure_is_running(&mut self) -> Result<()> {
170        let client = Client::try_default().await?;
171        let pods: Api<Pod> = Api::namespaced(client, "default");
172
173        let list_params = ListParams::default().labels("app=proxy");
174        for pod in pods.list(&list_params).await? {
175            if let Some(status) = pod.status {
176                if let Some(phase) = status.phase {
177                    if phase != "Running" {
178                        bail!(
179                            "Validator {} is not Running",
180                            pod.metadata
181                                .name
182                                .expect("Fetching pod name should not fail")
183                        );
184                    }
185                }
186            }
187        }
188
189        let list_params = ListParams::default().labels("app=shards");
190        for pod in pods.list(&list_params).await? {
191            if let Some(status) = pod.status {
192                if let Some(phase) = status.phase {
193                    if phase != "Running" {
194                        bail!(
195                            "Shard {} is not Running",
196                            pod.metadata
197                                .name
198                                .expect("Fetching pod name should not fail")
199                        );
200                    }
201                }
202            }
203        }
204
205        Ok(())
206    }
207
208    async fn make_client(&mut self) -> ClientWrapper {
209        let client = ClientWrapper::new(
210            self.path_provider.clone(),
211            self.network,
212            self.testing_prng_seed,
213            self.next_client_id,
214            OnClientDrop::LeakChains,
215        );
216        if let Some(seed) = self.testing_prng_seed {
217            self.testing_prng_seed = Some(seed + 1);
218        }
219        self.next_client_id += 1;
220        client
221    }
222
223    async fn terminate(&mut self) -> Result<()> {
224        let mut kubectl_instance = self.kubectl_instance.lock().await;
225        let mut errors = Vec::new();
226        for port_forward_child in &mut kubectl_instance.port_forward_children {
227            if let Err(e) = port_forward_child.kill().await {
228                errors.push(e.into());
229            }
230        }
231
232        for kind_cluster in &mut self.kind_clusters {
233            if let Err(e) = kind_cluster.delete().await {
234                errors.push(e);
235            }
236        }
237
238        if errors.is_empty() {
239            Ok(())
240        } else {
241            let err_str = if errors.len() > 1 {
242                "Multiple errors"
243            } else {
244                "One error"
245            };
246
247            Err(errors
248                .into_iter()
249                .fold(anyhow!("{err_str} occurred"), |acc, e: anyhow::Error| {
250                    acc.context(e)
251                }))
252        }
253    }
254}
255
256impl LocalKubernetesNet {
257    #[expect(clippy::too_many_arguments)]
258    fn new(
259        network: Network,
260        testing_prng_seed: Option<u64>,
261        binaries: BuildArg,
262        no_build: bool,
263        docker_image_name: String,
264        build_mode: BuildMode,
265        kubectl_instance: KubectlInstance,
266        kind_clusters: Vec<KindCluster>,
267        num_initial_validators: usize,
268        num_proxies: usize,
269        num_shards: usize,
270        num_block_exporters: usize,
271        indexer_image_name: String,
272        explorer_image_name: String,
273        dual_store: bool,
274        path_provider: PathProvider,
275    ) -> Result<Self> {
276        Ok(Self {
277            network,
278            testing_prng_seed,
279            next_client_id: 0,
280            binaries,
281            no_build,
282            docker_image_name,
283            build_mode,
284            kubectl_instance: Arc::new(Mutex::new(kubectl_instance)),
285            kind_clusters,
286            num_initial_validators,
287            num_proxies,
288            num_shards,
289            num_block_exporters,
290            indexer_image_name,
291            explorer_image_name,
292            dual_store,
293            path_provider,
294        })
295    }
296
297    async fn command_for_binary(&self, name: &'static str) -> Result<Command> {
298        let path = resolve_binary(name, env!("CARGO_PKG_NAME")).await?;
299        let mut command = Command::new(path);
300        command.current_dir(self.path_provider.path());
301        Ok(command)
302    }
303
304    fn configuration_string(&self, validator_number: usize) -> Result<String> {
305        let path = self
306            .path_provider
307            .path()
308            .join(format!("validator_{validator_number}.toml"));
309        let public_port = 19100 + validator_number;
310        let private_port = 20100;
311        let metrics_port = 21100;
312        let protocol = self.network.toml();
313        let host = self.network.localhost();
314        let mut content = format!(
315            r#"
316                server_config_path = "server_{validator_number}.json"
317                host = "{host}"
318                port = {public_port}
319                external_protocol = {protocol}
320                internal_protocol = {protocol}
321
322            "#
323        );
324
325        for proxy_id in 0..self.num_proxies {
326            content.push_str(&format!(
327                r#"
328                    [[proxies]]
329                    host = "proxy-{proxy_id}.proxy-internal.default.svc.cluster.local"
330                    public_port = {public_port}
331                    private_port = {private_port}
332                    metrics_port = {metrics_port}
333                "#
334            ));
335        }
336
337        for shard_id in 0..self.num_shards {
338            content.push_str(&format!(
339                r#"
340
341                [[shards]]
342                host = "shards-{shard_id}.shards.default.svc.cluster.local"
343                port = {public_port}
344                metrics_port = {metrics_port}
345                "#
346            ));
347        }
348
349        if self.num_block_exporters > 0 {
350            for exporter_num in 0..self.num_block_exporters {
351                let block_exporter_port = DEFAULT_BLOCK_EXPORTER_PORT;
352                let block_exporter_host =
353                    format!("linera-block-exporter-{exporter_num}.linera-block-exporter");
354                let config_content = format!(
355                    r#"
356
357                        [[block_exporters]]
358                        host = "{block_exporter_host}"
359                        port = {block_exporter_port}
360                        "#
361                );
362
363                content.push_str(&config_content);
364            }
365        }
366
367        fs_err::write(&path, content)?;
368        path.into_os_string().into_string().map_err(|error| {
369            anyhow!(
370                "could not parse OS string into string: {}",
371                error.to_string_lossy()
372            )
373        })
374    }
375
376    async fn generate_initial_validator_config(&mut self) -> Result<()> {
377        let mut command = self.command_for_binary("linera-server").await?;
378        command.arg("generate");
379        if let Some(seed) = self.testing_prng_seed {
380            command.arg("--testing-prng-seed").arg(seed.to_string());
381            self.testing_prng_seed = Some(seed + 1);
382        }
383        command.arg("--validators");
384        for validator_number in 0..self.num_initial_validators {
385            command.arg(&self.configuration_string(validator_number)?);
386        }
387        command
388            .args(["--committee", "committee.json"])
389            .spawn_and_wait_for_stdout()
390            .await?;
391        Ok(())
392    }
393
394    async fn run(&mut self) -> Result<()> {
395        let github_root = get_github_root().await?;
396        // Build Docker images
397        let (docker_image_name, indexer_image_name, explorer_image_name) = if self.no_build {
398            (
399                self.docker_image_name.clone(),
400                self.indexer_image_name.clone(),
401                self.explorer_image_name.clone(),
402            )
403        } else {
404            let mut join_set = JoinSet::new();
405            join_set.spawn(DockerImage::build(
406                self.docker_image_name.clone(),
407                self.binaries.clone(),
408                github_root.clone(),
409                self.build_mode.clone(),
410                self.dual_store,
411                Dockerfile::Main,
412            ));
413            if self.num_block_exporters > 0 {
414                join_set.spawn(DockerImage::build(
415                    self.indexer_image_name.clone(),
416                    self.binaries.clone(),
417                    github_root.clone(),
418                    self.build_mode.clone(),
419                    self.dual_store,
420                    Dockerfile::Indexer,
421                ));
422                join_set.spawn(DockerImage::build(
423                    self.explorer_image_name.clone(),
424                    self.binaries.clone(),
425                    github_root.clone(),
426                    self.build_mode.clone(),
427                    self.dual_store,
428                    Dockerfile::Explorer,
429                ));
430            }
431
432            join_set
433                .join_all()
434                .await
435                .into_iter()
436                .collect::<Result<Vec<_>>>()?;
437
438            (
439                self.docker_image_name.clone(),
440                self.indexer_image_name.clone(),
441                self.explorer_image_name.clone(),
442            )
443        };
444
445        let base_dir = github_root
446            .join("kubernetes")
447            .join("linera-validator")
448            .join("working");
449        fs_err::copy(
450            self.path_provider.path().join("genesis.json"),
451            base_dir.join("genesis.json"),
452        )?;
453
454        let kubectl_instance_clone = self.kubectl_instance.clone();
455        let path_provider_path_clone = self.path_provider.path().to_path_buf();
456        let num_proxies = self.num_proxies;
457        let num_shards = self.num_shards;
458
459        let mut validators_initialization_futures = Vec::new();
460        for (validator_number, kind_cluster) in self.kind_clusters.iter().cloned().enumerate() {
461            let base_dir = base_dir.clone();
462            let github_root = github_root.clone();
463
464            let kubectl_instance = kubectl_instance_clone.clone();
465            let path_provider_path = path_provider_path_clone.clone();
466
467            let docker_image_name = docker_image_name.clone();
468            let indexer_image_name = indexer_image_name.clone();
469            let explorer_image_name = explorer_image_name.clone();
470            let dual_store = self.dual_store;
471            let num_block_exporters = self.num_block_exporters;
472            let future = async move {
473                let cluster_id = kind_cluster.id();
474                kind_cluster.load_docker_image(&docker_image_name).await?;
475                if num_block_exporters > 0 {
476                    kind_cluster.load_docker_image(&indexer_image_name).await?;
477                    kind_cluster.load_docker_image(&explorer_image_name).await?;
478                }
479
480                let server_config_filename = format!("server_{}.json", validator_number);
481                fs_err::copy(
482                    path_provider_path.join(&server_config_filename),
483                    base_dir.join(&server_config_filename),
484                )?;
485
486                HelmFile::sync(
487                    validator_number,
488                    &github_root,
489                    num_proxies,
490                    num_shards,
491                    cluster_id,
492                    docker_image_name,
493                    num_block_exporters > 0,
494                    num_block_exporters,
495                    indexer_image_name,
496                    explorer_image_name,
497                    dual_store,
498                )
499                .await?;
500
501                let mut kubectl_instance = kubectl_instance.lock().await;
502                let proxy_service = "svc/proxy";
503
504                let local_port = 19100 + validator_number;
505                kubectl_instance.port_forward(
506                    proxy_service,
507                    &format!("{local_port}:19100"),
508                    cluster_id,
509                )?;
510
511                Result::<(), anyhow::Error>::Ok(())
512            };
513
514            validators_initialization_futures.push(future);
515        }
516
517        future::join_all(validators_initialization_futures)
518            .await
519            .into_iter()
520            .collect()
521    }
522}