// linera_service/cli_wrappers/local_kubernetes_net.rs
use std::sync::Arc;

6use anyhow::{anyhow, bail, ensure, Result};
7use async_trait::async_trait;
8use futures::{future, lock::Mutex};
9use k8s_openapi::api::core::v1::Pod;
10use kube::{api::ListParams, Api, Client};
11use linera_base::{
12 command::{resolve_binary, CommandExt},
13 data_types::Amount,
14};
15use linera_client::client_options::ResourceControlPolicyConfig;
16use tempfile::{tempdir, TempDir};
17use tokio::process::Command;
18#[cfg(with_testing)]
19use {linera_base::command::current_binary_parent, tokio::sync::OnceCell};
20
21use crate::cli_wrappers::{
22 docker::{BuildArg, DockerImage},
23 helmfile::HelmFile,
24 kind::KindCluster,
25 kubectl::KubectlInstance,
26 local_net::PathProvider,
27 util::get_github_root,
28 ClientWrapper, LineraNet, LineraNetConfig, Network, OnClientDrop,
29};
30
/// Process-wide singleton holding the shared testing network and its initial
/// client. Lazily initialized by the first call to
/// `SharedLocalKubernetesNetTestingConfig::instantiate` and reused by all
/// subsequent tests in the same process.
#[cfg(with_testing)]
static SHARED_LOCAL_KUBERNETES_TESTING_NET: OnceCell<(
    Arc<Mutex<LocalKubernetesNet>>,
    ClientWrapper,
)> = OnceCell::const_new();
36
/// How the binaries that go into the Docker image are compiled.
#[derive(Clone, clap::Parser, clap::ValueEnum, Debug, Default)]
pub enum BuildMode {
    /// Debug build.
    Debug,
    /// Release build (the default).
    #[default]
    Release,
}
43
44impl std::str::FromStr for BuildMode {
45 type Err = String;
46
47 fn from_str(s: &str) -> Result<Self, Self::Err> {
48 clap::ValueEnum::from_str(s, true)
49 }
50}
51
52impl std::fmt::Display for BuildMode {
53 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
54 write!(f, "{:?}", self)
55 }
56}
57
/// Configuration for a local Kubernetes-based network with one kind cluster
/// per validator.
pub struct LocalKubernetesNetConfig {
    /// Network protocol configuration handed to every client.
    pub network: Network,
    /// Optional seed for deterministic key generation in tests; incremented
    /// as it is handed out to clients and the validator-config generator.
    pub testing_prng_seed: Option<u64>,
    /// Number of extra chains created in the genesis configuration.
    pub num_other_initial_chains: u32,
    /// Initial token balance used in the genesis configuration.
    pub initial_amount: Amount,
    /// Number of validators (and thus kind clusters); must be at least one.
    pub num_initial_validators: usize,
    /// Number of shard pods per validator.
    pub num_shards: usize,
    /// Where the binaries for the Docker image come from (build vs. directory).
    pub binaries: BuildArg,
    /// If `true`, skip building the Docker image and use `docker_image_name` as-is.
    pub no_build: bool,
    /// Name (and tag) of the Docker image to deploy.
    pub docker_image_name: String,
    /// Debug or release build of the image binaries.
    pub build_mode: BuildMode,
    /// Resource control policy used in the genesis configuration.
    pub policy_config: ResourceControlPolicyConfig,
    /// Forwarded to the Docker build and helmfile sync — presumably enables a
    /// second storage backend; confirm against `DockerImage::build`.
    pub dual_store: bool,
}
73
/// Newtype over [`LocalKubernetesNetConfig`] whose `instantiate` shares a
/// single network (via a process-wide singleton) between all tests.
#[cfg(with_testing)]
pub struct SharedLocalKubernetesNetTestingConfig(LocalKubernetesNetConfig);
78
/// Handle to a running local Kubernetes network: one kind cluster per
/// validator plus the shared state needed to create clients and tear the
/// network down.
#[derive(Clone)]
pub struct LocalKubernetesNet {
    /// Network protocol configuration passed on to every client.
    network: Network,
    /// PRNG seed handed to the next client; bumped by one each time.
    testing_prng_seed: Option<u64>,
    /// Monotonically increasing id distinguishing client wallets.
    next_client_id: usize,
    /// Temporary working directory holding generated configs; shared so
    /// clones of this handle use the same directory.
    tmp_dir: Arc<TempDir>,
    /// Where the binaries for the Docker image come from.
    binaries: BuildArg,
    /// If `true`, skip the Docker build in `run`.
    no_build: bool,
    /// Name (and tag) of the Docker image loaded into the kind clusters.
    docker_image_name: String,
    /// Debug or release build of the image binaries.
    build_mode: BuildMode,
    /// Shared kubectl wrapper; owns the port-forward child processes killed
    /// on `terminate`.
    kubectl_instance: Arc<Mutex<KubectlInstance>>,
    /// One kind cluster per validator; deleted on `terminate`.
    kind_clusters: Vec<KindCluster>,
    /// Number of validators in the network.
    num_initial_validators: usize,
    /// Number of shard pods per validator.
    num_shards: usize,
    /// Forwarded to the Docker build and helmfile sync — presumably enables a
    /// second storage backend; confirm against `DockerImage::build`.
    dual_store: bool,
}
96
#[cfg(with_testing)]
impl SharedLocalKubernetesNetTestingConfig {
    /// Creates a testing configuration with fixed defaults (seed 37, four
    /// validators, four shards, 2000 initial tokens).
    ///
    /// When the `LINERA_TRY_RELEASE_BINARIES` environment variable is set to
    /// `"true"` and `binaries` is `BuildArg::Build`, prefers an existing
    /// sibling `release` directory of the current binary's parent instead of
    /// rebuilding.
    pub fn new(network: Network, mut binaries: BuildArg) -> Self {
        let try_release_binaries =
            std::env::var("LINERA_TRY_RELEASE_BINARIES").unwrap_or_default() == "true";
        if try_release_binaries && matches!(binaries, BuildArg::Build) {
            let parent_dir =
                current_binary_parent().expect("Fetching current binaries path should not fail");
            let release_dir = parent_dir
                .parent()
                .expect("Getting parent should not fail")
                .join("release");
            // Only switch to the release directory if it actually exists.
            if release_dir.exists() {
                binaries = BuildArg::Directory(release_dir);
            }
        }
        let config = LocalKubernetesNetConfig {
            network,
            testing_prng_seed: Some(37),
            num_other_initial_chains: 2,
            initial_amount: Amount::from_tokens(2000),
            num_initial_validators: 4,
            num_shards: 4,
            binaries,
            no_build: false,
            docker_image_name: String::from("linera:latest"),
            build_mode: BuildMode::Release,
            policy_config: ResourceControlPolicyConfig::Testnet,
            dual_store: false,
        };
        Self(config)
    }
}
133
134#[async_trait]
135impl LineraNetConfig for LocalKubernetesNetConfig {
136 type Net = LocalKubernetesNet;
137
138 async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
139 ensure!(
140 self.num_initial_validators > 0,
141 "There should be at least one initial validator"
142 );
143
144 let clusters = future::join_all((0..self.num_initial_validators).map(|_| async {
145 KindCluster::create()
146 .await
147 .expect("Creating kind cluster should not fail")
148 }))
149 .await;
150
151 let mut net = LocalKubernetesNet::new(
152 self.network,
153 self.testing_prng_seed,
154 self.binaries,
155 self.no_build,
156 self.docker_image_name,
157 self.build_mode,
158 KubectlInstance::new(Vec::new()),
159 clusters,
160 self.num_initial_validators,
161 self.num_shards,
162 self.dual_store,
163 )?;
164
165 let client = net.make_client().await;
166 net.generate_initial_validator_config().await.unwrap();
167 client
168 .create_genesis_config(
169 self.num_other_initial_chains,
170 self.initial_amount,
171 self.policy_config,
172 Some(vec!["localhost".to_owned()]),
173 )
174 .await
175 .unwrap();
176 net.run().await.unwrap();
177
178 Ok((net, client))
179 }
180}
181
#[cfg(with_testing)]
#[async_trait]
impl LineraNetConfig for SharedLocalKubernetesNetTestingConfig {
    type Net = Arc<Mutex<LocalKubernetesNet>>;

    /// Returns the shared testing network (instantiating it on first use) and
    /// a fresh, funded client for the calling test.
    ///
    /// # Errors
    ///
    /// Returns an error if wallet initialization or chain opening fails.
    /// (Previously these paths panicked via `unwrap`; they now propagate.)
    async fn instantiate(self) -> Result<(Self::Net, ClientWrapper)> {
        let (net, initial_client) = SHARED_LOCAL_KUBERNETES_TESTING_NET
            .get_or_init(|| async {
                // `get_or_init` cannot propagate errors, so a failure to set
                // up the shared network has to abort the test process.
                let (net, initial_client) = self
                    .0
                    .instantiate()
                    .await
                    .expect("Instantiating LocalKubernetesNetConfig should not fail");
                (Arc::new(Mutex::new(net)), initial_client)
            })
            .await;

        let mut net = net.clone();
        let client = net.make_client().await;
        client.wallet_init(None).await?;
        // Fund the new wallet with two chains opened by the initial client.
        for _ in 0..2 {
            initial_client
                .open_and_assign(&client, Amount::from_tokens(10))
                .await?;
        }

        Ok((net, client))
    }
}
214
215#[async_trait]
216impl LineraNet for Arc<Mutex<LocalKubernetesNet>> {
217 async fn ensure_is_running(&mut self) -> Result<()> {
218 let self_clone = self.clone();
219 let mut self_lock = self_clone.lock().await;
220
221 self_lock.ensure_is_running().await
222 }
223
224 async fn make_client(&mut self) -> ClientWrapper {
225 let self_clone = self.clone();
226 let mut self_lock = self_clone.lock().await;
227
228 self_lock.make_client().await
229 }
230
231 async fn terminate(&mut self) -> Result<()> {
232 Ok(())
234 }
235}
236
237#[async_trait]
238impl LineraNet for LocalKubernetesNet {
239 async fn ensure_is_running(&mut self) -> Result<()> {
240 let client = Client::try_default().await?;
241 let pods: Api<Pod> = Api::namespaced(client, "default");
242
243 let list_params = ListParams::default().labels("app=proxy");
244 for pod in pods.list(&list_params).await? {
245 if let Some(status) = pod.status {
246 if let Some(phase) = status.phase {
247 if phase != "Running" {
248 bail!(
249 "Validator {} is not Running",
250 pod.metadata
251 .name
252 .expect("Fetching pod name should not fail")
253 );
254 }
255 }
256 }
257 }
258
259 let list_params = ListParams::default().labels("app=shards");
260 for pod in pods.list(&list_params).await? {
261 if let Some(status) = pod.status {
262 if let Some(phase) = status.phase {
263 if phase != "Running" {
264 bail!(
265 "Shard {} is not Running",
266 pod.metadata
267 .name
268 .expect("Fetching pod name should not fail")
269 );
270 }
271 }
272 }
273 }
274
275 Ok(())
276 }
277
278 async fn make_client(&mut self) -> ClientWrapper {
279 let path_provider = PathProvider::TemporaryDirectory {
280 tmp_dir: self.tmp_dir.clone(),
281 };
282 let client = ClientWrapper::new(
283 path_provider,
284 self.network,
285 self.testing_prng_seed,
286 self.next_client_id,
287 OnClientDrop::LeakChains,
288 );
289 if let Some(seed) = self.testing_prng_seed {
290 self.testing_prng_seed = Some(seed + 1);
291 }
292 self.next_client_id += 1;
293 client
294 }
295
296 async fn terminate(&mut self) -> Result<()> {
297 let mut kubectl_instance = self.kubectl_instance.lock().await;
298 let mut errors = Vec::new();
299 for port_forward_child in &mut kubectl_instance.port_forward_children {
300 if let Err(e) = port_forward_child.kill().await {
301 errors.push(e.into());
302 }
303 }
304
305 for kind_cluster in &mut self.kind_clusters {
306 if let Err(e) = kind_cluster.delete().await {
307 errors.push(e);
308 }
309 }
310
311 if errors.is_empty() {
312 Ok(())
313 } else {
314 let err_str = if errors.len() > 1 {
315 "Multiple errors"
316 } else {
317 "One error"
318 };
319
320 Err(errors
321 .into_iter()
322 .fold(anyhow!("{err_str} occurred"), |acc, e: anyhow::Error| {
323 acc.context(e)
324 }))
325 }
326 }
327}
328
impl LocalKubernetesNet {
    /// Creates a new network handle; fails only if the temporary working
    /// directory cannot be created.
    #[expect(clippy::too_many_arguments)]
    fn new(
        network: Network,
        testing_prng_seed: Option<u64>,
        binaries: BuildArg,
        no_build: bool,
        docker_image_name: String,
        build_mode: BuildMode,
        kubectl_instance: KubectlInstance,
        kind_clusters: Vec<KindCluster>,
        num_initial_validators: usize,
        num_shards: usize,
        dual_store: bool,
    ) -> Result<Self> {
        Ok(Self {
            network,
            testing_prng_seed,
            next_client_id: 0,
            tmp_dir: Arc::new(tempdir()?),
            binaries,
            no_build,
            docker_image_name,
            build_mode,
            kubectl_instance: Arc::new(Mutex::new(kubectl_instance)),
            kind_clusters,
            num_initial_validators,
            num_shards,
            dual_store,
        })
    }

    /// Builds a `Command` for the named Linera binary, resolved via
    /// `resolve_binary`, with the temporary directory as working directory so
    /// generated files land there.
    async fn command_for_binary(&self, name: &'static str) -> Result<Command> {
        let path = resolve_binary(name, env!("CARGO_PKG_NAME")).await?;
        let mut command = Command::new(path);
        command.current_dir(self.tmp_dir.path());
        Ok(command)
    }

    /// Writes the TOML configuration for validator `server_number` into the
    /// temporary directory and returns the file's path as a `String`
    /// (suitable for passing to `linera-server generate --validators`).
    fn configuration_string(&self, server_number: usize) -> Result<String> {
        let n = server_number;
        let path = self.tmp_dir.path().join(format!("validator_{n}.toml"));
        // Each validator proxy gets a distinct public port; internal and
        // metrics ports are the same for every validator since each lives in
        // its own cluster.
        let port = 19100 + server_number;
        let internal_port = 20100;
        let metrics_port = 21100;
        let mut content = format!(
            r#"
server_config_path = "server_{n}.json"
host = "127.0.0.1"
port = {port}
[external_protocol]
Grpc = "ClearText"
[internal_protocol]
Grpc = "ClearText"

[[proxies]]
host = "proxy-0.default.svc.cluster.local"
public_port = {port}
private_port = {internal_port}
metrics_port = {metrics_port}
"#
        );

        // One `[[shards]]` entry per shard; hosts use in-cluster DNS names
        // (presumably a headless `shards` service — confirm with the helm
        // charts), so the port can be the same for every shard.
        for k in 0..self.num_shards {
            let shard_port = 19100;
            let shard_metrics_port = 21100;
            content.push_str(&format!(
                r#"

[[shards]]
host = "shards-{k}.shards.default.svc.cluster.local"
port = {shard_port}
metrics_port = {shard_metrics_port}
"#
            ));
        }
        fs_err::write(&path, content)?;
        // `PathBuf` -> `String` conversion can fail for non-UTF-8 paths.
        path.into_os_string().into_string().map_err(|error| {
            anyhow!(
                "could not parse OS string into string: {}",
                error.to_string_lossy()
            )
        })
    }

    /// Runs `linera-server generate` over the per-validator configuration
    /// files, producing server configs and `committee.json` in the temporary
    /// directory.
    async fn generate_initial_validator_config(&mut self) -> Result<()> {
        let mut command = self.command_for_binary("linera-server").await?;
        command.arg("generate");
        if let Some(seed) = self.testing_prng_seed {
            command.arg("--testing-prng-seed").arg(seed.to_string());
            // Bump the seed so the next consumer gets a different one.
            self.testing_prng_seed = Some(seed + 1);
        }
        command.arg("--validators");
        for i in 0..self.num_initial_validators {
            command.arg(&self.configuration_string(i)?);
        }
        command
            .args(["--committee", "committee.json"])
            .spawn_and_wait_for_stdout()
            .await?;
        Ok(())
    }

    /// Starts the network: builds (or reuses) the Docker image, copies the
    /// genesis config into the helm working directory, then for each validator
    /// concurrently loads the image into its kind cluster, syncs the helmfile,
    /// and port-forwards the proxy service to localhost.
    async fn run(&mut self) -> Result<()> {
        let github_root = get_github_root().await?;
        // Build the image unless the caller asked to reuse an existing one.
        let docker_image_name = if self.no_build {
            self.docker_image_name.clone()
        } else {
            DockerImage::build(
                &self.docker_image_name,
                &self.binaries,
                &github_root,
                &self.build_mode,
                self.dual_store,
            )
            .await?;
            self.docker_image_name.clone()
        };

        // Helm chart working directory that the deployment reads from.
        let base_dir = github_root
            .join("kubernetes")
            .join("linera-validator")
            .join("working");
        fs_err::copy(
            self.tmp_dir.path().join("genesis.json"),
            base_dir.join("genesis.json"),
        )?;

        // Clone the shared handles up front so each per-validator future can
        // own its copies inside `async move`.
        let kubectl_instance_clone = self.kubectl_instance.clone();
        let tmp_dir_path_clone = self.tmp_dir.path().to_path_buf();
        let num_shards = self.num_shards;

        let mut validators_initialization_futures = Vec::new();
        for (i, kind_cluster) in self.kind_clusters.iter().cloned().enumerate() {
            let base_dir = base_dir.clone();
            let github_root = github_root.clone();

            let kubectl_instance = kubectl_instance_clone.clone();
            let tmp_dir_path = tmp_dir_path_clone.clone();

            let docker_image_name = docker_image_name.clone();
            let dual_store = self.dual_store;
            let future = async move {
                let cluster_id = kind_cluster.id();
                // The image must be loaded into the kind cluster before the
                // helmfile sync deploys pods that reference it.
                kind_cluster.load_docker_image(&docker_image_name).await?;

                // Each validator's server config goes into the helm working
                // directory under its own name.
                let server_config_filename = format!("server_{}.json", i);
                fs_err::copy(
                    tmp_dir_path.join(&server_config_filename),
                    base_dir.join(&server_config_filename),
                )?;

                HelmFile::sync(
                    i,
                    &github_root,
                    num_shards,
                    cluster_id,
                    docker_image_name,
                    dual_store,
                )
                .await?;

                let mut kubectl_instance = kubectl_instance.lock().await;
                let proxy_service = "svc/proxy";

                // Same port scheme as `configuration_string`: 19100 + index.
                let local_port = 19100 + i;
                kubectl_instance.port_forward(
                    proxy_service,
                    &format!("{local_port}:{local_port}"),
                    cluster_id,
                )?;

                Result::<(), anyhow::Error>::Ok(())
            };

            validators_initialization_futures.push(future);
        }

        // Run all validator initializations concurrently; collecting the
        // `Result`s short-circuits on the first error.
        future::join_all(validators_initialization_futures)
            .await
            .into_iter()
            .collect()
    }
}
513}