Skip to main content

linera_service/cli/validator_benchmark/
mod.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Multi-layer pre-onboarding benchmark for a candidate validator.
5//!
6//! Tracking: linera-io/linera-infra#1198.
7
8mod bulk_download;
9mod config;
10mod latency;
11mod partial_sync;
12mod preflight;
13mod progress;
14mod read_latency;
15mod report;
16mod rpc;
17mod tip_lag;
18
19use std::{io::IsTerminal as _, time::Duration};
20
21use anyhow::Result;
22use chrono::Utc;
23pub use config::Benchmark;
24use linera_base::identifiers::ChainId;
25use linera_client::client_context::ClientContext;
26use linera_core::{
27    data_types::ChainInfoQuery,
28    node::{ValidatorNode, ValidatorNodeProvider as _},
29};
30
31use self::{
32    progress::Progress,
33    report::{Candidate, Layers, Metadata, Observer, OutputSpec, Report, Writer},
34};
35
36impl Benchmark {
37    pub async fn run(
38        &self,
39        context: &mut ClientContext<
40            impl linera_core::Environment<ValidatorNode = linera_rpc::Client>,
41        >,
42    ) -> Result<()> {
43        // Validate output specs up front so a typo fails fast, before any work.
44        let output_specs = OutputSpec::parse_all(&self.output)?;
45
46        let progress = Progress::new(!self.no_progress && std::io::stderr().is_terminal());
47        let rpc_timeout = Duration::from_secs(self.rpc_timeout_secs);
48
49        let started_at = Utc::now();
50        let node = context.make_node_provider().make_node(&self.address)?;
51        let writer = Writer::new(output_specs);
52
53        // Build the report up front and flush file targets after each layer, so
54        // an interrupted run still leaves the completed layers on disk.
55        let mut report = Report {
56            metadata: Metadata {
57                tool_version: env!("CARGO_PKG_VERSION").to_string(),
58                candidate: Candidate {
59                    address: self.address.clone(),
60                    public_key: self.public_key.map(|k| k.to_string()),
61                    version_info: None,
62                    network_description: None,
63                },
64                observer: Observer {
65                    location: self.observer_location.clone(),
66                    hostname: std::env::var("HOSTNAME").unwrap_or_default(),
67                    started_at: started_at.to_rfc3339(),
68                    ended_at: None,
69                    duration_secs: None,
70                },
71                config: serde_json::to_value(self)?,
72                chains_tested: self.chain.iter().map(|c| c.to_string()).collect(),
73                complete: false,
74            },
75            layers: Layers::default(),
76        };
77
78        // L1 preflight also yields version/network info for the report metadata.
79        // When skipped, fetch those two cheap fields best-effort anyway.
80        if !self.skip_preflight {
81            let outcome = preflight::run(&node, rpc_timeout, &progress).await;
82            if self.abort_on_preflight_fail
83                && outcome.report.status == report::PreflightStatus::Fail
84            {
85                progress.clear();
86                anyhow::bail!(
87                    "preflight failed for {}: {:?}",
88                    self.address,
89                    outcome.report.errors
90                );
91            }
92            report.metadata.candidate.version_info = outcome.version_info;
93            report.metadata.candidate.network_description = outcome.network_description;
94            report.layers.preflight = Some(outcome.report);
95        } else {
96            report.metadata.candidate.version_info =
97                rpc::timed(rpc_timeout, node.get_version_info())
98                    .await
99                    .ok()
100                    .map(|v| format!("{v:?}"));
101            report.metadata.candidate.network_description =
102                rpc::timed(rpc_timeout, node.get_network_description())
103                    .await
104                    .ok()
105                    .and_then(|nd| serde_json::to_value(nd).ok());
106        }
107        writer.write_files(&report)?;
108
109        // A not-yet-committee candidate may hold few or no blocks. Seed first when
110        // --deep so the read layers below exercise a candidate that actually has
111        // the data, and warn about any chain it does not hold and will not seed.
112        let deep_chain = self.deep.then(|| self.deep_chain.unwrap_or(self.chain[0]));
113        warn_unheld_chains(&node, &self.chain, deep_chain, rpc_timeout).await;
114        if let Some(deep_chain) = deep_chain {
115            report.layers.partial_sync = Some(
116                Box::pin(partial_sync::run(
117                    &node,
118                    context,
119                    deep_chain,
120                    self.deep_blocks,
121                    rpc_timeout,
122                    &progress,
123                ))
124                .await?,
125            );
126            writer.write_files(&report)?;
127        }
128
129        // Layer futures are large; box them at the await site (clippy::large_futures).
130        if !self.skip_read_baseline {
131            report.layers.read_baseline = Some(
132                Box::pin(read_latency::run_baseline(
133                    &node,
134                    &self.chain,
135                    self.baseline_requests,
136                    rpc_timeout,
137                    &progress,
138                ))
139                .await,
140            );
141            writer.write_files(&report)?;
142        }
143
144        if !self.skip_read_stress {
145            report.layers.read_stress = Some(
146                Box::pin(read_latency::run_stress(
147                    &node,
148                    &self.chain,
149                    &self.stress_levels,
150                    Duration::from_secs(self.stress_duration_secs),
151                    rpc_timeout,
152                    &progress,
153                ))
154                .await,
155            );
156            writer.write_files(&report)?;
157        }
158
159        if !self.skip_bulk_download {
160            report.layers.bulk_download = Some(
161                Box::pin(bulk_download::run(
162                    &node,
163                    &self.chain,
164                    self.bulk_batch_size,
165                    &self.bulk_concurrency,
166                    &self.bulk_height_range,
167                    rpc_timeout,
168                    &progress,
169                ))
170                .await?,
171            );
172            writer.write_files(&report)?;
173        }
174
175        if !self.skip_tip_lag {
176            report.layers.tip_lag = Some(
177                Box::pin(tip_lag::run(
178                    &node,
179                    context,
180                    &self.chain,
181                    self.tip_lag_samples,
182                    Duration::from_secs(self.tip_lag_interval_secs),
183                    rpc_timeout,
184                    &progress,
185                ))
186                .await?,
187            );
188            writer.write_files(&report)?;
189        }
190
191        let ended_at = Utc::now();
192        report.metadata.observer.ended_at = Some(ended_at.to_rfc3339());
193        report.metadata.observer.duration_secs =
194            Some(u64::try_from((ended_at - started_at).num_seconds()).unwrap_or(0));
195        report.metadata.complete = true;
196        progress.clear();
197        writer.emit(&report)?;
198        Ok(())
199    }
200}
201
202/// Warn about chains the candidate does not hold (read layers would be shallow),
203/// excluding one that `--deep` is about to seed. A chain with tip 0 or an
204/// unreachable lookup is treated as not held.
205async fn warn_unheld_chains(
206    node: &impl ValidatorNode,
207    chains: &[ChainId],
208    seeded: Option<ChainId>,
209    rpc_timeout: Duration,
210) {
211    let mut unheld = Vec::new();
212    for &chain in chains {
213        if Some(chain) == seeded {
214            continue;
215        }
216        let held = rpc::timed(
217            rpc_timeout,
218            node.handle_chain_info_query(ChainInfoQuery::new(chain)),
219        )
220        .await
221        .is_ok_and(|response| response.info.next_block_height.0 > 0);
222        if !held {
223            unheld.push(chain.to_string());
224        }
225    }
226    if !unheld.is_empty() {
227        tracing::warn!(
228            "candidate does not hold chain(s) [{}]; read layers (L3-L5) will be shallow. \
229             Pre-sync them (`linera validator sync`) or pass `--deep` to seed blocks first.",
230            unheld.join(", ")
231        );
232    }
233}