linera_service/cli/validator_benchmark/
mod.rs1mod bulk_download;
9mod config;
10mod latency;
11mod partial_sync;
12mod preflight;
13mod progress;
14mod read_latency;
15mod report;
16mod rpc;
17mod tip_lag;
18
19use std::{io::IsTerminal as _, time::Duration};
20
21use anyhow::Result;
22use chrono::Utc;
23pub use config::Benchmark;
24use linera_base::identifiers::ChainId;
25use linera_client::client_context::ClientContext;
26use linera_core::{
27 data_types::ChainInfoQuery,
28 node::{ValidatorNode, ValidatorNodeProvider as _},
29};
30
31use self::{
32 progress::Progress,
33 report::{Candidate, Layers, Metadata, Observer, OutputSpec, Report, Writer},
34};
35
36impl Benchmark {
37 pub async fn run(
38 &self,
39 context: &mut ClientContext<
40 impl linera_core::Environment<ValidatorNode = linera_rpc::Client>,
41 >,
42 ) -> Result<()> {
43 let output_specs = OutputSpec::parse_all(&self.output)?;
45
46 let progress = Progress::new(!self.no_progress && std::io::stderr().is_terminal());
47 let rpc_timeout = Duration::from_secs(self.rpc_timeout_secs);
48
49 let started_at = Utc::now();
50 let node = context.make_node_provider().make_node(&self.address)?;
51 let writer = Writer::new(output_specs);
52
53 let mut report = Report {
56 metadata: Metadata {
57 tool_version: env!("CARGO_PKG_VERSION").to_string(),
58 candidate: Candidate {
59 address: self.address.clone(),
60 public_key: self.public_key.map(|k| k.to_string()),
61 version_info: None,
62 network_description: None,
63 },
64 observer: Observer {
65 location: self.observer_location.clone(),
66 hostname: std::env::var("HOSTNAME").unwrap_or_default(),
67 started_at: started_at.to_rfc3339(),
68 ended_at: None,
69 duration_secs: None,
70 },
71 config: serde_json::to_value(self)?,
72 chains_tested: self.chain.iter().map(|c| c.to_string()).collect(),
73 complete: false,
74 },
75 layers: Layers::default(),
76 };
77
78 if !self.skip_preflight {
81 let outcome = preflight::run(&node, rpc_timeout, &progress).await;
82 if self.abort_on_preflight_fail
83 && outcome.report.status == report::PreflightStatus::Fail
84 {
85 progress.clear();
86 anyhow::bail!(
87 "preflight failed for {}: {:?}",
88 self.address,
89 outcome.report.errors
90 );
91 }
92 report.metadata.candidate.version_info = outcome.version_info;
93 report.metadata.candidate.network_description = outcome.network_description;
94 report.layers.preflight = Some(outcome.report);
95 } else {
96 report.metadata.candidate.version_info =
97 rpc::timed(rpc_timeout, node.get_version_info())
98 .await
99 .ok()
100 .map(|v| format!("{v:?}"));
101 report.metadata.candidate.network_description =
102 rpc::timed(rpc_timeout, node.get_network_description())
103 .await
104 .ok()
105 .and_then(|nd| serde_json::to_value(nd).ok());
106 }
107 writer.write_files(&report)?;
108
109 let deep_chain = self.deep.then(|| self.deep_chain.unwrap_or(self.chain[0]));
113 warn_unheld_chains(&node, &self.chain, deep_chain, rpc_timeout).await;
114 if let Some(deep_chain) = deep_chain {
115 report.layers.partial_sync = Some(
116 Box::pin(partial_sync::run(
117 &node,
118 context,
119 deep_chain,
120 self.deep_blocks,
121 rpc_timeout,
122 &progress,
123 ))
124 .await?,
125 );
126 writer.write_files(&report)?;
127 }
128
129 if !self.skip_read_baseline {
131 report.layers.read_baseline = Some(
132 Box::pin(read_latency::run_baseline(
133 &node,
134 &self.chain,
135 self.baseline_requests,
136 rpc_timeout,
137 &progress,
138 ))
139 .await,
140 );
141 writer.write_files(&report)?;
142 }
143
144 if !self.skip_read_stress {
145 report.layers.read_stress = Some(
146 Box::pin(read_latency::run_stress(
147 &node,
148 &self.chain,
149 &self.stress_levels,
150 Duration::from_secs(self.stress_duration_secs),
151 rpc_timeout,
152 &progress,
153 ))
154 .await,
155 );
156 writer.write_files(&report)?;
157 }
158
159 if !self.skip_bulk_download {
160 report.layers.bulk_download = Some(
161 Box::pin(bulk_download::run(
162 &node,
163 &self.chain,
164 self.bulk_batch_size,
165 &self.bulk_concurrency,
166 &self.bulk_height_range,
167 rpc_timeout,
168 &progress,
169 ))
170 .await?,
171 );
172 writer.write_files(&report)?;
173 }
174
175 if !self.skip_tip_lag {
176 report.layers.tip_lag = Some(
177 Box::pin(tip_lag::run(
178 &node,
179 context,
180 &self.chain,
181 self.tip_lag_samples,
182 Duration::from_secs(self.tip_lag_interval_secs),
183 rpc_timeout,
184 &progress,
185 ))
186 .await?,
187 );
188 writer.write_files(&report)?;
189 }
190
191 let ended_at = Utc::now();
192 report.metadata.observer.ended_at = Some(ended_at.to_rfc3339());
193 report.metadata.observer.duration_secs =
194 Some(u64::try_from((ended_at - started_at).num_seconds()).unwrap_or(0));
195 report.metadata.complete = true;
196 progress.clear();
197 writer.emit(&report)?;
198 Ok(())
199 }
200}
201
202async fn warn_unheld_chains(
206 node: &impl ValidatorNode,
207 chains: &[ChainId],
208 seeded: Option<ChainId>,
209 rpc_timeout: Duration,
210) {
211 let mut unheld = Vec::new();
212 for &chain in chains {
213 if Some(chain) == seeded {
214 continue;
215 }
216 let held = rpc::timed(
217 rpc_timeout,
218 node.handle_chain_info_query(ChainInfoQuery::new(chain)),
219 )
220 .await
221 .is_ok_and(|response| response.info.next_block_height.0 > 0);
222 if !held {
223 unheld.push(chain.to_string());
224 }
225 }
226 if !unheld.is_empty() {
227 tracing::warn!(
228 "candidate does not hold chain(s) [{}]; read layers (L3-L5) will be shallow. \
229 Pre-sync them (`linera validator sync`) or pass `--deep` to seed blocks first.",
230 unheld.join(", ")
231 );
232 }
233}