1use std::{
5 collections::HashMap,
6 iter,
7 sync::{
8 atomic::{AtomicUsize, Ordering},
9 Arc,
10 },
11};
12
13use hdrhistogram::Histogram;
14use linera_base::{
15 data_types::Amount,
16 identifiers::{AccountOwner, ApplicationId, ChainId},
17 listen_for_shutdown_signals,
18 time::Instant,
19};
20use linera_core::{client::ChainClient, Environment};
21use linera_execution::{
22 committee::Committee,
23 system::{Recipient, SystemOperation},
24 Operation,
25};
26use linera_sdk::abis::fungible::{self, FungibleOperation};
27use num_format::{Locale, ToFormattedString};
28use prometheus_parse::{HistogramCount, Scrape, Value};
29use tokio::{
30 sync::{mpsc, Barrier, Notify},
31 task, time,
32};
33use tokio_util::sync::CancellationToken;
34use tracing::{debug, error, info, warn, Instrument as _};
35
36const PROXY_LATENCY_P99_THRESHOLD: f64 = 400.0;
37const LATENCY_METRIC_PREFIX: &str = "linera_proxy_request_latency";
38
/// Errors produced while running the benchmark or while evaluating validator metrics.
#[derive(Debug, thiserror::Error)]
pub enum BenchmarkError {
    /// A spawned tokio task could not be joined.
    #[error("Failed to join task: {0}")]
    JoinError(#[from] task::JoinError),
    /// A chain client operation failed.
    #[error("Chain client error: {0}")]
    ChainClient(#[from] linera_core::client::ChainClientError),
    /// Returned by `diff_histograms` when the current snapshot's total count is lower than the
    /// previous snapshot's.
    #[error("Current histogram count is less than previous histogram count")]
    HistogramCountMismatch,
    /// The sample named exactly like the metric prefix did not carry a histogram value.
    #[error("Expected histogram value, got {0:?}")]
    ExpectedHistogramValue(Value),
    /// A `_count` or `_sum` sample did not carry an untyped value.
    #[error("Expected untyped value, got {0:?}")]
    ExpectedUntypedValue(Value),
    /// The scrape lacked buckets, the `_count` sample, or the `_sum` sample.
    #[error("Incomplete histogram data")]
    IncompleteHistogramData,
    /// No bucket reached the target rank while computing a quantile.
    #[error("Could not compute quantile")]
    CouldNotComputeQuantile,
    /// Two snapshots being diffed have different upper bounds at the same bucket position.
    #[error("Bucket boundaries do not match: {0} vs {1}")]
    BucketBoundariesDoNotMatch(f64, f64),
    /// An HTTP request for metrics failed.
    #[error("Reqwest error: {0}")]
    Reqwest(#[from] reqwest::Error),
    /// An I/O error, e.g. while parsing a metrics scrape.
    #[error("Io error: {0}")]
    IoError(#[from] std::io::Error),
    /// No previous snapshot is stored for the given metrics address.
    #[error("Previous histogram snapshot does not exist: {0}")]
    PreviousHistogramSnapshotDoesNotExist(String),
    /// The total count handed to the quantile computation was zero.
    #[error("No data available yet to calculate p99")]
    NoDataYetForP99Calculation,
    /// The bucket selected for interpolation holds no observations of its own.
    #[error("Unexpected empty bucket")]
    UnexpectedEmptyBucket,
    /// Sending a unit message over an mpsc channel failed.
    #[error("Failed to send unit message: {0}")]
    TokioSendUnitError(#[from] mpsc::error::SendError<()>),
    /// An hdrhistogram histogram could not be created.
    #[error("Failed to create histogram: {0}")]
    HistogramCreationError(#[from] hdrhistogram::CreationError),
    /// A value could not be recorded into an hdrhistogram histogram.
    #[error("Failed to record histogram: {0}")]
    HistogramRecordError(#[from] hdrhistogram::RecordError),
    /// Sending a `BlockTimings` message over an mpsc channel failed.
    #[error("Failed to send block timings message: {0}")]
    TokioSendBlockTimingsError(#[from] mpsc::error::SendError<BlockTimings>),
}
76
/// Millisecond durations of the four phases reported by `submit_fast_block_proposal`.
struct SubmitFastBlockProposalTimings {
    creating_proposal_ms: u64,
    stage_block_execution_ms: u64,
    creating_confirmed_block_ms: u64,
    submitting_block_proposal_ms: u64,
}
83
/// Millisecond durations of the steps measured while producing one benchmark block.
struct BlockTimeTimings {
    get_pending_message_bundles_ms: u64,
    submit_fast_block_proposal_ms: u64,
    // Breakdown of the proposal submission step into its phases.
    submit_fast_block_proposal_timings: SubmitFastBlockProposalTimings,
    communicate_chain_updates_ms: u64,
}
90
/// Timing measurements for a single benchmark block: the overall time plus the per-step
/// breakdown. Values are sent to the block time quantiles task for aggregation.
pub struct BlockTimings {
    block_time_ms: u64,
    block_time_timings: BlockTimeTimings,
}
95
/// One histogram per field of `SubmitFastBlockProposalTimings`.
struct SubmitFastBlockProposalTimingsHistograms {
    creating_proposal_histogram: Histogram<u64>,
    stage_block_execution_histogram: Histogram<u64>,
    creating_confirmed_block_histogram: Histogram<u64>,
    submitting_block_proposal_histogram: Histogram<u64>,
}
102
103impl SubmitFastBlockProposalTimingsHistograms {
104 pub fn new() -> Result<Self, BenchmarkError> {
105 Ok(Self {
106 creating_proposal_histogram: Histogram::<u64>::new(2)?,
107 stage_block_execution_histogram: Histogram::<u64>::new(2)?,
108 creating_confirmed_block_histogram: Histogram::<u64>::new(2)?,
109 submitting_block_proposal_histogram: Histogram::<u64>::new(2)?,
110 })
111 }
112
113 pub fn record(
114 &mut self,
115 submit_fast_block_proposal_timings: SubmitFastBlockProposalTimings,
116 ) -> Result<(), BenchmarkError> {
117 self.creating_proposal_histogram
118 .record(submit_fast_block_proposal_timings.creating_proposal_ms)?;
119 self.stage_block_execution_histogram
120 .record(submit_fast_block_proposal_timings.stage_block_execution_ms)?;
121 self.creating_confirmed_block_histogram
122 .record(submit_fast_block_proposal_timings.creating_confirmed_block_ms)?;
123 self.submitting_block_proposal_histogram
124 .record(submit_fast_block_proposal_timings.submitting_block_proposal_ms)?;
125 Ok(())
126 }
127}
128
/// Histograms mirroring the fields of `BlockTimeTimings`.
struct BlockTimeTimingsHistograms {
    get_pending_message_bundles_histogram: Histogram<u64>,
    submit_fast_block_proposal_histogram: Histogram<u64>,
    // Nested histograms for the phases of the proposal submission step.
    submit_fast_block_proposal_timings_histograms: SubmitFastBlockProposalTimingsHistograms,
    communicate_chain_updates_histogram: Histogram<u64>,
}
135
136impl BlockTimeTimingsHistograms {
137 pub fn new() -> Result<Self, BenchmarkError> {
138 Ok(Self {
139 get_pending_message_bundles_histogram: Histogram::<u64>::new(2)?,
140 submit_fast_block_proposal_histogram: Histogram::<u64>::new(2)?,
141 submit_fast_block_proposal_timings_histograms:
142 SubmitFastBlockProposalTimingsHistograms::new()?,
143 communicate_chain_updates_histogram: Histogram::<u64>::new(2)?,
144 })
145 }
146
147 pub fn record(&mut self, block_time_timings: BlockTimeTimings) -> Result<(), BenchmarkError> {
148 self.get_pending_message_bundles_histogram
149 .record(block_time_timings.get_pending_message_bundles_ms)?;
150 self.submit_fast_block_proposal_histogram
151 .record(block_time_timings.submit_fast_block_proposal_ms)?;
152 self.submit_fast_block_proposal_timings_histograms
153 .record(block_time_timings.submit_fast_block_proposal_timings)?;
154 self.communicate_chain_updates_histogram
155 .record(block_time_timings.communicate_chain_updates_ms)?;
156 Ok(())
157 }
158}
159
/// Histograms mirroring the fields of `BlockTimings`: the overall block time plus the
/// per-step breakdown.
struct BlockTimingsHistograms {
    block_time_histogram: Histogram<u64>,
    block_time_timings_histograms: BlockTimeTimingsHistograms,
}
164
165impl BlockTimingsHistograms {
166 pub fn new() -> Result<Self, BenchmarkError> {
167 Ok(Self {
168 block_time_histogram: Histogram::<u64>::new(2)?,
169 block_time_timings_histograms: BlockTimeTimingsHistograms::new()?,
170 })
171 }
172
173 pub fn record(&mut self, block_timings: BlockTimings) -> Result<(), BenchmarkError> {
174 self.block_time_histogram
175 .record(block_timings.block_time_ms)?;
176 self.block_time_timings_histograms
177 .record(block_timings.block_time_timings)?;
178 Ok(())
179 }
180}
181
/// A histogram as read from one metrics scrape, or the difference between two such reads.
#[derive(Debug)]
struct HistogramSnapshot {
    // Buckets sorted by upper bound (`less_than`) when produced by `parse_histogram`;
    // `compute_quantile` treats each bucket's `count` as cumulative.
    buckets: Vec<HistogramCount>,
    // Value of the `<metric>_count` sample.
    count: f64,
    // Value of the `<metric>_sum` sample.
    sum: f64,
}
188
/// Groups the benchmark routines that are generic over an `Environment`.
///
/// The struct stores no data; the `PhantomData` field only ties it to `Env`.
pub struct Benchmark<Env: Environment> {
    _phantom: std::marker::PhantomData<Env>,
}
192
193impl<Env: Environment> Benchmark<Env> {
194 #[expect(clippy::too_many_arguments)]
195 pub async fn run_benchmark(
196 num_chain_groups: usize,
197 transactions_per_block: usize,
198 bps: usize,
199 chain_clients: Vec<Vec<ChainClient<Env>>>,
200 blocks_infos: Vec<Vec<(Vec<Operation>, AccountOwner)>>,
201 committee: Committee,
202 health_check_endpoints: Option<String>,
203 runtime_in_seconds: Option<u64>,
204 delay_between_chain_groups_ms: Option<u64>,
205 ) -> Result<(), BenchmarkError> {
206 let bps_counts = (0..num_chain_groups)
207 .map(|_| Arc::new(AtomicUsize::new(0)))
208 .collect::<Vec<_>>();
209 let notifier = Arc::new(Notify::new());
210 let barrier = Arc::new(Barrier::new(num_chain_groups + 1));
211
212 let shutdown_notifier = CancellationToken::new();
213 tokio::spawn(listen_for_shutdown_signals(shutdown_notifier.clone()));
214
215 let bps_control_task = Self::bps_control_task(
216 &barrier,
217 &shutdown_notifier,
218 &bps_counts,
219 ¬ifier,
220 transactions_per_block,
221 bps,
222 );
223
224 let (block_time_quantiles_sender, block_time_quantiles_task) =
225 Self::block_time_quantiles_task(&shutdown_notifier);
226
227 let (runtime_control_task, runtime_control_sender) =
228 Self::runtime_control_task(&shutdown_notifier, runtime_in_seconds, num_chain_groups);
229
230 let bps_initial_share = bps / num_chain_groups;
231 let mut bps_remainder = bps % num_chain_groups;
232 let mut join_set = task::JoinSet::<Result<(), BenchmarkError>>::new();
233 for (chain_group_index, (chain_group, chain_clients)) in blocks_infos
234 .into_iter()
235 .zip(chain_clients.into_iter())
236 .enumerate()
237 {
238 let shutdown_notifier_clone = shutdown_notifier.clone();
239 let committee = committee.clone();
240 let barrier_clone = barrier.clone();
241 let block_time_quantiles_sender = block_time_quantiles_sender.clone();
242 let bps_count_clone = bps_counts[chain_group_index].clone();
243 let notifier_clone = notifier.clone();
244 let runtime_control_sender_clone = runtime_control_sender.clone();
245 let bps_share = if bps_remainder > 0 {
246 bps_remainder -= 1;
247 bps_initial_share + 1
248 } else {
249 bps_initial_share
250 };
251 join_set.spawn(
252 async move {
253 Box::pin(Self::run_benchmark_internal(
254 chain_group_index,
255 bps_share,
256 chain_group,
257 chain_clients,
258 shutdown_notifier_clone,
259 bps_count_clone,
260 committee,
261 block_time_quantiles_sender,
262 barrier_clone,
263 notifier_clone,
264 runtime_control_sender_clone,
265 delay_between_chain_groups_ms,
266 ))
267 .await?;
268
269 Ok(())
270 }
271 .instrument(
272 tracing::info_span!("chain_group", chain_group_index = ?chain_group_index),
273 ),
274 );
275 }
276
277 let metrics_watcher =
278 Self::metrics_watcher(health_check_endpoints, shutdown_notifier.clone()).await?;
279
280 join_set
281 .join_all()
282 .await
283 .into_iter()
284 .collect::<Result<Vec<_>, _>>()?;
285 info!("All benchmark tasks completed");
286 bps_control_task.await?;
287 if let Some(metrics_watcher) = metrics_watcher {
288 metrics_watcher.await??;
289 }
290 if let Some(runtime_control_task) = runtime_control_task {
291 runtime_control_task.await?;
292 }
293 drop(block_time_quantiles_sender);
294 block_time_quantiles_task.await??;
295
296 Ok(())
297 }
298
299 fn bps_control_task(
301 barrier: &Arc<Barrier>,
302 shutdown_notifier: &CancellationToken,
303 bps_counts: &[Arc<AtomicUsize>],
304 notifier: &Arc<Notify>,
305 transactions_per_block: usize,
306 bps: usize,
307 ) -> task::JoinHandle<()> {
308 let shutdown_notifier = shutdown_notifier.clone();
309 let bps_counts = bps_counts.to_vec();
310 let notifier = notifier.clone();
311 let barrier = barrier.clone();
312 task::spawn(
313 async move {
314 barrier.wait().await;
315 let mut one_second_interval = time::interval(time::Duration::from_secs(1));
316 loop {
317 if shutdown_notifier.is_cancelled() {
318 info!("Shutdown signal received in bps control task");
319 break;
320 }
321 one_second_interval.tick().await;
322 let current_bps_count: usize = bps_counts
323 .iter()
324 .map(|count| count.swap(0, Ordering::Relaxed))
325 .sum();
326 notifier.notify_waiters();
327 let formatted_current_bps = current_bps_count.to_formatted_string(&Locale::en);
328 let formatted_current_tps = (current_bps_count * transactions_per_block)
329 .to_formatted_string(&Locale::en);
330 let formatted_tps_goal =
331 (bps * transactions_per_block).to_formatted_string(&Locale::en);
332 let formatted_bps_goal = bps.to_formatted_string(&Locale::en);
333 if current_bps_count >= bps {
334 info!(
335 "Achieved {} BPS/{} TPS",
336 formatted_current_bps, formatted_current_tps
337 );
338 } else {
339 warn!(
340 "Failed to achieve {} BPS/{} TPS, only achieved {} BPS/{} TPS",
341 formatted_bps_goal,
342 formatted_tps_goal,
343 formatted_current_bps,
344 formatted_current_tps,
345 );
346 }
347 }
348
349 info!("Exiting bps control task");
350 }
351 .instrument(tracing::info_span!("bps_control")),
352 )
353 }
354
355 fn block_time_quantiles_task(
356 shutdown_notifier: &CancellationToken,
357 ) -> (
358 mpsc::UnboundedSender<BlockTimings>,
359 task::JoinHandle<Result<(), BenchmarkError>>,
360 ) {
361 let shutdown_notifier = shutdown_notifier.clone();
362 let (block_time_quantiles_sender, mut block_time_quantiles_receiver) =
363 mpsc::unbounded_channel();
364 let block_time_quantiles_task: task::JoinHandle<Result<(), BenchmarkError>> = task::spawn(
365 async move {
366 let mut histograms = BlockTimingsHistograms::new()?;
367 let mut block_time_quantiles_timer = Instant::now();
368
369 while let Some(block_timings) = block_time_quantiles_receiver.recv().await {
370 if shutdown_notifier.is_cancelled() {
371 info!("Shutdown signal received on block time quantiles task");
372 break;
373 }
374
375 histograms.record(block_timings)?;
376
377 if block_time_quantiles_timer.elapsed().as_secs() >= 5 {
379 for quantile in [0.99, 0.95, 0.90, 0.50] {
380 let formatted_quantile = (quantile * 100.0) as usize;
381
382 info!(
384 "Block time p{}: {} ms",
385 formatted_quantile,
386 histograms.block_time_histogram.value_at_quantile(quantile)
387 );
388
389 info!(
391 " ├─ Get pending message bundles p{}: {} ms",
392 formatted_quantile,
393 histograms
394 .block_time_timings_histograms
395 .get_pending_message_bundles_histogram
396 .value_at_quantile(quantile)
397 );
398 info!(
399 " ├─ Submit fast block proposal p{}: {} ms",
400 formatted_quantile,
401 histograms
402 .block_time_timings_histograms
403 .submit_fast_block_proposal_histogram
404 .value_at_quantile(quantile)
405 );
406 info!(
407 " │ ├─ Creating proposal p{}: {} ms",
408 formatted_quantile,
409 histograms
410 .block_time_timings_histograms
411 .submit_fast_block_proposal_timings_histograms
412 .creating_proposal_histogram
413 .value_at_quantile(quantile)
414 );
415 info!(
416 " │ ├─ Stage block execution p{}: {} ms",
417 formatted_quantile,
418 histograms
419 .block_time_timings_histograms
420 .submit_fast_block_proposal_timings_histograms
421 .stage_block_execution_histogram
422 .value_at_quantile(quantile)
423 );
424 info!(
425 " │ ├─ Creating confirmed block p{}: {} ms",
426 formatted_quantile,
427 histograms
428 .block_time_timings_histograms
429 .submit_fast_block_proposal_timings_histograms
430 .creating_confirmed_block_histogram
431 .value_at_quantile(quantile)
432 );
433 info!(
434 " │ └─ Submitting block proposal p{}: {} ms",
435 formatted_quantile,
436 histograms
437 .block_time_timings_histograms
438 .submit_fast_block_proposal_timings_histograms
439 .submitting_block_proposal_histogram
440 .value_at_quantile(quantile)
441 );
442 info!(
443 " └─ Communicate chain updates p{}: {} ms",
444 formatted_quantile,
445 histograms
446 .block_time_timings_histograms
447 .communicate_chain_updates_histogram
448 .value_at_quantile(quantile)
449 );
450 }
451 block_time_quantiles_timer = Instant::now();
452 }
453 }
454
455 info!("Exiting block time quantiles task");
456 Ok(())
457 }
458 .instrument(tracing::info_span!("block_time_quantiles")),
459 );
460 (block_time_quantiles_sender, block_time_quantiles_task)
461 }
462
463 async fn metrics_watcher(
464 health_check_endpoints: Option<String>,
465 shutdown_notifier: CancellationToken,
466 ) -> Result<Option<task::JoinHandle<Result<(), BenchmarkError>>>, BenchmarkError> {
467 if let Some(health_check_endpoints) = health_check_endpoints {
468 let metrics_addresses = health_check_endpoints
469 .split(',')
470 .map(|address| format!("http://{}/metrics", address.trim()))
471 .collect::<Vec<_>>();
472
473 let mut previous_histogram_snapshots: HashMap<String, HistogramSnapshot> =
474 HashMap::new();
475 let scrapes = Self::get_scrapes(&metrics_addresses).await?;
476 for (metrics_address, scrape) in scrapes {
477 previous_histogram_snapshots.insert(
478 metrics_address,
479 Self::parse_histogram(&scrape, LATENCY_METRIC_PREFIX)?,
480 );
481 }
482
483 let metrics_watcher: task::JoinHandle<Result<(), BenchmarkError>> = tokio::spawn(
484 async move {
485 let mut health_interval = time::interval(time::Duration::from_secs(5));
486 let mut shutdown_interval = time::interval(time::Duration::from_secs(1));
487 loop {
488 tokio::select! {
489 biased;
490 _ = health_interval.tick() => {
491 let result = Self::validators_healthy(&metrics_addresses, &mut previous_histogram_snapshots).await;
492 if let Err(ref err) = result {
493 info!("Shutting down benchmark due to error: {}", err);
494 shutdown_notifier.cancel();
495 break;
496 } else if !result? {
497 info!("Shutting down benchmark due to unhealthy validators");
498 shutdown_notifier.cancel();
499 break;
500 }
501 }
502 _ = shutdown_interval.tick() => {
503 if shutdown_notifier.is_cancelled() {
504 info!("Shutdown signal received, stopping metrics watcher");
505 break;
506 }
507 }
508 }
509 }
510
511 Ok(())
512 }
513 .instrument(tracing::info_span!("metrics_watcher")),
514 );
515
516 Ok(Some(metrics_watcher))
517 } else {
518 Ok(None)
519 }
520 }
521
522 fn runtime_control_task(
523 shutdown_notifier: &CancellationToken,
524 runtime_in_seconds: Option<u64>,
525 num_chain_groups: usize,
526 ) -> (Option<task::JoinHandle<()>>, Option<mpsc::Sender<()>>) {
527 if let Some(runtime_in_seconds) = runtime_in_seconds {
528 let (runtime_control_sender, mut runtime_control_receiver) =
529 mpsc::channel(num_chain_groups);
530 let shutdown_notifier = shutdown_notifier.clone();
531 let runtime_control_task = task::spawn(
532 async move {
533 let mut chains_started = 0;
534 while runtime_control_receiver.recv().await.is_some() {
535 chains_started += 1;
536 if chains_started == num_chain_groups {
537 break;
538 }
539 }
540 time::sleep(time::Duration::from_secs(runtime_in_seconds)).await;
541 shutdown_notifier.cancel();
542 }
543 .instrument(tracing::info_span!("runtime_control")),
544 );
545 (Some(runtime_control_task), Some(runtime_control_sender))
546 } else {
547 (None, None)
548 }
549 }
550
551 async fn validators_healthy(
552 metrics_addresses: &[String],
553 previous_histogram_snapshots: &mut HashMap<String, HistogramSnapshot>,
554 ) -> Result<bool, BenchmarkError> {
555 let scrapes = Self::get_scrapes(metrics_addresses).await?;
556 for (metrics_address, scrape) in scrapes {
557 let histogram = Self::parse_histogram(&scrape, LATENCY_METRIC_PREFIX)?;
558 let diff = Self::diff_histograms(
559 previous_histogram_snapshots.get(&metrics_address).ok_or(
560 BenchmarkError::PreviousHistogramSnapshotDoesNotExist(metrics_address.clone()),
561 )?,
562 &histogram,
563 )?;
564 let p99 = match Self::compute_quantile(&diff.buckets, diff.count, 0.99) {
565 Ok(p99) => p99,
566 Err(BenchmarkError::NoDataYetForP99Calculation) => {
567 info!(
568 "No data available yet to calculate p99 for {}",
569 metrics_address
570 );
571 continue;
572 }
573 Err(e) => {
574 error!("Error computing p99 for {}: {}", metrics_address, e);
575 return Err(e);
576 }
577 };
578
579 let last_bucket_boundary = diff.buckets[diff.buckets.len() - 2].less_than;
580 if p99 == f64::INFINITY {
581 info!(
582 "{} -> Estimated p99 for {} is higher than the last bucket boundary of {:?} ms",
583 metrics_address, LATENCY_METRIC_PREFIX, last_bucket_boundary
584 );
585 } else {
586 info!(
587 "{} -> Estimated p99 for {}: {:.2} ms",
588 metrics_address, LATENCY_METRIC_PREFIX, p99
589 );
590 }
591 if p99 > PROXY_LATENCY_P99_THRESHOLD {
592 if p99 == f64::INFINITY {
593 error!(
594 "Proxy of validator {} unhealthy! Latency p99 is too high, it is higher than \
595 the last bucket boundary of {:.2} ms",
596 metrics_address, last_bucket_boundary
597 );
598 } else {
599 error!(
600 "Proxy of validator {} unhealthy! Latency p99 is too high: {:.2} ms",
601 metrics_address, p99
602 );
603 }
604 return Ok(false);
605 }
606 previous_histogram_snapshots.insert(metrics_address.clone(), histogram);
607 }
608
609 Ok(true)
610 }
611
612 fn diff_histograms(
613 previous: &HistogramSnapshot,
614 current: &HistogramSnapshot,
615 ) -> Result<HistogramSnapshot, BenchmarkError> {
616 if current.count < previous.count {
617 return Err(BenchmarkError::HistogramCountMismatch);
618 }
619 let total_diff = current.count - previous.count;
620 let mut buckets_diff: Vec<HistogramCount> = Vec::new();
621 for (before, after) in previous.buckets.iter().zip(current.buckets.iter()) {
622 let bound_before = before.less_than;
623 let bound_after = after.less_than;
624 let cumulative_before = before.count;
625 let cumulative_after = after.count;
626 if (bound_before - bound_after).abs() > f64::EPSILON {
627 return Err(BenchmarkError::BucketBoundariesDoNotMatch(
628 bound_before,
629 bound_after,
630 ));
631 }
632 let diff = (cumulative_after - cumulative_before).max(0.0);
633 buckets_diff.push(HistogramCount {
634 less_than: bound_after,
635 count: diff,
636 });
637 }
638 Ok(HistogramSnapshot {
639 buckets: buckets_diff,
640 count: total_diff,
641 sum: current.sum - previous.sum,
642 })
643 }
644
645 async fn get_scrapes(
646 metrics_addresses: &[String],
647 ) -> Result<Vec<(String, Scrape)>, BenchmarkError> {
648 let mut scrapes = Vec::new();
649 for metrics_address in metrics_addresses {
650 let response = reqwest::get(metrics_address)
651 .await
652 .map_err(BenchmarkError::Reqwest)?;
653 let metrics = response.text().await.map_err(BenchmarkError::Reqwest)?;
654 let scrape = Scrape::parse(metrics.lines().map(|line| Ok(line.to_owned())))
655 .map_err(BenchmarkError::IoError)?;
656 scrapes.push((metrics_address.clone(), scrape));
657 }
658 Ok(scrapes)
659 }
660
661 fn parse_histogram(
662 scrape: &Scrape,
663 metric_prefix: &str,
664 ) -> Result<HistogramSnapshot, BenchmarkError> {
665 let mut buckets: Vec<HistogramCount> = Vec::new();
666 let mut total_count: Option<f64> = None;
667 let mut total_sum: Option<f64> = None;
668
669 for sample in &scrape.samples {
671 if sample.metric == metric_prefix {
672 if let Value::Histogram(histogram) = &sample.value {
673 buckets.extend(histogram.iter().cloned());
674 } else {
675 return Err(BenchmarkError::ExpectedHistogramValue(sample.value.clone()));
676 }
677 } else if sample.metric == format!("{}_count", metric_prefix) {
678 if let Value::Untyped(count) = sample.value {
679 total_count = Some(count);
680 } else {
681 return Err(BenchmarkError::ExpectedUntypedValue(sample.value.clone()));
682 }
683 } else if sample.metric == format!("{}_sum", metric_prefix) {
684 if let Value::Untyped(sum) = sample.value {
685 total_sum = Some(sum);
686 } else {
687 return Err(BenchmarkError::ExpectedUntypedValue(sample.value.clone()));
688 }
689 }
690 }
691
692 match (total_count, total_sum) {
693 (Some(count), Some(sum)) if !buckets.is_empty() => {
694 buckets.sort_by(|a, b| {
695 a.less_than
696 .partial_cmp(&b.less_than)
697 .expect("Comparison should not fail")
698 });
699 Ok(HistogramSnapshot {
700 buckets,
701 count,
702 sum,
703 })
704 }
705 _ => Err(BenchmarkError::IncompleteHistogramData),
706 }
707 }
708
709 fn compute_quantile(
710 buckets: &[HistogramCount],
711 total_count: f64,
712 quantile: f64,
713 ) -> Result<f64, BenchmarkError> {
714 if total_count == 0.0 {
715 return Err(BenchmarkError::NoDataYetForP99Calculation);
717 }
718 let target = (quantile * total_count).ceil();
720 let mut prev_cumulative = 0.0;
721 let mut prev_bound = 0.0;
722 for bucket in buckets {
723 if bucket.count >= target {
724 let bucket_count = bucket.count - prev_cumulative;
725 if bucket_count == 0.0 {
726 return Err(BenchmarkError::UnexpectedEmptyBucket);
728 }
729 let fraction = (target - prev_cumulative) / bucket_count;
730 return Ok(prev_bound + (bucket.less_than - prev_bound) * fraction);
731 }
732 prev_cumulative = bucket.count;
733 prev_bound = bucket.less_than;
734 }
735 Err(BenchmarkError::CouldNotComputeQuantile)
736 }
737
738 #[expect(clippy::too_many_arguments)]
739 async fn run_benchmark_internal(
740 chain_group_index: usize,
741 bps: usize,
742 chain_group: Vec<(Vec<Operation>, AccountOwner)>,
743 chain_clients: Vec<ChainClient<Env>>,
744 shutdown_notifier: CancellationToken,
745 bps_count: Arc<AtomicUsize>,
746 committee: Committee,
747 block_time_quantiles_sender: mpsc::UnboundedSender<BlockTimings>,
748 barrier: Arc<Barrier>,
749 notifier: Arc<Notify>,
750 runtime_control_sender: Option<mpsc::Sender<()>>,
751 delay_between_chain_groups_ms: Option<u64>,
752 ) -> Result<(), BenchmarkError> {
753 barrier.wait().await;
754 if let Some(delay_between_chain_groups_ms) = delay_between_chain_groups_ms {
755 time::sleep(time::Duration::from_millis(
756 (chain_group_index as u64) * delay_between_chain_groups_ms,
757 ))
758 .await;
759 }
760 info!("Starting benchmark for chain group {:?}", chain_group_index);
761
762 if let Some(runtime_control_sender) = runtime_control_sender {
763 runtime_control_sender.send(()).await?;
764 }
765
766 for ((operations, chain_owner), chain_client) in chain_group
767 .into_iter()
768 .zip(chain_clients.into_iter())
769 .cycle()
770 {
771 if shutdown_notifier.is_cancelled() {
772 info!("Shutdown signal received, stopping benchmark");
773 break;
774 }
775
776 let block_time_start = Instant::now();
777 let submit_fast_block_proposal_start = Instant::now();
778 let get_pending_message_bundles_start = Instant::now();
779 let incoming_bundles = chain_client.pending_message_bundles().await?;
780 let get_pending_message_bundles_ms =
781 get_pending_message_bundles_start.elapsed().as_millis() as u64;
782 let (
783 creating_proposal_ms,
784 stage_block_execution_ms,
785 creating_confirmed_block_ms,
786 submitting_block_proposal_ms,
787 ) = chain_client
788 .submit_fast_block_proposal(&committee, &operations, &incoming_bundles, chain_owner)
789 .await
790 .map_err(BenchmarkError::ChainClient)?;
791 let submit_fast_block_proposal_ms =
792 submit_fast_block_proposal_start.elapsed().as_millis() as u64;
793 let communicate_chain_updates_start = Instant::now();
794 chain_client
796 .communicate_chain_updates(&committee)
797 .await
798 .map_err(BenchmarkError::ChainClient)?;
799 let communicate_chain_updates_ms =
800 communicate_chain_updates_start.elapsed().as_millis() as u64;
801 let block_time_ms = block_time_start.elapsed().as_millis() as u64;
802 let block_metrics = BlockTimings {
803 block_time_ms,
804 block_time_timings: BlockTimeTimings {
805 get_pending_message_bundles_ms,
806 submit_fast_block_proposal_ms,
807 submit_fast_block_proposal_timings: SubmitFastBlockProposalTimings {
808 creating_proposal_ms,
809 stage_block_execution_ms,
810 creating_confirmed_block_ms,
811 submitting_block_proposal_ms,
812 },
813 communicate_chain_updates_ms,
814 },
815 };
816 if let Err(e) = block_time_quantiles_sender.send(block_metrics) {
817 warn!("Failed to send block time quantiles: {}", e);
820 }
821
822 let current_bps_count = bps_count.fetch_add(1, Ordering::Relaxed) + 1;
823 if current_bps_count >= bps {
824 notifier.notified().await;
825 }
826 }
827
828 info!("Exiting task...");
829 Ok(())
830 }
831
832 pub async fn close_benchmark_chain(
834 chain_client: &ChainClient<Env>,
835 ) -> Result<(), BenchmarkError> {
836 let start = Instant::now();
837 chain_client
838 .execute_operation(Operation::system(SystemOperation::CloseChain))
839 .await?
840 .expect("Close chain operation should not fail!");
841
842 debug!(
843 "Closed chain {:?} in {} ms",
844 chain_client.chain_id(),
845 start.elapsed().as_millis()
846 );
847
848 Ok(())
849 }
850
851 pub fn make_benchmark_block_info(
853 benchmark_chains: Vec<Vec<(ChainId, AccountOwner)>>,
854 transactions_per_block: usize,
855 fungible_application_id: Option<ApplicationId>,
856 ) -> Vec<Vec<(Vec<Operation>, AccountOwner)>> {
857 let mut blocks_infos = Vec::new();
858 for chains in benchmark_chains {
859 let mut infos = Vec::new();
860 let chains_len = chains.len();
861 let amount = Amount::from(1);
862 for i in 0..chains_len {
863 let owner = chains[i].1;
864 let recipient_chain_id = chains[(i + chains_len - 1) % chains_len].0;
865 let operation = match fungible_application_id {
866 Some(application_id) => Self::fungible_transfer(
867 application_id,
868 recipient_chain_id,
869 owner,
870 owner,
871 amount,
872 ),
873 None => Operation::system(SystemOperation::Transfer {
874 owner: AccountOwner::CHAIN,
875 recipient: Recipient::chain(recipient_chain_id),
876 amount,
877 }),
878 };
879 let operations = iter::repeat_n(operation, transactions_per_block).collect();
880 infos.push((operations, owner));
881 }
882 blocks_infos.push(infos);
883 }
884 blocks_infos
885 }
886
887 pub fn fungible_transfer(
889 application_id: ApplicationId,
890 chain_id: ChainId,
891 sender: AccountOwner,
892 receiver: AccountOwner,
893 amount: Amount,
894 ) -> Operation {
895 let target_account = fungible::Account {
896 chain_id,
897 owner: receiver,
898 };
899 let bytes = bcs::to_bytes(&FungibleOperation::Transfer {
900 owner: sender,
901 amount,
902 target_account,
903 })
904 .expect("should serialize fungible token operation");
905 Operation::User {
906 application_id,
907 bytes,
908 }
909 }
910}