1use std::sync::Arc;
5
6use anyhow::{anyhow, bail, ensure, Result};
7use async_trait::async_trait;
8use futures::{future, lock::Mutex};
9use k8s_openapi::api::core::v1::Pod;
10use kube::{api::ListParams, Api, Client};
11use linera_base::{
12 command::{resolve_binary, CommandExt},
13 data_types::Amount,
14};
15use linera_client::client_options::ResourceControlPolicyConfig;
16use tokio::{process::Command, task::JoinSet};
17
18use crate::cli_wrappers::{
19 docker::{BuildArg, DockerImage, Dockerfile},
20 helmfile::{HelmFile, DEFAULT_BLOCK_EXPORTER_PORT},
21 kind::KindCluster,
22 kubectl::KubectlInstance,
23 local_net::PathProvider,
24 util::get_github_root,
25 ClientWrapper, LineraNet, LineraNetConfig, Network, OnClientDrop,
26};
27
28#[derive(Clone, clap::Parser, clap::ValueEnum, Debug, Default)]
29pub enum BuildMode {
30 Debug,
31 #[default]
32 Release,
33}
34
35impl std::str::FromStr for BuildMode {
36 type Err = String;
37
38 fn from_str(s: &str) -> Result<Self, Self::Err> {
39 clap::ValueEnum::from_str(s, true)
40 }
41}
42
43impl std::fmt::Display for BuildMode {
44 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
45 write!(f, "{:?}", self)
46 }
47}
48
49pub struct LocalKubernetesNetConfig {
51 pub network: Network,
52 pub testing_prng_seed: Option<u64>,
53 pub num_other_initial_chains: u32,
54 pub initial_amount: Amount,
55 pub num_initial_validators: usize,
56 pub num_proxies: usize,
57 pub num_shards: usize,
58 pub binaries: BuildArg,
59 pub no_build: bool,
60 pub docker_image_name: String,
61 pub build_mode: BuildMode,
62 pub policy_config: ResourceControlPolicyConfig,
63 pub num_block_exporters: usize,
64 pub indexer_image_name: String,
65 pub explorer_image_name: String,
66 pub dual_store: bool,
67 pub path_provider: PathProvider,
68}
69
70#[derive(Clone)]
72pub struct LocalKubernetesNet {
73 network: Network,
74 testing_prng_seed: Option<u64>,
75 next_client_id: usize,
76 binaries: BuildArg,
77 no_build: bool,
78 docker_image_name: String,
79 build_mode: BuildMode,
80 kubectl_instance: Arc<Mutex<KubectlInstance>>,
81 kind_clusters: Vec<KindCluster>,
82 num_initial_validators: usize,
83 num_proxies: usize,
84 num_shards: usize,
85 num_block_exporters: usize,
86 indexer_image_name: String,
87 explorer_image_name: String,
88 dual_store: bool,
89 path_provider: PathProvider,
90}
91
92#[async_trait]
93impl LineraNetConfig for LocalKubernetesNetConfig {
94 type Net = LocalKubernetesNet;
95
96 async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
97 ensure!(
98 self.num_initial_validators > 0,
99 "There should be at least one initial validator"
100 );
101
102 let clusters = future::join_all((0..self.num_initial_validators).map(|_| async {
103 KindCluster::create()
104 .await
105 .expect("Creating kind cluster should not fail")
106 }))
107 .await;
108
109 let mut net = LocalKubernetesNet::new(
110 self.network,
111 self.testing_prng_seed,
112 self.binaries,
113 self.no_build,
114 self.docker_image_name,
115 self.build_mode,
116 KubectlInstance::new(Vec::new()),
117 clusters,
118 self.num_initial_validators,
119 self.num_proxies,
120 self.num_shards,
121 self.num_block_exporters,
122 self.indexer_image_name,
123 self.explorer_image_name,
124 self.dual_store,
125 self.path_provider,
126 )?;
127
128 let client = net.make_client().await;
129 net.generate_initial_validator_config().await.unwrap();
130 client
131 .create_genesis_config(
132 self.num_other_initial_chains,
133 self.initial_amount,
134 self.policy_config,
135 Some(vec!["localhost".to_owned()]),
136 )
137 .await
138 .unwrap();
139 net.run().await.unwrap();
140
141 Ok((net, client))
142 }
143}
144
145#[async_trait]
146impl LineraNet for Arc<Mutex<LocalKubernetesNet>> {
147 async fn ensure_is_running(&mut self) -> Result<()> {
148 let self_clone = self.clone();
149 let mut self_lock = self_clone.lock().await;
150
151 self_lock.ensure_is_running().await
152 }
153
154 async fn make_client(&mut self) -> ClientWrapper {
155 let self_clone = self.clone();
156 let mut self_lock = self_clone.lock().await;
157
158 self_lock.make_client().await
159 }
160
161 async fn terminate(&mut self) -> Result<()> {
162 Ok(())
164 }
165}
166
167#[async_trait]
168impl LineraNet for LocalKubernetesNet {
169 async fn ensure_is_running(&mut self) -> Result<()> {
170 let client = Client::try_default().await?;
171 let pods: Api<Pod> = Api::namespaced(client, "default");
172
173 let list_params = ListParams::default().labels("app=proxy");
174 for pod in pods.list(&list_params).await? {
175 if let Some(status) = pod.status {
176 if let Some(phase) = status.phase {
177 if phase != "Running" {
178 bail!(
179 "Validator {} is not Running",
180 pod.metadata
181 .name
182 .expect("Fetching pod name should not fail")
183 );
184 }
185 }
186 }
187 }
188
189 let list_params = ListParams::default().labels("app=shards");
190 for pod in pods.list(&list_params).await? {
191 if let Some(status) = pod.status {
192 if let Some(phase) = status.phase {
193 if phase != "Running" {
194 bail!(
195 "Shard {} is not Running",
196 pod.metadata
197 .name
198 .expect("Fetching pod name should not fail")
199 );
200 }
201 }
202 }
203 }
204
205 Ok(())
206 }
207
208 async fn make_client(&mut self) -> ClientWrapper {
209 let client = ClientWrapper::new(
210 self.path_provider.clone(),
211 self.network,
212 self.testing_prng_seed,
213 self.next_client_id,
214 OnClientDrop::LeakChains,
215 );
216 if let Some(seed) = self.testing_prng_seed {
217 self.testing_prng_seed = Some(seed + 1);
218 }
219 self.next_client_id += 1;
220 client
221 }
222
223 async fn terminate(&mut self) -> Result<()> {
224 let mut kubectl_instance = self.kubectl_instance.lock().await;
225 let mut errors = Vec::new();
226 for port_forward_child in &mut kubectl_instance.port_forward_children {
227 if let Err(e) = port_forward_child.kill().await {
228 errors.push(e.into());
229 }
230 }
231
232 for kind_cluster in &mut self.kind_clusters {
233 if let Err(e) = kind_cluster.delete().await {
234 errors.push(e);
235 }
236 }
237
238 if errors.is_empty() {
239 Ok(())
240 } else {
241 let err_str = if errors.len() > 1 {
242 "Multiple errors"
243 } else {
244 "One error"
245 };
246
247 Err(errors
248 .into_iter()
249 .fold(anyhow!("{err_str} occurred"), |acc, e: anyhow::Error| {
250 acc.context(e)
251 }))
252 }
253 }
254}
255
256impl LocalKubernetesNet {
257 #[expect(clippy::too_many_arguments)]
258 fn new(
259 network: Network,
260 testing_prng_seed: Option<u64>,
261 binaries: BuildArg,
262 no_build: bool,
263 docker_image_name: String,
264 build_mode: BuildMode,
265 kubectl_instance: KubectlInstance,
266 kind_clusters: Vec<KindCluster>,
267 num_initial_validators: usize,
268 num_proxies: usize,
269 num_shards: usize,
270 num_block_exporters: usize,
271 indexer_image_name: String,
272 explorer_image_name: String,
273 dual_store: bool,
274 path_provider: PathProvider,
275 ) -> Result<Self> {
276 Ok(Self {
277 network,
278 testing_prng_seed,
279 next_client_id: 0,
280 binaries,
281 no_build,
282 docker_image_name,
283 build_mode,
284 kubectl_instance: Arc::new(Mutex::new(kubectl_instance)),
285 kind_clusters,
286 num_initial_validators,
287 num_proxies,
288 num_shards,
289 num_block_exporters,
290 indexer_image_name,
291 explorer_image_name,
292 dual_store,
293 path_provider,
294 })
295 }
296
297 async fn command_for_binary(&self, name: &'static str) -> Result<Command> {
298 let path = resolve_binary(name, env!("CARGO_PKG_NAME")).await?;
299 let mut command = Command::new(path);
300 command.current_dir(self.path_provider.path());
301 Ok(command)
302 }
303
304 fn configuration_string(&self, validator_number: usize) -> Result<String> {
305 let path = self
306 .path_provider
307 .path()
308 .join(format!("validator_{validator_number}.toml"));
309 let public_port = 19100 + validator_number;
310 let private_port = 20100;
311 let metrics_port = 21100;
312 let protocol = self.network.toml();
313 let host = self.network.localhost();
314 let mut content = format!(
315 r#"
316 server_config_path = "server_{validator_number}.json"
317 host = "{host}"
318 port = {public_port}
319 external_protocol = {protocol}
320 internal_protocol = {protocol}
321
322 "#
323 );
324
325 for proxy_id in 0..self.num_proxies {
326 content.push_str(&format!(
327 r#"
328 [[proxies]]
329 host = "proxy-{proxy_id}.proxy-internal.default.svc.cluster.local"
330 public_port = {public_port}
331 private_port = {private_port}
332 metrics_port = {metrics_port}
333 "#
334 ));
335 }
336
337 for shard_id in 0..self.num_shards {
338 content.push_str(&format!(
339 r#"
340
341 [[shards]]
342 host = "shards-{shard_id}.shards.default.svc.cluster.local"
343 port = {public_port}
344 metrics_port = {metrics_port}
345 "#
346 ));
347 }
348
349 if self.num_block_exporters > 0 {
350 for exporter_num in 0..self.num_block_exporters {
351 let block_exporter_port = DEFAULT_BLOCK_EXPORTER_PORT;
352 let block_exporter_host =
353 format!("linera-block-exporter-{exporter_num}.linera-block-exporter");
354 let config_content = format!(
355 r#"
356
357 [[block_exporters]]
358 host = "{block_exporter_host}"
359 port = {block_exporter_port}
360 "#
361 );
362
363 content.push_str(&config_content);
364 }
365 }
366
367 fs_err::write(&path, content)?;
368 path.into_os_string().into_string().map_err(|error| {
369 anyhow!(
370 "could not parse OS string into string: {}",
371 error.to_string_lossy()
372 )
373 })
374 }
375
376 async fn generate_initial_validator_config(&mut self) -> Result<()> {
377 let mut command = self.command_for_binary("linera-server").await?;
378 command.arg("generate");
379 if let Some(seed) = self.testing_prng_seed {
380 command.arg("--testing-prng-seed").arg(seed.to_string());
381 self.testing_prng_seed = Some(seed + 1);
382 }
383 command.arg("--validators");
384 for validator_number in 0..self.num_initial_validators {
385 command.arg(&self.configuration_string(validator_number)?);
386 }
387 command
388 .args(["--committee", "committee.json"])
389 .spawn_and_wait_for_stdout()
390 .await?;
391 Ok(())
392 }
393
394 async fn run(&mut self) -> Result<()> {
395 let github_root = get_github_root().await?;
396 let (docker_image_name, indexer_image_name, explorer_image_name) = if self.no_build {
398 (
399 self.docker_image_name.clone(),
400 self.indexer_image_name.clone(),
401 self.explorer_image_name.clone(),
402 )
403 } else {
404 let mut join_set = JoinSet::new();
405 join_set.spawn(DockerImage::build(
406 self.docker_image_name.clone(),
407 self.binaries.clone(),
408 github_root.clone(),
409 self.build_mode.clone(),
410 self.dual_store,
411 Dockerfile::Main,
412 ));
413 if self.num_block_exporters > 0 {
414 join_set.spawn(DockerImage::build(
415 self.indexer_image_name.clone(),
416 self.binaries.clone(),
417 github_root.clone(),
418 self.build_mode.clone(),
419 self.dual_store,
420 Dockerfile::Indexer,
421 ));
422 join_set.spawn(DockerImage::build(
423 self.explorer_image_name.clone(),
424 self.binaries.clone(),
425 github_root.clone(),
426 self.build_mode.clone(),
427 self.dual_store,
428 Dockerfile::Explorer,
429 ));
430 }
431
432 join_set
433 .join_all()
434 .await
435 .into_iter()
436 .collect::<Result<Vec<_>>>()?;
437
438 (
439 self.docker_image_name.clone(),
440 self.indexer_image_name.clone(),
441 self.explorer_image_name.clone(),
442 )
443 };
444
445 let base_dir = github_root
446 .join("kubernetes")
447 .join("linera-validator")
448 .join("working");
449 fs_err::copy(
450 self.path_provider.path().join("genesis.json"),
451 base_dir.join("genesis.json"),
452 )?;
453
454 let kubectl_instance_clone = self.kubectl_instance.clone();
455 let path_provider_path_clone = self.path_provider.path().to_path_buf();
456 let num_proxies = self.num_proxies;
457 let num_shards = self.num_shards;
458
459 let mut validators_initialization_futures = Vec::new();
460 for (validator_number, kind_cluster) in self.kind_clusters.iter().cloned().enumerate() {
461 let base_dir = base_dir.clone();
462 let github_root = github_root.clone();
463
464 let kubectl_instance = kubectl_instance_clone.clone();
465 let path_provider_path = path_provider_path_clone.clone();
466
467 let docker_image_name = docker_image_name.clone();
468 let indexer_image_name = indexer_image_name.clone();
469 let explorer_image_name = explorer_image_name.clone();
470 let dual_store = self.dual_store;
471 let num_block_exporters = self.num_block_exporters;
472 let future = async move {
473 let cluster_id = kind_cluster.id();
474 kind_cluster.load_docker_image(&docker_image_name).await?;
475 if num_block_exporters > 0 {
476 kind_cluster.load_docker_image(&indexer_image_name).await?;
477 kind_cluster.load_docker_image(&explorer_image_name).await?;
478 }
479
480 let server_config_filename = format!("server_{}.json", validator_number);
481 fs_err::copy(
482 path_provider_path.join(&server_config_filename),
483 base_dir.join(&server_config_filename),
484 )?;
485
486 HelmFile::sync(
487 validator_number,
488 &github_root,
489 num_proxies,
490 num_shards,
491 cluster_id,
492 docker_image_name,
493 num_block_exporters > 0,
494 num_block_exporters,
495 indexer_image_name,
496 explorer_image_name,
497 dual_store,
498 )
499 .await?;
500
501 let mut kubectl_instance = kubectl_instance.lock().await;
502 let proxy_service = "svc/proxy";
503
504 let local_port = 19100 + validator_number;
505 kubectl_instance.port_forward(
506 proxy_service,
507 &format!("{local_port}:19100"),
508 cluster_id,
509 )?;
510
511 Result::<(), anyhow::Error>::Ok(())
512 };
513
514 validators_initialization_futures.push(future);
515 }
516
517 future::join_all(validators_initialization_futures)
518 .await
519 .into_iter()
520 .collect()
521 }
522}