linera_client/
benchmark.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::HashMap,
6    iter,
7    sync::{
8        atomic::{AtomicUsize, Ordering},
9        Arc,
10    },
11};
12
13use hdrhistogram::Histogram;
14use linera_base::{
15    data_types::Amount,
16    identifiers::{AccountOwner, ApplicationId, ChainId},
17    listen_for_shutdown_signals,
18    time::Instant,
19};
20use linera_core::{client::ChainClient, Environment};
21use linera_execution::{
22    committee::Committee,
23    system::{Recipient, SystemOperation},
24    Operation,
25};
26use linera_sdk::abis::fungible::{self, FungibleOperation};
27use num_format::{Locale, ToFormattedString};
28use prometheus_parse::{HistogramCount, Scrape, Value};
29use tokio::{
30    sync::{mpsc, Barrier, Notify},
31    task, time,
32};
33use tokio_util::sync::CancellationToken;
34use tracing::{debug, error, info, warn, Instrument as _};
35
/// Highest acceptable estimated p99 proxy request latency. A validator whose p99 exceeds this
/// value is reported as unhealthy; the log messages at the comparison site use an "ms" unit.
const PROXY_LATENCY_P99_THRESHOLD: f64 = 400.0;
/// Name of the histogram metric read from each scraped `/metrics` endpoint. The matching
/// `<prefix>_count` and `<prefix>_sum` series are looked up from the same prefix.
const LATENCY_METRIC_PREFIX: &str = "linera_proxy_request_latency";
38
/// Errors that can be returned by the benchmark driver and its helper tasks.
#[derive(Debug, thiserror::Error)]
pub enum BenchmarkError {
    /// Awaiting a spawned Tokio task failed.
    #[error("Failed to join task: {0}")]
    JoinError(#[from] task::JoinError),
    /// An error reported by the chain client.
    #[error("Chain client error: {0}")]
    ChainClient(#[from] linera_core::client::ChainClientError),
    /// The total sample count of a scraped histogram went down between two scrapes.
    #[error("Current histogram count is less than previous histogram count")]
    HistogramCountMismatch,
    /// The sample named after the metric prefix did not carry a histogram value.
    #[error("Expected histogram value, got {0:?}")]
    ExpectedHistogramValue(Value),
    /// A `_count` or `_sum` sample did not carry an untyped value.
    #[error("Expected untyped value, got {0:?}")]
    ExpectedUntypedValue(Value),
    /// The scrape lacked the buckets, the `_count` sample or the `_sum` sample.
    #[error("Incomplete histogram data")]
    IncompleteHistogramData,
    /// No bucket reached the cumulative count targeted by the requested quantile.
    #[error("Could not compute quantile")]
    CouldNotComputeQuantile,
    /// Two snapshots being compared disagree on a bucket's upper bound.
    #[error("Bucket boundaries do not match: {0} vs {1}")]
    BucketBoundariesDoNotMatch(f64, f64),
    /// An HTTP request to a metrics endpoint failed.
    #[error("Reqwest error: {0}")]
    Reqwest(#[from] reqwest::Error),
    /// An I/O error, e.g. while parsing the scraped metrics text.
    #[error("Io error: {0}")]
    IoError(#[from] std::io::Error),
    /// No earlier snapshot is stored for the given metrics address.
    #[error("Previous histogram snapshot does not exist: {0}")]
    PreviousHistogramSnapshotDoesNotExist(String),
    /// The histogram recorded no samples, so no quantile can be estimated.
    #[error("No data available yet to calculate p99")]
    NoDataYetForP99Calculation,
    /// The bucket selected for the quantile turned out to contain no samples.
    #[error("Unexpected empty bucket")]
    UnexpectedEmptyBucket,
    /// Sending a unit message (the "chain group started" signal) over a channel failed.
    #[error("Failed to send unit message: {0}")]
    TokioSendUnitError(#[from] mpsc::error::SendError<()>),
    /// Constructing an `hdrhistogram` histogram failed.
    #[error("Failed to create histogram: {0}")]
    HistogramCreationError(#[from] hdrhistogram::CreationError),
    /// Recording a value into an `hdrhistogram` histogram failed.
    #[error("Failed to record histogram: {0}")]
    HistogramRecordError(#[from] hdrhistogram::RecordError),
    /// Sending block timings to the quantiles task failed.
    #[error("Failed to send block timings message: {0}")]
    TokioSendBlockTimingsError(#[from] mpsc::error::SendError<BlockTimings>),
}
76
/// Durations of the individual steps of one fast block proposal submission.
///
/// Each field is recorded into its own histogram, which is reported with an "ms" unit.
// NOTE(review): the code that measures these values is outside the visible part of the file.
struct SubmitFastBlockProposalTimings {
    creating_proposal_ms: u64,
    stage_block_execution_ms: u64,
    creating_confirmed_block_ms: u64,
    submitting_block_proposal_ms: u64,
}
83
/// Breakdown of the time spent producing one block.
///
/// `submit_fast_block_proposal_timings` further splits the proposal-submission phase; in the
/// periodic report it is printed nested under `submit_fast_block_proposal_ms`.
struct BlockTimeTimings {
    get_pending_message_bundles_ms: u64,
    submit_fast_block_proposal_ms: u64,
    submit_fast_block_proposal_timings: SubmitFastBlockProposalTimings,
    communicate_chain_updates_ms: u64,
}
90
/// Timing measurements for one block: the overall block time plus its per-phase breakdown.
///
/// Values of this type are sent over a channel to the block time quantiles task, which
/// records them into histograms.
pub struct BlockTimings {
    block_time_ms: u64,
    block_time_timings: BlockTimeTimings,
}
95
/// One histogram per field of [`SubmitFastBlockProposalTimings`].
struct SubmitFastBlockProposalTimingsHistograms {
    creating_proposal_histogram: Histogram<u64>,
    stage_block_execution_histogram: Histogram<u64>,
    creating_confirmed_block_histogram: Histogram<u64>,
    submitting_block_proposal_histogram: Histogram<u64>,
}
102
103impl SubmitFastBlockProposalTimingsHistograms {
104    pub fn new() -> Result<Self, BenchmarkError> {
105        Ok(Self {
106            creating_proposal_histogram: Histogram::<u64>::new(2)?,
107            stage_block_execution_histogram: Histogram::<u64>::new(2)?,
108            creating_confirmed_block_histogram: Histogram::<u64>::new(2)?,
109            submitting_block_proposal_histogram: Histogram::<u64>::new(2)?,
110        })
111    }
112
113    pub fn record(
114        &mut self,
115        submit_fast_block_proposal_timings: SubmitFastBlockProposalTimings,
116    ) -> Result<(), BenchmarkError> {
117        self.creating_proposal_histogram
118            .record(submit_fast_block_proposal_timings.creating_proposal_ms)?;
119        self.stage_block_execution_histogram
120            .record(submit_fast_block_proposal_timings.stage_block_execution_ms)?;
121        self.creating_confirmed_block_histogram
122            .record(submit_fast_block_proposal_timings.creating_confirmed_block_ms)?;
123        self.submitting_block_proposal_histogram
124            .record(submit_fast_block_proposal_timings.submitting_block_proposal_ms)?;
125        Ok(())
126    }
127}
128
/// One histogram per phase of [`BlockTimeTimings`], plus the nested histograms for the
/// proposal-submission steps.
struct BlockTimeTimingsHistograms {
    get_pending_message_bundles_histogram: Histogram<u64>,
    submit_fast_block_proposal_histogram: Histogram<u64>,
    submit_fast_block_proposal_timings_histograms: SubmitFastBlockProposalTimingsHistograms,
    communicate_chain_updates_histogram: Histogram<u64>,
}
135
136impl BlockTimeTimingsHistograms {
137    pub fn new() -> Result<Self, BenchmarkError> {
138        Ok(Self {
139            get_pending_message_bundles_histogram: Histogram::<u64>::new(2)?,
140            submit_fast_block_proposal_histogram: Histogram::<u64>::new(2)?,
141            submit_fast_block_proposal_timings_histograms:
142                SubmitFastBlockProposalTimingsHistograms::new()?,
143            communicate_chain_updates_histogram: Histogram::<u64>::new(2)?,
144        })
145    }
146
147    pub fn record(&mut self, block_time_timings: BlockTimeTimings) -> Result<(), BenchmarkError> {
148        self.get_pending_message_bundles_histogram
149            .record(block_time_timings.get_pending_message_bundles_ms)?;
150        self.submit_fast_block_proposal_histogram
151            .record(block_time_timings.submit_fast_block_proposal_ms)?;
152        self.submit_fast_block_proposal_timings_histograms
153            .record(block_time_timings.submit_fast_block_proposal_timings)?;
154        self.communicate_chain_updates_histogram
155            .record(block_time_timings.communicate_chain_updates_ms)?;
156        Ok(())
157    }
158}
159
/// Histograms for everything carried by [`BlockTimings`]: the overall block time and the
/// per-phase breakdown.
struct BlockTimingsHistograms {
    block_time_histogram: Histogram<u64>,
    block_time_timings_histograms: BlockTimeTimingsHistograms,
}
164
165impl BlockTimingsHistograms {
166    pub fn new() -> Result<Self, BenchmarkError> {
167        Ok(Self {
168            block_time_histogram: Histogram::<u64>::new(2)?,
169            block_time_timings_histograms: BlockTimeTimingsHistograms::new()?,
170        })
171    }
172
173    pub fn record(&mut self, block_timings: BlockTimings) -> Result<(), BenchmarkError> {
174        self.block_time_histogram
175            .record(block_timings.block_time_ms)?;
176        self.block_time_timings_histograms
177            .record(block_timings.block_time_timings)?;
178        Ok(())
179    }
180}
181
/// A histogram as read from one metrics scrape, or the difference between two such reads.
///
/// `buckets` is kept sorted by upper bound (`less_than`) and the quantile computation treats
/// each bucket's `count` as cumulative. `count` and `sum` come from the `_count` and `_sum`
/// samples of the metric.
#[derive(Debug)]
struct HistogramSnapshot {
    buckets: Vec<HistogramCount>,
    count: f64,
    sum: f64,
}
188
/// Entry point for running benchmarks against an [`Environment`].
///
/// The type holds no data: `_phantom` only ties the `Env` parameter to the type so that the
/// associated functions can be generic over it.
pub struct Benchmark<Env: Environment> {
    _phantom: std::marker::PhantomData<Env>,
}
192
193impl<Env: Environment> Benchmark<Env> {
    /// Runs the benchmark: spawns one task per chain group plus the helper tasks (BPS control,
    /// block time quantiles, optional runtime limit, optional metrics watcher) and waits for
    /// all of them to finish.
    ///
    /// * `num_chain_groups` - number of chain-group tasks expected; sizes the start barrier.
    /// * `transactions_per_block` - only used to turn blocks per second into TPS in reports.
    /// * `bps` - overall blocks-per-second goal, split across the chain groups.
    /// * `chain_clients` / `blocks_infos` - per chain group, zipped pairwise.
    /// * `health_check_endpoints` - optional comma-separated `host:port` list to scrape.
    /// * `runtime_in_seconds` - optional run time after which shutdown is triggered.
    /// * `delay_between_chain_groups_ms` - optional stagger between chain-group starts.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by a chain-group task or by a helper task, or a
    /// [`BenchmarkError::JoinError`] if a helper task cannot be joined.
    ///
    /// # Panics
    ///
    /// Panics on division by zero if `num_chain_groups` is 0.
    #[expect(clippy::too_many_arguments)]
    pub async fn run_benchmark(
        num_chain_groups: usize,
        transactions_per_block: usize,
        bps: usize,
        chain_clients: Vec<Vec<ChainClient<Env>>>,
        blocks_infos: Vec<Vec<(Vec<Operation>, AccountOwner)>>,
        committee: Committee,
        health_check_endpoints: Option<String>,
        runtime_in_seconds: Option<u64>,
        delay_between_chain_groups_ms: Option<u64>,
    ) -> Result<(), BenchmarkError> {
        // One block counter per chain group; the BPS control task sums and resets them.
        let bps_counts = (0..num_chain_groups)
            .map(|_| Arc::new(AtomicUsize::new(0)))
            .collect::<Vec<_>>();
        let notifier = Arc::new(Notify::new());
        // Every chain-group task and the BPS control task wait on this barrier, hence `+ 1`.
        // NOTE(review): if fewer than `num_chain_groups` tasks get spawned below (the zip
        // stops at the shorter input), the barrier never releases — confirm callers keep the
        // lengths in sync.
        let barrier = Arc::new(Barrier::new(num_chain_groups + 1));

        let shutdown_notifier = CancellationToken::new();
        tokio::spawn(listen_for_shutdown_signals(shutdown_notifier.clone()));

        let bps_control_task = Self::bps_control_task(
            &barrier,
            &shutdown_notifier,
            &bps_counts,
            &notifier,
            transactions_per_block,
            bps,
        );

        let (block_time_quantiles_sender, block_time_quantiles_task) =
            Self::block_time_quantiles_task(&shutdown_notifier);

        let (runtime_control_task, runtime_control_sender) =
            Self::runtime_control_task(&shutdown_notifier, runtime_in_seconds, num_chain_groups);

        // Split the BPS goal evenly; the first `bps % num_chain_groups` groups get one extra.
        let bps_initial_share = bps / num_chain_groups;
        let mut bps_remainder = bps % num_chain_groups;
        let mut join_set = task::JoinSet::<Result<(), BenchmarkError>>::new();
        for (chain_group_index, (chain_group, chain_clients)) in blocks_infos
            .into_iter()
            .zip(chain_clients.into_iter())
            .enumerate()
        {
            let shutdown_notifier_clone = shutdown_notifier.clone();
            let committee = committee.clone();
            let barrier_clone = barrier.clone();
            let block_time_quantiles_sender = block_time_quantiles_sender.clone();
            let bps_count_clone = bps_counts[chain_group_index].clone();
            let notifier_clone = notifier.clone();
            let runtime_control_sender_clone = runtime_control_sender.clone();
            let bps_share = if bps_remainder > 0 {
                bps_remainder -= 1;
                bps_initial_share + 1
            } else {
                bps_initial_share
            };
            join_set.spawn(
                async move {
                    // NOTE(review): the future is boxed before being awaited, presumably to
                    // keep the spawned future small — confirm.
                    Box::pin(Self::run_benchmark_internal(
                        chain_group_index,
                        bps_share,
                        chain_group,
                        chain_clients,
                        shutdown_notifier_clone,
                        bps_count_clone,
                        committee,
                        block_time_quantiles_sender,
                        barrier_clone,
                        notifier_clone,
                        runtime_control_sender_clone,
                        delay_between_chain_groups_ms,
                    ))
                    .await?;

                    Ok(())
                }
                .instrument(
                    tracing::info_span!("chain_group", chain_group_index = ?chain_group_index),
                ),
            );
        }

        // Takes the initial latency snapshots and, if endpoints were given, spawns the watcher.
        let metrics_watcher =
            Self::metrics_watcher(health_check_endpoints, shutdown_notifier.clone()).await?;

        // Wait for every chain-group task, then surface the first error any of them returned.
        join_set
            .join_all()
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()?;
        info!("All benchmark tasks completed");
        bps_control_task.await?;
        if let Some(metrics_watcher) = metrics_watcher {
            metrics_watcher.await??;
        }
        if let Some(runtime_control_task) = runtime_control_task {
            runtime_control_task.await?;
        }
        // Drop this function's sender so the quantiles task can observe the channel closing
        // once the clones handed to the chain-group tasks are gone as well.
        drop(block_time_quantiles_sender);
        block_time_quantiles_task.await??;

        Ok(())
    }
298
299    // The bps control task will control the BPS from the threads.
300    fn bps_control_task(
301        barrier: &Arc<Barrier>,
302        shutdown_notifier: &CancellationToken,
303        bps_counts: &[Arc<AtomicUsize>],
304        notifier: &Arc<Notify>,
305        transactions_per_block: usize,
306        bps: usize,
307    ) -> task::JoinHandle<()> {
308        let shutdown_notifier = shutdown_notifier.clone();
309        let bps_counts = bps_counts.to_vec();
310        let notifier = notifier.clone();
311        let barrier = barrier.clone();
312        task::spawn(
313            async move {
314                barrier.wait().await;
315                let mut one_second_interval = time::interval(time::Duration::from_secs(1));
316                loop {
317                    if shutdown_notifier.is_cancelled() {
318                        info!("Shutdown signal received in bps control task");
319                        break;
320                    }
321                    one_second_interval.tick().await;
322                    let current_bps_count: usize = bps_counts
323                        .iter()
324                        .map(|count| count.swap(0, Ordering::Relaxed))
325                        .sum();
326                    notifier.notify_waiters();
327                    let formatted_current_bps = current_bps_count.to_formatted_string(&Locale::en);
328                    let formatted_current_tps = (current_bps_count * transactions_per_block)
329                        .to_formatted_string(&Locale::en);
330                    let formatted_tps_goal =
331                        (bps * transactions_per_block).to_formatted_string(&Locale::en);
332                    let formatted_bps_goal = bps.to_formatted_string(&Locale::en);
333                    if current_bps_count >= bps {
334                        info!(
335                            "Achieved {} BPS/{} TPS",
336                            formatted_current_bps, formatted_current_tps
337                        );
338                    } else {
339                        warn!(
340                            "Failed to achieve {} BPS/{} TPS, only achieved {} BPS/{} TPS",
341                            formatted_bps_goal,
342                            formatted_tps_goal,
343                            formatted_current_bps,
344                            formatted_current_tps,
345                        );
346                    }
347                }
348
349                info!("Exiting bps control task");
350            }
351            .instrument(tracing::info_span!("bps_control")),
352        )
353    }
354
355    fn block_time_quantiles_task(
356        shutdown_notifier: &CancellationToken,
357    ) -> (
358        mpsc::UnboundedSender<BlockTimings>,
359        task::JoinHandle<Result<(), BenchmarkError>>,
360    ) {
361        let shutdown_notifier = shutdown_notifier.clone();
362        let (block_time_quantiles_sender, mut block_time_quantiles_receiver) =
363            mpsc::unbounded_channel();
364        let block_time_quantiles_task: task::JoinHandle<Result<(), BenchmarkError>> = task::spawn(
365            async move {
366                let mut histograms = BlockTimingsHistograms::new()?;
367                let mut block_time_quantiles_timer = Instant::now();
368
369                while let Some(block_timings) = block_time_quantiles_receiver.recv().await {
370                    if shutdown_notifier.is_cancelled() {
371                        info!("Shutdown signal received on block time quantiles task");
372                        break;
373                    }
374
375                    histograms.record(block_timings)?;
376
377                    // Print block time quantiles every 5 seconds.
378                    if block_time_quantiles_timer.elapsed().as_secs() >= 5 {
379                        for quantile in [0.99, 0.95, 0.90, 0.50] {
380                            let formatted_quantile = (quantile * 100.0) as usize;
381
382                            // Overall block timing
383                            info!(
384                                "Block time p{}: {} ms",
385                                formatted_quantile,
386                                histograms.block_time_histogram.value_at_quantile(quantile)
387                            );
388
389                            // Block time breakdown
390                            info!(
391                                "  ├─ Get pending message bundles p{}: {} ms",
392                                formatted_quantile,
393                                histograms
394                                    .block_time_timings_histograms
395                                    .get_pending_message_bundles_histogram
396                                    .value_at_quantile(quantile)
397                            );
398                            info!(
399                                "  ├─ Submit fast block proposal p{}: {} ms",
400                                formatted_quantile,
401                                histograms
402                                    .block_time_timings_histograms
403                                    .submit_fast_block_proposal_histogram
404                                    .value_at_quantile(quantile)
405                            );
406                            info!(
407                                "  │  ├─ Creating proposal p{}: {} ms",
408                                formatted_quantile,
409                                histograms
410                                    .block_time_timings_histograms
411                                    .submit_fast_block_proposal_timings_histograms
412                                    .creating_proposal_histogram
413                                    .value_at_quantile(quantile)
414                            );
415                            info!(
416                                "  │  ├─ Stage block execution p{}: {} ms",
417                                formatted_quantile,
418                                histograms
419                                    .block_time_timings_histograms
420                                    .submit_fast_block_proposal_timings_histograms
421                                    .stage_block_execution_histogram
422                                    .value_at_quantile(quantile)
423                            );
424                            info!(
425                                "  │  ├─ Creating confirmed block p{}: {} ms",
426                                formatted_quantile,
427                                histograms
428                                    .block_time_timings_histograms
429                                    .submit_fast_block_proposal_timings_histograms
430                                    .creating_confirmed_block_histogram
431                                    .value_at_quantile(quantile)
432                            );
433                            info!(
434                                "  │  └─ Submitting block proposal p{}: {} ms",
435                                formatted_quantile,
436                                histograms
437                                    .block_time_timings_histograms
438                                    .submit_fast_block_proposal_timings_histograms
439                                    .submitting_block_proposal_histogram
440                                    .value_at_quantile(quantile)
441                            );
442                            info!(
443                                "  └─ Communicate chain updates p{}: {} ms",
444                                formatted_quantile,
445                                histograms
446                                    .block_time_timings_histograms
447                                    .communicate_chain_updates_histogram
448                                    .value_at_quantile(quantile)
449                            );
450                        }
451                        block_time_quantiles_timer = Instant::now();
452                    }
453                }
454
455                info!("Exiting block time quantiles task");
456                Ok(())
457            }
458            .instrument(tracing::info_span!("block_time_quantiles")),
459        );
460        (block_time_quantiles_sender, block_time_quantiles_task)
461    }
462
463    async fn metrics_watcher(
464        health_check_endpoints: Option<String>,
465        shutdown_notifier: CancellationToken,
466    ) -> Result<Option<task::JoinHandle<Result<(), BenchmarkError>>>, BenchmarkError> {
467        if let Some(health_check_endpoints) = health_check_endpoints {
468            let metrics_addresses = health_check_endpoints
469                .split(',')
470                .map(|address| format!("http://{}/metrics", address.trim()))
471                .collect::<Vec<_>>();
472
473            let mut previous_histogram_snapshots: HashMap<String, HistogramSnapshot> =
474                HashMap::new();
475            let scrapes = Self::get_scrapes(&metrics_addresses).await?;
476            for (metrics_address, scrape) in scrapes {
477                previous_histogram_snapshots.insert(
478                    metrics_address,
479                    Self::parse_histogram(&scrape, LATENCY_METRIC_PREFIX)?,
480                );
481            }
482
483            let metrics_watcher: task::JoinHandle<Result<(), BenchmarkError>> = tokio::spawn(
484                async move {
485                    let mut health_interval = time::interval(time::Duration::from_secs(5));
486                    let mut shutdown_interval = time::interval(time::Duration::from_secs(1));
487                    loop {
488                        tokio::select! {
489                            biased;
490                            _ = health_interval.tick() => {
491                                let result = Self::validators_healthy(&metrics_addresses, &mut previous_histogram_snapshots).await;
492                                if let Err(ref err) = result {
493                                    info!("Shutting down benchmark due to error: {}", err);
494                                    shutdown_notifier.cancel();
495                                    break;
496                                } else if !result? {
497                                    info!("Shutting down benchmark due to unhealthy validators");
498                                    shutdown_notifier.cancel();
499                                    break;
500                                }
501                            }
502                            _ = shutdown_interval.tick() => {
503                                if shutdown_notifier.is_cancelled() {
504                                    info!("Shutdown signal received, stopping metrics watcher");
505                                    break;
506                                }
507                            }
508                        }
509                    }
510
511                    Ok(())
512                }
513                .instrument(tracing::info_span!("metrics_watcher")),
514            );
515
516            Ok(Some(metrics_watcher))
517        } else {
518            Ok(None)
519        }
520    }
521
522    fn runtime_control_task(
523        shutdown_notifier: &CancellationToken,
524        runtime_in_seconds: Option<u64>,
525        num_chain_groups: usize,
526    ) -> (Option<task::JoinHandle<()>>, Option<mpsc::Sender<()>>) {
527        if let Some(runtime_in_seconds) = runtime_in_seconds {
528            let (runtime_control_sender, mut runtime_control_receiver) =
529                mpsc::channel(num_chain_groups);
530            let shutdown_notifier = shutdown_notifier.clone();
531            let runtime_control_task = task::spawn(
532                async move {
533                    let mut chains_started = 0;
534                    while runtime_control_receiver.recv().await.is_some() {
535                        chains_started += 1;
536                        if chains_started == num_chain_groups {
537                            break;
538                        }
539                    }
540                    time::sleep(time::Duration::from_secs(runtime_in_seconds)).await;
541                    shutdown_notifier.cancel();
542                }
543                .instrument(tracing::info_span!("runtime_control")),
544            );
545            (Some(runtime_control_task), Some(runtime_control_sender))
546        } else {
547            (None, None)
548        }
549    }
550
551    async fn validators_healthy(
552        metrics_addresses: &[String],
553        previous_histogram_snapshots: &mut HashMap<String, HistogramSnapshot>,
554    ) -> Result<bool, BenchmarkError> {
555        let scrapes = Self::get_scrapes(metrics_addresses).await?;
556        for (metrics_address, scrape) in scrapes {
557            let histogram = Self::parse_histogram(&scrape, LATENCY_METRIC_PREFIX)?;
558            let diff = Self::diff_histograms(
559                previous_histogram_snapshots.get(&metrics_address).ok_or(
560                    BenchmarkError::PreviousHistogramSnapshotDoesNotExist(metrics_address.clone()),
561                )?,
562                &histogram,
563            )?;
564            let p99 = match Self::compute_quantile(&diff.buckets, diff.count, 0.99) {
565                Ok(p99) => p99,
566                Err(BenchmarkError::NoDataYetForP99Calculation) => {
567                    info!(
568                        "No data available yet to calculate p99 for {}",
569                        metrics_address
570                    );
571                    continue;
572                }
573                Err(e) => {
574                    error!("Error computing p99 for {}: {}", metrics_address, e);
575                    return Err(e);
576                }
577            };
578
579            let last_bucket_boundary = diff.buckets[diff.buckets.len() - 2].less_than;
580            if p99 == f64::INFINITY {
581                info!(
582                    "{} -> Estimated p99 for {} is higher than the last bucket boundary of {:?} ms",
583                    metrics_address, LATENCY_METRIC_PREFIX, last_bucket_boundary
584                );
585            } else {
586                info!(
587                    "{} -> Estimated p99 for {}: {:.2} ms",
588                    metrics_address, LATENCY_METRIC_PREFIX, p99
589                );
590            }
591            if p99 > PROXY_LATENCY_P99_THRESHOLD {
592                if p99 == f64::INFINITY {
593                    error!(
594                        "Proxy of validator {} unhealthy! Latency p99 is too high, it is higher than \
595                        the last bucket boundary of {:.2} ms",
596                        metrics_address, last_bucket_boundary
597                    );
598                } else {
599                    error!(
600                        "Proxy of validator {} unhealthy! Latency p99 is too high: {:.2} ms",
601                        metrics_address, p99
602                    );
603                }
604                return Ok(false);
605            }
606            previous_histogram_snapshots.insert(metrics_address.clone(), histogram);
607        }
608
609        Ok(true)
610    }
611
612    fn diff_histograms(
613        previous: &HistogramSnapshot,
614        current: &HistogramSnapshot,
615    ) -> Result<HistogramSnapshot, BenchmarkError> {
616        if current.count < previous.count {
617            return Err(BenchmarkError::HistogramCountMismatch);
618        }
619        let total_diff = current.count - previous.count;
620        let mut buckets_diff: Vec<HistogramCount> = Vec::new();
621        for (before, after) in previous.buckets.iter().zip(current.buckets.iter()) {
622            let bound_before = before.less_than;
623            let bound_after = after.less_than;
624            let cumulative_before = before.count;
625            let cumulative_after = after.count;
626            if (bound_before - bound_after).abs() > f64::EPSILON {
627                return Err(BenchmarkError::BucketBoundariesDoNotMatch(
628                    bound_before,
629                    bound_after,
630                ));
631            }
632            let diff = (cumulative_after - cumulative_before).max(0.0);
633            buckets_diff.push(HistogramCount {
634                less_than: bound_after,
635                count: diff,
636            });
637        }
638        Ok(HistogramSnapshot {
639            buckets: buckets_diff,
640            count: total_diff,
641            sum: current.sum - previous.sum,
642        })
643    }
644
645    async fn get_scrapes(
646        metrics_addresses: &[String],
647    ) -> Result<Vec<(String, Scrape)>, BenchmarkError> {
648        let mut scrapes = Vec::new();
649        for metrics_address in metrics_addresses {
650            let response = reqwest::get(metrics_address)
651                .await
652                .map_err(BenchmarkError::Reqwest)?;
653            let metrics = response.text().await.map_err(BenchmarkError::Reqwest)?;
654            let scrape = Scrape::parse(metrics.lines().map(|line| Ok(line.to_owned())))
655                .map_err(BenchmarkError::IoError)?;
656            scrapes.push((metrics_address.clone(), scrape));
657        }
658        Ok(scrapes)
659    }
660
661    fn parse_histogram(
662        scrape: &Scrape,
663        metric_prefix: &str,
664    ) -> Result<HistogramSnapshot, BenchmarkError> {
665        let mut buckets: Vec<HistogramCount> = Vec::new();
666        let mut total_count: Option<f64> = None;
667        let mut total_sum: Option<f64> = None;
668
669        // Iterate over each metric in the scrape.
670        for sample in &scrape.samples {
671            if sample.metric == metric_prefix {
672                if let Value::Histogram(histogram) = &sample.value {
673                    buckets.extend(histogram.iter().cloned());
674                } else {
675                    return Err(BenchmarkError::ExpectedHistogramValue(sample.value.clone()));
676                }
677            } else if sample.metric == format!("{}_count", metric_prefix) {
678                if let Value::Untyped(count) = sample.value {
679                    total_count = Some(count);
680                } else {
681                    return Err(BenchmarkError::ExpectedUntypedValue(sample.value.clone()));
682                }
683            } else if sample.metric == format!("{}_sum", metric_prefix) {
684                if let Value::Untyped(sum) = sample.value {
685                    total_sum = Some(sum);
686                } else {
687                    return Err(BenchmarkError::ExpectedUntypedValue(sample.value.clone()));
688                }
689            }
690        }
691
692        match (total_count, total_sum) {
693            (Some(count), Some(sum)) if !buckets.is_empty() => {
694                buckets.sort_by(|a, b| {
695                    a.less_than
696                        .partial_cmp(&b.less_than)
697                        .expect("Comparison should not fail")
698                });
699                Ok(HistogramSnapshot {
700                    buckets,
701                    count,
702                    sum,
703                })
704            }
705            _ => Err(BenchmarkError::IncompleteHistogramData),
706        }
707    }
708
709    fn compute_quantile(
710        buckets: &[HistogramCount],
711        total_count: f64,
712        quantile: f64,
713    ) -> Result<f64, BenchmarkError> {
714        if total_count == 0.0 {
715            // Had no samples in the last 5s.
716            return Err(BenchmarkError::NoDataYetForP99Calculation);
717        }
718        // Compute the target cumulative count.
719        let target = (quantile * total_count).ceil();
720        let mut prev_cumulative = 0.0;
721        let mut prev_bound = 0.0;
722        for bucket in buckets {
723            if bucket.count >= target {
724                let bucket_count = bucket.count - prev_cumulative;
725                if bucket_count == 0.0 {
726                    // Bucket that is supposed to contain the target quantile is empty, unexpectedly.
727                    return Err(BenchmarkError::UnexpectedEmptyBucket);
728                }
729                let fraction = (target - prev_cumulative) / bucket_count;
730                return Ok(prev_bound + (bucket.less_than - prev_bound) * fraction);
731            }
732            prev_cumulative = bucket.count;
733            prev_bound = bucket.less_than;
734        }
735        Err(BenchmarkError::CouldNotComputeQuantile)
736    }
737
738    #[expect(clippy::too_many_arguments)]
739    async fn run_benchmark_internal(
740        chain_group_index: usize,
741        bps: usize,
742        chain_group: Vec<(Vec<Operation>, AccountOwner)>,
743        chain_clients: Vec<ChainClient<Env>>,
744        shutdown_notifier: CancellationToken,
745        bps_count: Arc<AtomicUsize>,
746        committee: Committee,
747        block_time_quantiles_sender: mpsc::UnboundedSender<BlockTimings>,
748        barrier: Arc<Barrier>,
749        notifier: Arc<Notify>,
750        runtime_control_sender: Option<mpsc::Sender<()>>,
751        delay_between_chain_groups_ms: Option<u64>,
752    ) -> Result<(), BenchmarkError> {
753        barrier.wait().await;
754        if let Some(delay_between_chain_groups_ms) = delay_between_chain_groups_ms {
755            time::sleep(time::Duration::from_millis(
756                (chain_group_index as u64) * delay_between_chain_groups_ms,
757            ))
758            .await;
759        }
760        info!("Starting benchmark for chain group {:?}", chain_group_index);
761
762        if let Some(runtime_control_sender) = runtime_control_sender {
763            runtime_control_sender.send(()).await?;
764        }
765
766        for ((operations, chain_owner), chain_client) in chain_group
767            .into_iter()
768            .zip(chain_clients.into_iter())
769            .cycle()
770        {
771            if shutdown_notifier.is_cancelled() {
772                info!("Shutdown signal received, stopping benchmark");
773                break;
774            }
775
776            let block_time_start = Instant::now();
777            let submit_fast_block_proposal_start = Instant::now();
778            let get_pending_message_bundles_start = Instant::now();
779            let incoming_bundles = chain_client.pending_message_bundles().await?;
780            let get_pending_message_bundles_ms =
781                get_pending_message_bundles_start.elapsed().as_millis() as u64;
782            let (
783                creating_proposal_ms,
784                stage_block_execution_ms,
785                creating_confirmed_block_ms,
786                submitting_block_proposal_ms,
787            ) = chain_client
788                .submit_fast_block_proposal(&committee, &operations, &incoming_bundles, chain_owner)
789                .await
790                .map_err(BenchmarkError::ChainClient)?;
791            let submit_fast_block_proposal_ms =
792                submit_fast_block_proposal_start.elapsed().as_millis() as u64;
793            let communicate_chain_updates_start = Instant::now();
794            // We assume the committee will not change during the benchmark.
795            chain_client
796                .communicate_chain_updates(&committee)
797                .await
798                .map_err(BenchmarkError::ChainClient)?;
799            let communicate_chain_updates_ms =
800                communicate_chain_updates_start.elapsed().as_millis() as u64;
801            let block_time_ms = block_time_start.elapsed().as_millis() as u64;
802            let block_metrics = BlockTimings {
803                block_time_ms,
804                block_time_timings: BlockTimeTimings {
805                    get_pending_message_bundles_ms,
806                    submit_fast_block_proposal_ms,
807                    submit_fast_block_proposal_timings: SubmitFastBlockProposalTimings {
808                        creating_proposal_ms,
809                        stage_block_execution_ms,
810                        creating_confirmed_block_ms,
811                        submitting_block_proposal_ms,
812                    },
813                    communicate_chain_updates_ms,
814                },
815            };
816            if let Err(e) = block_time_quantiles_sender.send(block_metrics) {
817                // The quantiles task might receive the shutdown signal first and exit before this
818                // one receives it.
819                warn!("Failed to send block time quantiles: {}", e);
820            }
821
822            let current_bps_count = bps_count.fetch_add(1, Ordering::Relaxed) + 1;
823            if current_bps_count >= bps {
824                notifier.notified().await;
825            }
826        }
827
828        info!("Exiting task...");
829        Ok(())
830    }
831
832    /// Closes the chain that was created for the benchmark.
833    pub async fn close_benchmark_chain(
834        chain_client: &ChainClient<Env>,
835    ) -> Result<(), BenchmarkError> {
836        let start = Instant::now();
837        chain_client
838            .execute_operation(Operation::system(SystemOperation::CloseChain))
839            .await?
840            .expect("Close chain operation should not fail!");
841
842        debug!(
843            "Closed chain {:?} in {} ms",
844            chain_client.chain_id(),
845            start.elapsed().as_millis()
846        );
847
848        Ok(())
849    }
850
851    /// Generates information related to one block per chain.
852    pub fn make_benchmark_block_info(
853        benchmark_chains: Vec<Vec<(ChainId, AccountOwner)>>,
854        transactions_per_block: usize,
855        fungible_application_id: Option<ApplicationId>,
856    ) -> Vec<Vec<(Vec<Operation>, AccountOwner)>> {
857        let mut blocks_infos = Vec::new();
858        for chains in benchmark_chains {
859            let mut infos = Vec::new();
860            let chains_len = chains.len();
861            let amount = Amount::from(1);
862            for i in 0..chains_len {
863                let owner = chains[i].1;
864                let recipient_chain_id = chains[(i + chains_len - 1) % chains_len].0;
865                let operation = match fungible_application_id {
866                    Some(application_id) => Self::fungible_transfer(
867                        application_id,
868                        recipient_chain_id,
869                        owner,
870                        owner,
871                        amount,
872                    ),
873                    None => Operation::system(SystemOperation::Transfer {
874                        owner: AccountOwner::CHAIN,
875                        recipient: Recipient::chain(recipient_chain_id),
876                        amount,
877                    }),
878                };
879                let operations = iter::repeat_n(operation, transactions_per_block).collect();
880                infos.push((operations, owner));
881            }
882            blocks_infos.push(infos);
883        }
884        blocks_infos
885    }
886
887    /// Creates a fungible token transfer operation.
888    pub fn fungible_transfer(
889        application_id: ApplicationId,
890        chain_id: ChainId,
891        sender: AccountOwner,
892        receiver: AccountOwner,
893        amount: Amount,
894    ) -> Operation {
895        let target_account = fungible::Account {
896            chain_id,
897            owner: receiver,
898        };
899        let bytes = bcs::to_bytes(&FungibleOperation::Transfer {
900            owner: sender,
901            amount,
902            target_account,
903        })
904        .expect("should serialize fungible token operation");
905        Operation::User {
906            application_id,
907            bytes,
908        }
909    }
910}