linera_client/benchmark.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::HashMap,
6    path::Path,
7    sync::{
8        atomic::{AtomicUsize, Ordering},
9        Arc,
10    },
11};
12
13use linera_base::{
14    data_types::{Amount, Timestamp},
15    identifiers::{Account, AccountOwner, ApplicationId, ChainId},
16    time::Instant,
17};
18use linera_core::{
19    client::chain_client::{self, ChainClient},
20    data_types::ClientOutcome,
21    Environment,
22};
23use linera_execution::{system::SystemOperation, Operation};
24use linera_sdk::abis::fungible::FungibleOperation;
25use num_format::{Locale, ToFormattedString};
26use prometheus_parse::{HistogramCount, Scrape, Value};
27use rand::{rngs::SmallRng, seq::SliceRandom, thread_rng, SeedableRng};
28use serde::{Deserialize, Serialize};
29use tokio::{
30    sync::{mpsc, Barrier, Notify},
31    task, time,
32};
33use tokio_util::sync::CancellationToken;
34use tracing::{debug, error, info, warn, Instrument as _};
35
36use crate::chain_listener::{ChainListener, ClientContext};
37
/// Trait for generating benchmark operations.
///
/// Implement this trait to create custom operation generators for different
/// application benchmarks (e.g., prediction markets, custom tokens, etc.).
///
/// Each benchmark chain gets its own generator instance. The generator is responsible
/// for producing operations to include in blocks, including any destination chain
/// selection logic.
pub trait OperationGenerator: Send + 'static {
    /// Generate a batch of operations for a single block.
    ///
    /// `owner` is the identity of the chain client proposing the block and
    /// `count` is the number of operations to produce for that block.
    /// Implementations may mutate internal state between calls (e.g. advance a
    /// destination rotation or an RNG).
    fn generate_operations(&mut self, owner: AccountOwner, count: usize) -> Vec<Operation>;
}
50
/// Generates native fungible token transfer operations between chains.
pub struct NativeFungibleTransferGenerator {
    /// Chain the transfers originate from; used to skip self-transfers when
    /// other destinations exist.
    source_chain_id: ChainId,
    /// Candidate recipient chains, reshuffled after each full pass.
    destination_chains: Vec<ChainId>,
    /// Cursor into `destination_chains` for round-robin selection.
    destination_index: usize,
    /// RNG used only for shuffling the destination list.
    rng: SmallRng,
    /// When true, every operation in a given block targets the same destination.
    single_destination_per_block: bool,
}
59
60impl NativeFungibleTransferGenerator {
61    pub fn new(
62        source_chain_id: ChainId,
63        mut destination_chains: Vec<ChainId>,
64        single_destination_per_block: bool,
65    ) -> Result<Self, BenchmarkError> {
66        // With a single chain, send to self.
67        if destination_chains.is_empty() {
68            destination_chains.push(source_chain_id);
69        }
70        let mut rng = SmallRng::from_rng(thread_rng())?;
71        destination_chains.shuffle(&mut rng);
72        Ok(Self {
73            source_chain_id,
74            destination_chains,
75            destination_index: 0,
76            rng,
77            single_destination_per_block,
78        })
79    }
80
81    fn next_destination(&mut self) -> ChainId {
82        if self.destination_index >= self.destination_chains.len() {
83            self.destination_chains.shuffle(&mut self.rng);
84            self.destination_index = 0;
85        }
86        let destination_chain_id = self.destination_chains[self.destination_index];
87        self.destination_index += 1;
88        // Skip self when there are other destinations available.
89        if destination_chain_id == self.source_chain_id && self.destination_chains.len() > 1 {
90            self.next_destination()
91        } else {
92            destination_chain_id
93        }
94    }
95}
96
97impl OperationGenerator for NativeFungibleTransferGenerator {
98    fn generate_operations(&mut self, _owner: AccountOwner, count: usize) -> Vec<Operation> {
99        let amount = Amount::from_attos(1);
100        if self.single_destination_per_block {
101            let recipient = self.next_destination();
102            (0..count)
103                .map(|_| {
104                    Operation::system(SystemOperation::Transfer {
105                        owner: AccountOwner::CHAIN,
106                        recipient: Account::chain(recipient),
107                        amount,
108                    })
109                })
110                .collect()
111        } else {
112            (0..count)
113                .map(|_| {
114                    let recipient = self.next_destination();
115                    Operation::system(SystemOperation::Transfer {
116                        owner: AccountOwner::CHAIN,
117                        recipient: Account::chain(recipient),
118                        amount,
119                    })
120                })
121                .collect()
122        }
123    }
124}
125
/// Generates fungible token transfer operations between chains.
pub struct FungibleTransferGenerator {
    /// The fungible application the transfer operations are addressed to.
    application_id: ApplicationId,
    /// Chain the transfers originate from; used to skip self-transfers when
    /// other destinations exist.
    source_chain_id: ChainId,
    /// Candidate recipient chains, reshuffled after each full pass.
    destination_chains: Vec<ChainId>,
    /// Cursor into `destination_chains` for round-robin selection.
    destination_index: usize,
    /// RNG used only for shuffling the destination list.
    rng: SmallRng,
    /// When true, every operation in a given block targets the same destination.
    single_destination_per_block: bool,
}
135
136impl FungibleTransferGenerator {
137    pub fn new(
138        application_id: ApplicationId,
139        source_chain_id: ChainId,
140        mut destination_chains: Vec<ChainId>,
141        single_destination_per_block: bool,
142    ) -> Result<Self, BenchmarkError> {
143        // With a single chain, send to self (matching old behavior).
144        if destination_chains.is_empty() {
145            destination_chains.push(source_chain_id);
146        }
147        let mut rng = SmallRng::from_rng(thread_rng())?;
148        destination_chains.shuffle(&mut rng);
149        Ok(Self {
150            application_id,
151            source_chain_id,
152            destination_chains,
153            destination_index: 0,
154            rng,
155            single_destination_per_block,
156        })
157    }
158
159    fn next_destination(&mut self) -> ChainId {
160        if self.destination_index >= self.destination_chains.len() {
161            self.destination_chains.shuffle(&mut self.rng);
162            self.destination_index = 0;
163        }
164        let destination_chain_id = self.destination_chains[self.destination_index];
165        self.destination_index += 1;
166        // Skip self when there are other destinations available.
167        if destination_chain_id == self.source_chain_id && self.destination_chains.len() > 1 {
168            self.next_destination()
169        } else {
170            destination_chain_id
171        }
172    }
173}
174
175impl OperationGenerator for FungibleTransferGenerator {
176    fn generate_operations(&mut self, owner: AccountOwner, count: usize) -> Vec<Operation> {
177        let amount = Amount::from_attos(1);
178        if self.single_destination_per_block {
179            let recipient = self.next_destination();
180            (0..count)
181                .map(|_| fungible_transfer(self.application_id, recipient, owner, owner, amount))
182                .collect()
183        } else {
184            (0..count)
185                .map(|_| {
186                    let recipient = self.next_destination();
187                    fungible_transfer(self.application_id, recipient, owner, owner, amount)
188                })
189                .collect()
190        }
191    }
192}
193
/// Maximum acceptable p99 proxy request latency, in milliseconds, before a
/// validator is reported unhealthy and the benchmark shuts down.
const PROXY_LATENCY_P99_THRESHOLD: f64 = 400.0;
/// Name of the Prometheus histogram metric tracking proxy request latency.
const LATENCY_METRIC_PREFIX: &str = "linera_proxy_request_latency";
196
/// Errors that can occur while running a benchmark or monitoring validator
/// health metrics.
#[derive(Debug, thiserror::Error)]
pub enum BenchmarkError {
    #[error("Failed to join task: {0}")]
    JoinError(#[from] task::JoinError),
    #[error("Chain client error: {0}")]
    ChainClient(#[from] chain_client::Error),
    #[error("Current histogram count is less than previous histogram count")]
    HistogramCountMismatch,
    #[error("Expected histogram value, got {0:?}")]
    ExpectedHistogramValue(Value),
    #[error("Expected untyped value, got {0:?}")]
    ExpectedUntypedValue(Value),
    #[error("Incomplete histogram data")]
    IncompleteHistogramData,
    #[error("Could not compute quantile")]
    CouldNotComputeQuantile,
    #[error("Bucket boundaries do not match: {0} vs {1}")]
    BucketBoundariesDoNotMatch(f64, f64),
    #[error("Reqwest error: {0}")]
    Reqwest(#[from] reqwest::Error),
    #[error("Io error: {0}")]
    IoError(#[from] std::io::Error),
    #[error("Previous histogram snapshot does not exist: {0}")]
    PreviousHistogramSnapshotDoesNotExist(String),
    #[error("No data available yet to calculate p99")]
    NoDataYetForP99Calculation,
    #[error("Unexpected empty bucket")]
    UnexpectedEmptyBucket,
    #[error("Failed to send unit message: {0}")]
    TokioSendUnitError(#[from] mpsc::error::SendError<()>),
    #[error("Config file not found: {0}")]
    ConfigFileNotFound(std::path::PathBuf),
    #[error("Failed to load config file: {0}")]
    ConfigLoadError(#[from] anyhow::Error),
    #[error("Could not find enough chains in wallet alone: needed {0}, but only found {1}")]
    NotEnoughChainsInWallet(usize, usize),
    #[error("Random number generator error: {0}")]
    RandError(#[from] rand::Error),
    #[error("Chain listener startup error")]
    ChainListenerStartupError,
}
238
/// A point-in-time view of a Prometheus histogram: per-bucket cumulative counts
/// plus the totals from the `_count` and `_sum` series.
#[derive(Debug)]
struct HistogramSnapshot {
    /// Cumulative counts keyed by upper bucket boundary, sorted by boundary.
    buckets: Vec<HistogramCount>,
    /// Total number of observed samples (the `_count` series).
    count: f64,
    /// Sum of all observed values (the `_sum` series).
    sum: f64,
}
245
/// On-disk benchmark configuration, serialized as kebab-case YAML.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct BenchmarkConfig {
    /// The set of chains participating in the benchmark; loaded by
    /// `Benchmark::get_all_chains` when a config path is supplied.
    pub chain_ids: Vec<ChainId>,
}
251
252impl BenchmarkConfig {
253    pub fn load_from_file<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
254        let content = std::fs::read_to_string(path)?;
255        let config = serde_yaml::from_str(&content)?;
256        Ok(config)
257    }
258
259    pub fn save_to_file<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {
260        let content = serde_yaml::to_string(self)?;
261        std::fs::write(path, content)?;
262        Ok(())
263    }
264}
265
/// Namespace for the benchmark entry points, parameterized by the client
/// environment.
///
/// Carries no runtime state; the `PhantomData` only ties the `Env` type
/// parameter to the type so the associated functions can reference it.
pub struct Benchmark<Env: Environment> {
    _phantom: std::marker::PhantomData<Env>,
}
269
270impl<Env: Environment> Benchmark<Env> {
    /// Runs a benchmark with the given chain clients and operation generators.
    ///
    /// Each chain client is paired with an operation generator (one per chain).
    /// The generators produce the operations to include in each block.
    ///
    /// The target `bps` (blocks per second) is split evenly across chains, with
    /// the remainder going to the first chains. Returns once every per-chain task
    /// finishes, or with the first error if any task fails — in that case the
    /// remaining tasks are aborted and `shutdown_notifier` is cancelled.
    ///
    /// # Panics
    ///
    /// Panics if `chain_clients` and `generators` have different lengths.
    #[expect(clippy::too_many_arguments)]
    pub async fn run_benchmark<C: ClientContext<Environment = Env> + 'static>(
        bps: usize,
        chain_clients: Vec<ChainClient<Env>>,
        generators: Vec<Box<dyn OperationGenerator>>,
        transactions_per_block: usize,
        health_check_endpoints: Option<String>,
        runtime_in_seconds: Option<u64>,
        delay_between_chains_ms: Option<u64>,
        chain_listener: ChainListener<C>,
        shutdown_notifier: &CancellationToken,
    ) -> Result<(), BenchmarkError> {
        assert_eq!(
            chain_clients.len(),
            generators.len(),
            "Must have one generator per chain client"
        );
        let num_chains = chain_clients.len();
        // One atomic block counter per chain; the bps control task reads and
        // resets them once per second.
        let bps_counts = (0..num_chains)
            .map(|_| Arc::new(AtomicUsize::new(0)))
            .collect::<Vec<_>>();
        let notifier = Arc::new(Notify::new());
        // All chain tasks plus the bps control task start in lockstep.
        let barrier = Arc::new(Barrier::new(num_chains + 1));

        let chain_listener_future = chain_listener
            .run()
            .await
            .map_err(|_| BenchmarkError::ChainListenerStartupError)?;
        let chain_listener_handle = tokio::spawn(chain_listener_future.in_current_span());

        let bps_control_task = Self::bps_control_task(
            &barrier,
            shutdown_notifier,
            &bps_counts,
            &notifier,
            transactions_per_block,
            bps,
        );

        let (runtime_control_task, runtime_control_sender) =
            Self::runtime_control_task(shutdown_notifier, runtime_in_seconds, num_chains);

        // Even split of the BPS target; the first `bps % num_chains` chains get
        // one extra block per second.
        let bps_initial_share = bps / num_chains;
        let mut bps_remainder = bps % num_chains;
        let mut join_set = task::JoinSet::<Result<(), BenchmarkError>>::new();
        for (chain_idx, (chain_client, generator)) in
            chain_clients.into_iter().zip(generators).enumerate()
        {
            let chain_id = chain_client.chain_id();
            let shutdown_notifier_clone = shutdown_notifier.clone();
            let barrier_clone = barrier.clone();
            let bps_count_clone = bps_counts[chain_idx].clone();
            let notifier_clone = notifier.clone();
            let runtime_control_sender_clone = runtime_control_sender.clone();
            let bps_share = if bps_remainder > 0 {
                bps_remainder -= 1;
                bps_initial_share + 1
            } else {
                bps_initial_share
            };
            join_set.spawn(
                async move {
                    // Boxed because the future is large; keeps the spawned
                    // task's inline state small.
                    Box::pin(Self::run_benchmark_internal(
                        chain_idx,
                        chain_id,
                        bps_share,
                        chain_client,
                        generator,
                        transactions_per_block,
                        shutdown_notifier_clone,
                        bps_count_clone,
                        barrier_clone,
                        notifier_clone,
                        runtime_control_sender_clone,
                        delay_between_chains_ms,
                    ))
                    .await?;

                    Ok(())
                }
                .instrument(tracing::info_span!("chain_id", chain_id = ?chain_id)),
            );
        }

        let metrics_watcher =
            Self::metrics_watcher(health_check_endpoints, shutdown_notifier).await?;

        // Wait for tasks and fail immediately if any task returns an error or panics
        while let Some(result) = join_set.join_next().await {
            let inner_result = result?;
            if let Err(e) = inner_result {
                error!("Benchmark task failed: {}", e);
                shutdown_notifier.cancel();
                join_set.abort_all();
                return Err(e);
            }
        }
        info!("All benchmark tasks completed successfully");

        // Join the auxiliary tasks only after all chain tasks have finished.
        bps_control_task.await?;
        if let Some(metrics_watcher) = metrics_watcher {
            metrics_watcher.await??;
        }
        if let Some(runtime_control_task) = runtime_control_task {
            runtime_control_task.await?;
        }

        // A chain listener failure is logged but does not fail the benchmark.
        if let Err(e) = chain_listener_handle.await? {
            tracing::error!("chain listener error: {e}");
        }

        Ok(())
    }
388
    /// Spawns the task that measures and reports the achieved BPS/TPS.
    ///
    /// Once per second it sums and resets the per-chain block counters, wakes any
    /// chain tasks parked on `notifier` (those that already reached their
    /// per-second share), and logs whether the global `bps` target was met. It
    /// joins the shared `barrier` so measurement starts only when all chain tasks
    /// are ready, and exits once `shutdown_notifier` is cancelled.
    fn bps_control_task(
        barrier: &Arc<Barrier>,
        shutdown_notifier: &CancellationToken,
        bps_counts: &[Arc<AtomicUsize>],
        notifier: &Arc<Notify>,
        transactions_per_block: usize,
        bps: usize,
    ) -> task::JoinHandle<()> {
        let shutdown_notifier = shutdown_notifier.clone();
        let bps_counts = bps_counts.to_vec();
        let notifier = notifier.clone();
        let barrier = barrier.clone();
        task::spawn(
            async move {
                barrier.wait().await;
                let mut one_second_interval = time::interval(time::Duration::from_secs(1));
                loop {
                    if shutdown_notifier.is_cancelled() {
                        info!("Shutdown signal received in bps control task");
                        break;
                    }
                    one_second_interval.tick().await;
                    // Read-and-reset each counter atomically for this window.
                    let current_bps_count: usize = bps_counts
                        .iter()
                        .map(|count| count.swap(0, Ordering::Relaxed))
                        .sum();
                    // Release chain tasks waiting for the next one-second window.
                    notifier.notify_waiters();
                    let formatted_current_bps = current_bps_count.to_formatted_string(&Locale::en);
                    let formatted_current_tps = (current_bps_count * transactions_per_block)
                        .to_formatted_string(&Locale::en);
                    let formatted_tps_goal =
                        (bps * transactions_per_block).to_formatted_string(&Locale::en);
                    let formatted_bps_goal = bps.to_formatted_string(&Locale::en);
                    if current_bps_count >= bps {
                        info!(
                            "Achieved {} BPS/{} TPS",
                            formatted_current_bps, formatted_current_tps
                        );
                    } else {
                        warn!(
                            "Failed to achieve {} BPS/{} TPS, only achieved {} BPS/{} TPS",
                            formatted_bps_goal,
                            formatted_tps_goal,
                            formatted_current_bps,
                            formatted_current_tps,
                        );
                    }
                }

                info!("Exiting bps control task");
            }
            .instrument(tracing::info_span!("bps_control")),
        )
    }
444
    /// Spawns a watcher over the validators' Prometheus metrics, if health-check
    /// endpoints were provided.
    ///
    /// Takes an initial latency-histogram snapshot per endpoint, then every five
    /// seconds re-checks validator health and cancels `shutdown_notifier` when a
    /// validator is unhealthy or a check fails. Returns `Ok(None)` when
    /// `health_check_endpoints` is `None`.
    async fn metrics_watcher(
        health_check_endpoints: Option<String>,
        shutdown_notifier: &CancellationToken,
    ) -> Result<Option<task::JoinHandle<Result<(), BenchmarkError>>>, BenchmarkError> {
        if let Some(health_check_endpoints) = health_check_endpoints {
            // Comma-separated `host:port` list, turned into metrics URLs.
            let metrics_addresses = health_check_endpoints
                .split(',')
                .map(|address| format!("http://{}/metrics", address.trim()))
                .collect::<Vec<_>>();

            // Baseline snapshots so the first health check can diff against them.
            let mut previous_histogram_snapshots: HashMap<String, HistogramSnapshot> =
                HashMap::new();
            let scrapes = Self::get_scrapes(&metrics_addresses).await?;
            for (metrics_address, scrape) in scrapes {
                previous_histogram_snapshots.insert(
                    metrics_address,
                    Self::parse_histogram(&scrape, LATENCY_METRIC_PREFIX)?,
                );
            }

            let shutdown_notifier = shutdown_notifier.clone();
            let metrics_watcher: task::JoinHandle<Result<(), BenchmarkError>> = tokio::spawn(
                async move {
                    let mut health_interval = time::interval(time::Duration::from_secs(5));
                    let mut shutdown_interval = time::interval(time::Duration::from_secs(1));
                    loop {
                        tokio::select! {
                            // `biased` polls the health branch first on every tick.
                            biased;
                            _ = health_interval.tick() => {
                                let result = Self::validators_healthy(&metrics_addresses, &mut previous_histogram_snapshots).await;
                                if let Err(ref err) = result {
                                    info!("Shutting down benchmark due to error: {}", err);
                                    shutdown_notifier.cancel();
                                    break;
                                } else if !result? {
                                    // `result` is `Ok` on this branch; `?` just unwraps.
                                    info!("Shutting down benchmark due to unhealthy validators");
                                    shutdown_notifier.cancel();
                                    break;
                                }
                            }
                            _ = shutdown_interval.tick() => {
                                if shutdown_notifier.is_cancelled() {
                                    info!("Shutdown signal received, stopping metrics watcher");
                                    break;
                                }
                            }
                        }
                    }

                    Ok(())
                }
                .instrument(tracing::info_span!("metrics_watcher")),
            );

            Ok(Some(metrics_watcher))
        } else {
            Ok(None)
        }
    }
504
    /// Spawns the task enforcing the optional benchmark runtime limit.
    ///
    /// When `runtime_in_seconds` is set, returns a task handle and a sender that
    /// each chain task uses to report startup. Once `num_chain_groups` messages
    /// have arrived, the task sleeps for the configured duration and then cancels
    /// `shutdown_notifier`. Returns `(None, None)` when no limit is configured.
    fn runtime_control_task(
        shutdown_notifier: &CancellationToken,
        runtime_in_seconds: Option<u64>,
        num_chain_groups: usize,
    ) -> (Option<task::JoinHandle<()>>, Option<mpsc::Sender<()>>) {
        if let Some(runtime_in_seconds) = runtime_in_seconds {
            let (runtime_control_sender, mut runtime_control_receiver) =
                mpsc::channel(num_chain_groups);
            let shutdown_notifier = shutdown_notifier.clone();
            let runtime_control_task = task::spawn(
                async move {
                    let mut chains_started = 0;
                    // NOTE(review): if all senders are dropped before
                    // `num_chain_groups` messages arrive, `recv()` returns `None`
                    // and the countdown starts early — confirm this is intended.
                    while runtime_control_receiver.recv().await.is_some() {
                        chains_started += 1;
                        if chains_started == num_chain_groups {
                            break;
                        }
                    }
                    time::sleep(time::Duration::from_secs(runtime_in_seconds)).await;
                    shutdown_notifier.cancel();
                }
                .instrument(tracing::info_span!("runtime_control")),
            );
            (Some(runtime_control_task), Some(runtime_control_sender))
        } else {
            (None, None)
        }
    }
533
534    async fn validators_healthy(
535        metrics_addresses: &[String],
536        previous_histogram_snapshots: &mut HashMap<String, HistogramSnapshot>,
537    ) -> Result<bool, BenchmarkError> {
538        let scrapes = Self::get_scrapes(metrics_addresses).await?;
539        for (metrics_address, scrape) in scrapes {
540            let histogram = Self::parse_histogram(&scrape, LATENCY_METRIC_PREFIX)?;
541            let diff = Self::diff_histograms(
542                previous_histogram_snapshots.get(&metrics_address).ok_or(
543                    BenchmarkError::PreviousHistogramSnapshotDoesNotExist(metrics_address.clone()),
544                )?,
545                &histogram,
546            )?;
547            let p99 = match Self::compute_quantile(&diff.buckets, diff.count, 0.99) {
548                Ok(p99) => p99,
549                Err(BenchmarkError::NoDataYetForP99Calculation) => {
550                    info!(
551                        "No data available yet to calculate p99 for {}",
552                        metrics_address
553                    );
554                    continue;
555                }
556                Err(e) => {
557                    error!("Error computing p99 for {}: {}", metrics_address, e);
558                    return Err(e);
559                }
560            };
561
562            let last_bucket_boundary = diff.buckets[diff.buckets.len() - 2].less_than;
563            if p99 == f64::INFINITY {
564                info!(
565                    "{} -> Estimated p99 for {} is higher than the last bucket boundary of {:?} ms",
566                    metrics_address, LATENCY_METRIC_PREFIX, last_bucket_boundary
567                );
568            } else {
569                info!(
570                    "{} -> Estimated p99 for {}: {:.2} ms",
571                    metrics_address, LATENCY_METRIC_PREFIX, p99
572                );
573            }
574            if p99 > PROXY_LATENCY_P99_THRESHOLD {
575                if p99 == f64::INFINITY {
576                    error!(
577                        "Proxy of validator {} unhealthy! Latency p99 is too high, it is higher than \
578                        the last bucket boundary of {:.2} ms",
579                        metrics_address, last_bucket_boundary
580                    );
581                } else {
582                    error!(
583                        "Proxy of validator {} unhealthy! Latency p99 is too high: {:.2} ms",
584                        metrics_address, p99
585                    );
586                }
587                return Ok(false);
588            }
589            previous_histogram_snapshots.insert(metrics_address.clone(), histogram);
590        }
591
592        Ok(true)
593    }
594
595    fn diff_histograms(
596        previous: &HistogramSnapshot,
597        current: &HistogramSnapshot,
598    ) -> Result<HistogramSnapshot, BenchmarkError> {
599        if current.count < previous.count {
600            return Err(BenchmarkError::HistogramCountMismatch);
601        }
602        let total_diff = current.count - previous.count;
603        let mut buckets_diff: Vec<HistogramCount> = Vec::new();
604        for (before, after) in previous.buckets.iter().zip(current.buckets.iter()) {
605            let bound_before = before.less_than;
606            let bound_after = after.less_than;
607            let cumulative_before = before.count;
608            let cumulative_after = after.count;
609            if (bound_before - bound_after).abs() > f64::EPSILON {
610                return Err(BenchmarkError::BucketBoundariesDoNotMatch(
611                    bound_before,
612                    bound_after,
613                ));
614            }
615            let diff = (cumulative_after - cumulative_before).max(0.0);
616            buckets_diff.push(HistogramCount {
617                less_than: bound_after,
618                count: diff,
619            });
620        }
621        Ok(HistogramSnapshot {
622            buckets: buckets_diff,
623            count: total_diff,
624            sum: current.sum - previous.sum,
625        })
626    }
627
628    async fn get_scrapes(
629        metrics_addresses: &[String],
630    ) -> Result<Vec<(String, Scrape)>, BenchmarkError> {
631        let mut scrapes = Vec::new();
632        for metrics_address in metrics_addresses {
633            let response = reqwest::get(metrics_address)
634                .await
635                .map_err(BenchmarkError::Reqwest)?;
636            let metrics = response.text().await.map_err(BenchmarkError::Reqwest)?;
637            let scrape = Scrape::parse(metrics.lines().map(|line| Ok(line.to_owned())))
638                .map_err(BenchmarkError::IoError)?;
639            scrapes.push((metrics_address.clone(), scrape));
640        }
641        Ok(scrapes)
642    }
643
644    fn parse_histogram(
645        scrape: &Scrape,
646        metric_prefix: &str,
647    ) -> Result<HistogramSnapshot, BenchmarkError> {
648        let mut buckets: Vec<HistogramCount> = Vec::new();
649        let mut total_count: Option<f64> = None;
650        let mut total_sum: Option<f64> = None;
651
652        // Iterate over each metric in the scrape.
653        for sample in &scrape.samples {
654            if sample.metric == metric_prefix {
655                if let Value::Histogram(histogram) = &sample.value {
656                    buckets.extend(histogram.iter().cloned());
657                } else {
658                    return Err(BenchmarkError::ExpectedHistogramValue(sample.value.clone()));
659                }
660            } else if sample.metric == format!("{}_count", metric_prefix) {
661                if let Value::Untyped(count) = sample.value {
662                    total_count = Some(count);
663                } else {
664                    return Err(BenchmarkError::ExpectedUntypedValue(sample.value.clone()));
665                }
666            } else if sample.metric == format!("{}_sum", metric_prefix) {
667                if let Value::Untyped(sum) = sample.value {
668                    total_sum = Some(sum);
669                } else {
670                    return Err(BenchmarkError::ExpectedUntypedValue(sample.value.clone()));
671                }
672            }
673        }
674
675        match (total_count, total_sum) {
676            (Some(count), Some(sum)) if !buckets.is_empty() => {
677                buckets.sort_by(|a, b| {
678                    a.less_than
679                        .partial_cmp(&b.less_than)
680                        .expect("Comparison should not fail")
681                });
682                Ok(HistogramSnapshot {
683                    buckets,
684                    count,
685                    sum,
686                })
687            }
688            _ => Err(BenchmarkError::IncompleteHistogramData),
689        }
690    }
691
692    fn compute_quantile(
693        buckets: &[HistogramCount],
694        total_count: f64,
695        quantile: f64,
696    ) -> Result<f64, BenchmarkError> {
697        if total_count == 0.0 {
698            // Had no samples in the last 5s.
699            return Err(BenchmarkError::NoDataYetForP99Calculation);
700        }
701        // Compute the target cumulative count.
702        let target = (quantile * total_count).ceil();
703        let mut prev_cumulative = 0.0;
704        let mut prev_bound = 0.0;
705        for bucket in buckets {
706            if bucket.count >= target {
707                let bucket_count = bucket.count - prev_cumulative;
708                if bucket_count == 0.0 {
709                    // Bucket that is supposed to contain the target quantile is empty, unexpectedly.
710                    return Err(BenchmarkError::UnexpectedEmptyBucket);
711                }
712                let fraction = (target - prev_cumulative) / bucket_count;
713                return Ok(prev_bound + (bucket.less_than - prev_bound) * fraction);
714            }
715            prev_cumulative = bucket.count;
716            prev_bound = bucket.less_than;
717        }
718        Err(BenchmarkError::CouldNotComputeQuantile)
719    }
720
    /// Per-chain benchmark loop: proposes blocks of generated operations until
    /// shutdown is signalled.
    ///
    /// Waits on the shared `barrier` (together with the bps control task), then
    /// optionally staggers its start by `chain_idx * delay_between_chains_ms`,
    /// reports startup via `runtime_control_sender`, and throttles itself to
    /// `bps` blocks per one-second window using `bps_count` and `notifier`.
    #[expect(clippy::too_many_arguments)]
    async fn run_benchmark_internal(
        chain_idx: usize,
        chain_id: ChainId,
        bps: usize,
        chain_client: ChainClient<Env>,
        mut generator: Box<dyn OperationGenerator>,
        transactions_per_block: usize,
        shutdown_notifier: CancellationToken,
        bps_count: Arc<AtomicUsize>,
        barrier: Arc<Barrier>,
        notifier: Arc<Notify>,
        runtime_control_sender: Option<mpsc::Sender<()>>,
        delay_between_chains_ms: Option<u64>,
    ) -> Result<(), BenchmarkError> {
        barrier.wait().await;
        if let Some(delay_between_chains_ms) = delay_between_chains_ms {
            // Stagger chain starts proportionally to the chain's index.
            time::sleep(time::Duration::from_millis(
                (chain_idx as u64) * delay_between_chains_ms,
            ))
            .await;
        }
        info!("Starting benchmark for chain {:?}", chain_id);

        if let Some(runtime_control_sender) = runtime_control_sender {
            runtime_control_sender.send(()).await?;
        }

        let owner = chain_client
            .identity()
            .await
            .map_err(BenchmarkError::ChainClient)?;

        loop {
            tokio::select! {
                // `biased` checks the shutdown branch before proposing a block.
                biased;

                _ = shutdown_notifier.cancelled() => {
                    info!("Shutdown signal received, stopping benchmark");
                    break;
                }
                // NOTE(review): on shutdown the in-flight `execute_operations`
                // future is dropped mid-execution; assumes the chain client
                // tolerates cancellation here — confirm.
                result = chain_client.execute_operations(
                    generator.generate_operations(owner, transactions_per_block),
                    vec![]
                ) => {
                    result
                        .map_err(BenchmarkError::ChainClient)?
                        .expect("should execute block with operations");

                    // Once this chain hit its per-second share, park until the
                    // bps control task opens the next one-second window.
                    let current_bps_count = bps_count.fetch_add(1, Ordering::Relaxed) + 1;
                    if current_bps_count >= bps {
                        notifier.notified().await;
                    }
                }
            }
        }

        info!("Exiting task...");
        Ok(())
    }
781
    /// Closes the chain that was created for the benchmark.
    ///
    /// Retries the `CloseChain` operation until it commits: on a conflicting
    /// certificate it simply retries, and on a round timeout it sleeps until the
    /// timeout's timestamp before retrying. Logs the total time taken on
    /// success.
    pub async fn close_benchmark_chain(
        chain_client: &ChainClient<Env>,
    ) -> Result<(), BenchmarkError> {
        let start = Instant::now();
        loop {
            let result = chain_client
                .execute_operation(Operation::system(SystemOperation::CloseChain))
                .await?;
            match result {
                ClientOutcome::Committed(_) => break,
                ClientOutcome::Conflict(certificate) => {
                    info!(
                        "Conflict while closing chain {:?}: {}. Retrying...",
                        chain_client.chain_id(),
                        certificate.hash()
                    );
                }
                ClientOutcome::WaitForTimeout(timeout) => {
                    info!(
                        "Waiting for timeout while closing chain {:?}: {}",
                        chain_client.chain_id(),
                        timeout
                    );
                    // Sleep until the round timeout elapses before retrying.
                    linera_base::time::timer::sleep(
                        timeout.timestamp.duration_since(Timestamp::now()),
                    )
                    .await;
                }
            }
        }

        debug!(
            "Closed chain {:?} in {} ms",
            chain_client.chain_id(),
            start.elapsed().as_millis()
        );

        Ok(())
    }
822
823    pub fn get_all_chains(
824        chains_config_path: Option<&Path>,
825        benchmark_chains: &[(ChainId, AccountOwner)],
826    ) -> Result<Vec<ChainId>, BenchmarkError> {
827        let all_chains = if let Some(config_path) = chains_config_path {
828            if !config_path.exists() {
829                return Err(BenchmarkError::ConfigFileNotFound(
830                    config_path.to_path_buf(),
831                ));
832            }
833            let config = BenchmarkConfig::load_from_file(config_path)
834                .map_err(BenchmarkError::ConfigLoadError)?;
835            config.chain_ids
836        } else {
837            benchmark_chains.iter().map(|(id, _)| *id).collect()
838        };
839
840        Ok(all_chains)
841    }
842}
843
844pub fn fungible_transfer(
845    application_id: ApplicationId,
846    chain_id: ChainId,
847    sender: AccountOwner,
848    receiver: AccountOwner,
849    amount: Amount,
850) -> Operation {
851    let target_account = Account {
852        chain_id,
853        owner: receiver,
854    };
855    let bytes = bcs::to_bytes(&FungibleOperation::Transfer {
856        owner: sender,
857        amount,
858        target_account,
859    })
860    .expect("should serialize fungible token operation");
861    Operation::User {
862        application_id,
863        bytes,
864    }
865}