linera_client/benchmark.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::HashMap,
    path::Path,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
};

use linera_base::{
    data_types::Amount,
    identifiers::{Account, AccountOwner, ApplicationId, ChainId},
    time::Instant,
};
use linera_core::{
    client::{ChainClient, ChainClientError},
    Environment,
};
use linera_execution::{system::SystemOperation, Operation};
use linera_sdk::abis::fungible::FungibleOperation;
use num_format::{Locale, ToFormattedString};
use prometheus_parse::{HistogramCount, Scrape, Value};
use rand::{rngs::SmallRng, seq::SliceRandom, thread_rng, SeedableRng};
use serde::{Deserialize, Serialize};
use tokio::{
    sync::{mpsc, Barrier, Notify},
    task, time,
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn, Instrument as _};

use crate::chain_listener::{ChainListener, ClientContext};

const PROXY_LATENCY_P99_THRESHOLD: f64 = 400.0;
const LATENCY_METRIC_PREFIX: &str = "linera_proxy_request_latency";

#[derive(Debug, thiserror::Error)]
pub enum BenchmarkError {
    #[error("Failed to join task: {0}")]
    JoinError(#[from] task::JoinError),
    #[error("Chain client error: {0}")]
    ChainClient(#[from] ChainClientError),
    #[error("Current histogram count is less than previous histogram count")]
    HistogramCountMismatch,
    #[error("Expected histogram value, got {0:?}")]
    ExpectedHistogramValue(Value),
    #[error("Expected untyped value, got {0:?}")]
    ExpectedUntypedValue(Value),
    #[error("Incomplete histogram data")]
    IncompleteHistogramData,
    #[error("Could not compute quantile")]
    CouldNotComputeQuantile,
    #[error("Bucket boundaries do not match: {0} vs {1}")]
    BucketBoundariesDoNotMatch(f64, f64),
    #[error("Reqwest error: {0}")]
    Reqwest(#[from] reqwest::Error),
    #[error("Io error: {0}")]
    IoError(#[from] std::io::Error),
    #[error("Previous histogram snapshot does not exist: {0}")]
    PreviousHistogramSnapshotDoesNotExist(String),
    #[error("No data available yet to calculate p99")]
    NoDataYetForP99Calculation,
    #[error("Unexpected empty bucket")]
    UnexpectedEmptyBucket,
    #[error("Failed to send unit message: {0}")]
    TokioSendUnitError(#[from] mpsc::error::SendError<()>),
    #[error("Config file not found: {0}")]
    ConfigFileNotFound(std::path::PathBuf),
    #[error("Failed to load config file: {0}")]
    ConfigLoadError(#[from] anyhow::Error),
    #[error("Could not find enough chains in wallet alone: needed {0}, but only found {1}")]
    NotEnoughChainsInWallet(usize, usize),
    #[error("Random number generator error: {0}")]
    RandError(#[from] rand::Error),
    #[error("Chain listener startup error")]
    ChainListenerStartupError,
}

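/// A point-in-time snapshot of a Prometheus histogram: the cumulative bucket counts
/// together with the total sample count and sum.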
#[derive(Debug)]
struct HistogramSnapshot {
    buckets: Vec<HistogramCount>,
    count: f64,
    sum: f64,
}

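/// Configuration for the benchmark, read from a YAML file. With the `kebab-case`
/// renaming below, the file is expected to look roughly like this (the entries are
/// placeholders, not real chain IDs):
///
/// ```yaml
/// chain-ids:
///   - <first chain ID>
///   - <second chain ID>
/// ```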
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct BenchmarkConfig {
    pub chain_ids: Vec<ChainId>,
}

impl BenchmarkConfig {
    pub fn load_from_file<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
        let content = std::fs::read_to_string(path)?;
        let config = serde_yaml::from_str(&content)?;
        Ok(config)
    }

    pub fn save_to_file<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {
        let content = serde_yaml::to_string(self)?;
        std::fs::write(path, content)?;
        Ok(())
    }
}

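/// Namespace for the benchmark logic. The type is generic over the client
/// [`Environment`] and only exposes associated functions; no instance is ever created.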
pub struct Benchmark<Env: Environment> {
    _phantom: std::marker::PhantomData<Env>,
}

impl<Env: Environment> Benchmark<Env> {
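    /// Runs the benchmark: starts the chain listener, spawns one benchmark task per
    /// chain client plus the BPS control task and the optional metrics watcher and
    /// runtime control tasks, then waits for them all, cancelling the benchmark if any
    /// chain task fails.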
    #[expect(clippy::too_many_arguments)]
    pub async fn run_benchmark<C: ClientContext<Environment = Env> + 'static>(
        bps: usize,
        chain_clients: Vec<ChainClient<Env>>,
        all_chains: Vec<ChainId>,
        transactions_per_block: usize,
        fungible_application_id: Option<ApplicationId>,
        health_check_endpoints: Option<String>,
        runtime_in_seconds: Option<u64>,
        delay_between_chains_ms: Option<u64>,
        chain_listener: ChainListener<C>,
        shutdown_notifier: &CancellationToken,
        single_destination_per_block: bool,
    ) -> Result<(), BenchmarkError> {
        let num_chains = chain_clients.len();
        let bps_counts = (0..num_chains)
            .map(|_| Arc::new(AtomicUsize::new(0)))
            .collect::<Vec<_>>();
        let notifier = Arc::new(Notify::new());
        let barrier = Arc::new(Barrier::new(num_chains + 1));

        let chain_listener_future = chain_listener
            .run()
            .await
            .map_err(|_| BenchmarkError::ChainListenerStartupError)?;
        let chain_listener_handle = tokio::spawn(chain_listener_future.in_current_span());

        let bps_control_task = Self::bps_control_task(
            &barrier,
            shutdown_notifier,
            &bps_counts,
            &notifier,
            transactions_per_block,
            bps,
        );

        let (runtime_control_task, runtime_control_sender) =
            Self::runtime_control_task(shutdown_notifier, runtime_in_seconds, num_chains);

        let bps_initial_share = bps / num_chains;
        let mut bps_remainder = bps % num_chains;
        let mut join_set = task::JoinSet::<Result<(), BenchmarkError>>::new();
        for (chain_idx, chain_client) in chain_clients.into_iter().enumerate() {
            let chain_id = chain_client.chain_id();
            let shutdown_notifier_clone = shutdown_notifier.clone();
            let barrier_clone = barrier.clone();
            let bps_count_clone = bps_counts[chain_idx].clone();
            let notifier_clone = notifier.clone();
            let runtime_control_sender_clone = runtime_control_sender.clone();
            let all_chains_clone = all_chains.clone();
            let bps_share = if bps_remainder > 0 {
                bps_remainder -= 1;
                bps_initial_share + 1
            } else {
                bps_initial_share
            };
            join_set.spawn(
                async move {
                    Box::pin(Self::run_benchmark_internal(
                        chain_idx,
                        chain_id,
                        bps_share,
                        chain_client,
                        all_chains_clone,
                        transactions_per_block,
                        fungible_application_id,
                        shutdown_notifier_clone,
                        bps_count_clone,
                        barrier_clone,
                        notifier_clone,
                        runtime_control_sender_clone,
                        delay_between_chains_ms,
                        single_destination_per_block,
                    ))
                    .await?;

                    Ok(())
                }
                .instrument(tracing::info_span!("chain_id", chain_id = ?chain_id)),
            );
        }

        let metrics_watcher =
            Self::metrics_watcher(health_check_endpoints, shutdown_notifier).await?;

        // Wait for tasks and fail immediately if any task returns an error or panics
        while let Some(result) = join_set.join_next().await {
            let inner_result = result?;
            if let Err(e) = inner_result {
                error!("Benchmark task failed: {}", e);
                shutdown_notifier.cancel();
                join_set.abort_all();
                return Err(e);
            }
        }
        info!("All benchmark tasks completed successfully");

        bps_control_task.await?;
        if let Some(metrics_watcher) = metrics_watcher {
            metrics_watcher.await??;
        }
        if let Some(runtime_control_task) = runtime_control_task {
            runtime_control_task.await?;
        }

        if let Err(e) = chain_listener_handle.await? {
            tracing::error!("chain listener error: {e}");
        }

        Ok(())
    }

    /// Spawns the task that, once per second, sums and resets the per-chain block
    /// counters, logs whether the BPS/TPS goal was reached, and wakes up the chain tasks
    /// waiting for the next period.
    fn bps_control_task(
        barrier: &Arc<Barrier>,
        shutdown_notifier: &CancellationToken,
        bps_counts: &[Arc<AtomicUsize>],
        notifier: &Arc<Notify>,
        transactions_per_block: usize,
        bps: usize,
    ) -> task::JoinHandle<()> {
        let shutdown_notifier = shutdown_notifier.clone();
        let bps_counts = bps_counts.to_vec();
        let notifier = notifier.clone();
        let barrier = barrier.clone();
        task::spawn(
            async move {
                barrier.wait().await;
                let mut one_second_interval = time::interval(time::Duration::from_secs(1));
                loop {
                    if shutdown_notifier.is_cancelled() {
                        info!("Shutdown signal received in bps control task");
                        break;
                    }
                    one_second_interval.tick().await;
                    let current_bps_count: usize = bps_counts
                        .iter()
                        .map(|count| count.swap(0, Ordering::Relaxed))
                        .sum();
                    notifier.notify_waiters();
                    let formatted_current_bps = current_bps_count.to_formatted_string(&Locale::en);
                    let formatted_current_tps = (current_bps_count * transactions_per_block)
                        .to_formatted_string(&Locale::en);
                    let formatted_tps_goal =
                        (bps * transactions_per_block).to_formatted_string(&Locale::en);
                    let formatted_bps_goal = bps.to_formatted_string(&Locale::en);
                    if current_bps_count >= bps {
                        info!(
                            "Achieved {} BPS/{} TPS",
                            formatted_current_bps, formatted_current_tps
                        );
                    } else {
                        warn!(
                            "Failed to achieve {} BPS/{} TPS, only achieved {} BPS/{} TPS",
                            formatted_bps_goal,
                            formatted_tps_goal,
                            formatted_current_bps,
                            formatted_current_tps,
                        );
                    }
                }

                info!("Exiting bps control task");
            }
            .instrument(tracing::info_span!("bps_control")),
        )
    }

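    /// If health check endpoints are given, takes an initial latency histogram snapshot
    /// from each proxy and spawns a task that re-checks validator health every five
    /// seconds, cancelling the benchmark if a validator becomes unhealthy.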
    async fn metrics_watcher(
        health_check_endpoints: Option<String>,
        shutdown_notifier: &CancellationToken,
    ) -> Result<Option<task::JoinHandle<Result<(), BenchmarkError>>>, BenchmarkError> {
        if let Some(health_check_endpoints) = health_check_endpoints {
            let metrics_addresses = health_check_endpoints
                .split(',')
                .map(|address| format!("http://{}/metrics", address.trim()))
                .collect::<Vec<_>>();

            let mut previous_histogram_snapshots: HashMap<String, HistogramSnapshot> =
                HashMap::new();
            let scrapes = Self::get_scrapes(&metrics_addresses).await?;
            for (metrics_address, scrape) in scrapes {
                previous_histogram_snapshots.insert(
                    metrics_address,
                    Self::parse_histogram(&scrape, LATENCY_METRIC_PREFIX)?,
                );
            }

            let shutdown_notifier = shutdown_notifier.clone();
            let metrics_watcher: task::JoinHandle<Result<(), BenchmarkError>> = tokio::spawn(
                async move {
                    let mut health_interval = time::interval(time::Duration::from_secs(5));
                    let mut shutdown_interval = time::interval(time::Duration::from_secs(1));
                    loop {
                        tokio::select! {
                            biased;
                            _ = health_interval.tick() => {
                                let result = Self::validators_healthy(&metrics_addresses, &mut previous_histogram_snapshots).await;
                                if let Err(ref err) = result {
                                    info!("Shutting down benchmark due to error: {}", err);
                                    shutdown_notifier.cancel();
                                    break;
                                } else if !result? {
                                    info!("Shutting down benchmark due to unhealthy validators");
                                    shutdown_notifier.cancel();
                                    break;
                                }
                            }
                            _ = shutdown_interval.tick() => {
                                if shutdown_notifier.is_cancelled() {
                                    info!("Shutdown signal received, stopping metrics watcher");
                                    break;
                                }
                            }
                        }
                    }

                    Ok(())
                }
                .instrument(tracing::info_span!("metrics_watcher")),
            );

            Ok(Some(metrics_watcher))
        } else {
            Ok(None)
        }
    }

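    /// If a runtime limit is configured, spawns a task that waits until every chain task
    /// has reported that it started, then cancels the benchmark after
    /// `runtime_in_seconds`.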
    fn runtime_control_task(
        shutdown_notifier: &CancellationToken,
        runtime_in_seconds: Option<u64>,
        num_chain_groups: usize,
    ) -> (Option<task::JoinHandle<()>>, Option<mpsc::Sender<()>>) {
        if let Some(runtime_in_seconds) = runtime_in_seconds {
            let (runtime_control_sender, mut runtime_control_receiver) =
                mpsc::channel(num_chain_groups);
            let shutdown_notifier = shutdown_notifier.clone();
            let runtime_control_task = task::spawn(
                async move {
                    let mut chains_started = 0;
                    while runtime_control_receiver.recv().await.is_some() {
                        chains_started += 1;
                        if chains_started == num_chain_groups {
                            break;
                        }
                    }
                    time::sleep(time::Duration::from_secs(runtime_in_seconds)).await;
                    shutdown_notifier.cancel();
                }
                .instrument(tracing::info_span!("runtime_control")),
            );
            (Some(runtime_control_task), Some(runtime_control_sender))
        } else {
            (None, None)
        }
    }

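    /// Scrapes the latency histograms again, computes each proxy's p99 latency since the
    /// previous snapshot, and returns `Ok(false)` if it exceeds
    /// [`PROXY_LATENCY_P99_THRESHOLD`].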
    async fn validators_healthy(
        metrics_addresses: &[String],
        previous_histogram_snapshots: &mut HashMap<String, HistogramSnapshot>,
    ) -> Result<bool, BenchmarkError> {
        let scrapes = Self::get_scrapes(metrics_addresses).await?;
        for (metrics_address, scrape) in scrapes {
            let histogram = Self::parse_histogram(&scrape, LATENCY_METRIC_PREFIX)?;
            let diff = Self::diff_histograms(
                previous_histogram_snapshots.get(&metrics_address).ok_or(
                    BenchmarkError::PreviousHistogramSnapshotDoesNotExist(metrics_address.clone()),
                )?,
                &histogram,
            )?;
            let p99 = match Self::compute_quantile(&diff.buckets, diff.count, 0.99) {
                Ok(p99) => p99,
                Err(BenchmarkError::NoDataYetForP99Calculation) => {
                    info!(
                        "No data available yet to calculate p99 for {}",
                        metrics_address
                    );
                    continue;
                }
                Err(e) => {
                    error!("Error computing p99 for {}: {}", metrics_address, e);
                    return Err(e);
                }
            };

            let last_bucket_boundary = diff.buckets[diff.buckets.len() - 2].less_than;
            if p99 == f64::INFINITY {
                info!(
                    "{} -> Estimated p99 for {} is higher than the last bucket boundary of {:?} ms",
                    metrics_address, LATENCY_METRIC_PREFIX, last_bucket_boundary
                );
            } else {
                info!(
                    "{} -> Estimated p99 for {}: {:.2} ms",
                    metrics_address, LATENCY_METRIC_PREFIX, p99
                );
            }
            if p99 > PROXY_LATENCY_P99_THRESHOLD {
                if p99 == f64::INFINITY {
                    error!(
                        "Proxy of validator {} unhealthy! Latency p99 is too high, it is higher than \
                        the last bucket boundary of {:.2} ms",
                        metrics_address, last_bucket_boundary
                    );
                } else {
                    error!(
                        "Proxy of validator {} unhealthy! Latency p99 is too high: {:.2} ms",
                        metrics_address, p99
                    );
                }
                return Ok(false);
            }
            previous_histogram_snapshots.insert(metrics_address.clone(), histogram);
        }

        Ok(true)
    }

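    /// Computes the histogram of the samples observed between two snapshots by
    /// subtracting the previous cumulative bucket counts from the current ones.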
    fn diff_histograms(
        previous: &HistogramSnapshot,
        current: &HistogramSnapshot,
    ) -> Result<HistogramSnapshot, BenchmarkError> {
        if current.count < previous.count {
            return Err(BenchmarkError::HistogramCountMismatch);
        }
        let total_diff = current.count - previous.count;
        let mut buckets_diff: Vec<HistogramCount> = Vec::new();
        for (before, after) in previous.buckets.iter().zip(current.buckets.iter()) {
            let bound_before = before.less_than;
            let bound_after = after.less_than;
            let cumulative_before = before.count;
            let cumulative_after = after.count;
            if (bound_before - bound_after).abs() > f64::EPSILON {
                return Err(BenchmarkError::BucketBoundariesDoNotMatch(
                    bound_before,
                    bound_after,
                ));
            }
            let diff = (cumulative_after - cumulative_before).max(0.0);
            buckets_diff.push(HistogramCount {
                less_than: bound_after,
                count: diff,
            });
        }
        Ok(HistogramSnapshot {
            buckets: buckets_diff,
            count: total_diff,
            sum: current.sum - previous.sum,
        })
    }

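    /// Fetches and parses the Prometheus metrics page of each address.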
    async fn get_scrapes(
        metrics_addresses: &[String],
    ) -> Result<Vec<(String, Scrape)>, BenchmarkError> {
        let mut scrapes = Vec::new();
        for metrics_address in metrics_addresses {
            let response = reqwest::get(metrics_address)
                .await
                .map_err(BenchmarkError::Reqwest)?;
            let metrics = response.text().await.map_err(BenchmarkError::Reqwest)?;
            let scrape = Scrape::parse(metrics.lines().map(|line| Ok(line.to_owned())))
                .map_err(BenchmarkError::IoError)?;
            scrapes.push((metrics_address.clone(), scrape));
        }
        Ok(scrapes)
    }

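    /// Extracts the histogram with the given metric name from a scrape, together with
    /// its `_count` and `_sum` series.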
    fn parse_histogram(
        scrape: &Scrape,
        metric_prefix: &str,
    ) -> Result<HistogramSnapshot, BenchmarkError> {
        let mut buckets: Vec<HistogramCount> = Vec::new();
        let mut total_count: Option<f64> = None;
        let mut total_sum: Option<f64> = None;

        // Iterate over each metric in the scrape.
        for sample in &scrape.samples {
            if sample.metric == metric_prefix {
                if let Value::Histogram(histogram) = &sample.value {
                    buckets.extend(histogram.iter().cloned());
                } else {
                    return Err(BenchmarkError::ExpectedHistogramValue(sample.value.clone()));
                }
            } else if sample.metric == format!("{}_count", metric_prefix) {
                if let Value::Untyped(count) = sample.value {
                    total_count = Some(count);
                } else {
                    return Err(BenchmarkError::ExpectedUntypedValue(sample.value.clone()));
                }
            } else if sample.metric == format!("{}_sum", metric_prefix) {
                if let Value::Untyped(sum) = sample.value {
                    total_sum = Some(sum);
                } else {
                    return Err(BenchmarkError::ExpectedUntypedValue(sample.value.clone()));
                }
            }
        }

        match (total_count, total_sum) {
            (Some(count), Some(sum)) if !buckets.is_empty() => {
                buckets.sort_by(|a, b| {
                    a.less_than
                        .partial_cmp(&b.less_than)
                        .expect("Comparison should not fail")
                });
                Ok(HistogramSnapshot {
                    buckets,
                    count,
                    sum,
                })
            }
            _ => Err(BenchmarkError::IncompleteHistogramData),
        }
    }

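    /// Estimates the given quantile by linear interpolation within the first bucket
    /// whose cumulative count reaches the target; this yields infinity when the target
    /// falls in the `+Inf` bucket.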
    fn compute_quantile(
        buckets: &[HistogramCount],
        total_count: f64,
        quantile: f64,
    ) -> Result<f64, BenchmarkError> {
        if total_count == 0.0 {
            // Had no samples in the last 5s.
            return Err(BenchmarkError::NoDataYetForP99Calculation);
        }
        // Compute the target cumulative count.
        let target = (quantile * total_count).ceil();
        let mut prev_cumulative = 0.0;
        let mut prev_bound = 0.0;
        for bucket in buckets {
            if bucket.count >= target {
                let bucket_count = bucket.count - prev_cumulative;
                if bucket_count == 0.0 {
                    // Bucket that is supposed to contain the target quantile is empty, unexpectedly.
                    return Err(BenchmarkError::UnexpectedEmptyBucket);
                }
                let fraction = (target - prev_cumulative) / bucket_count;
                return Ok(prev_bound + (bucket.less_than - prev_bound) * fraction);
            }
            prev_cumulative = bucket.count;
            prev_bound = bucket.less_than;
        }
        Err(BenchmarkError::CouldNotComputeQuantile)
    }

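    /// The benchmark loop of a single chain: proposes blocks containing
    /// `transactions_per_block` transfer operations, pausing once this chain's share of
    /// the BPS goal has been reached for the current one-second period.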
    #[expect(clippy::too_many_arguments)]
    async fn run_benchmark_internal(
        chain_idx: usize,
        chain_id: ChainId,
        bps: usize,
        chain_client: ChainClient<Env>,
        all_chains: Vec<ChainId>,
        transactions_per_block: usize,
        fungible_application_id: Option<ApplicationId>,
        shutdown_notifier: CancellationToken,
        bps_count: Arc<AtomicUsize>,
        barrier: Arc<Barrier>,
        notifier: Arc<Notify>,
        runtime_control_sender: Option<mpsc::Sender<()>>,
        delay_between_chains_ms: Option<u64>,
        single_destination_per_block: bool,
    ) -> Result<(), BenchmarkError> {
        barrier.wait().await;
        if let Some(delay_between_chains_ms) = delay_between_chains_ms {
            time::sleep(time::Duration::from_millis(
                (chain_idx as u64) * delay_between_chains_ms,
            ))
            .await;
        }
        info!("Starting benchmark for chain {:?}", chain_id);

        if let Some(runtime_control_sender) = runtime_control_sender {
            runtime_control_sender.send(()).await?;
        }

        let owner = chain_client
            .identity()
            .await
            .map_err(BenchmarkError::ChainClient)?;
        let mut destination_manager = ChainDestinationManager::new(chain_id, all_chains)?;

        loop {
            tokio::select! {
                biased;

                _ = shutdown_notifier.cancelled() => {
                    info!("Shutdown signal received, stopping benchmark");
                    break;
                }
                result = chain_client.execute_operations(
                    Self::generate_operations(
                        owner,
                        transactions_per_block,
                        fungible_application_id,
                        &mut destination_manager,
                        single_destination_per_block,
                    ),
                    vec![]
                ) => {
                    result
                        .map_err(BenchmarkError::ChainClient)?
                        .expect("should execute block with operations");

                    let current_bps_count = bps_count.fetch_add(1, Ordering::Relaxed) + 1;
                    if current_bps_count >= bps {
                        notifier.notified().await;
                    }
                }
            }
        }

        info!("Exiting task...");
        Ok(())
    }

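    /// Creates a single transfer operation: a fungible application transfer if an
    /// application ID is given, otherwise a native token transfer.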
    fn create_operation(
        fungible_application_id: Option<ApplicationId>,
        recipient_chain_id: ChainId,
        owner: AccountOwner,
        amount: Amount,
    ) -> Operation {
        match fungible_application_id {
            Some(application_id) => {
                Self::fungible_transfer(application_id, recipient_chain_id, owner, owner, amount)
            }
            None => Operation::system(SystemOperation::Transfer {
                owner: AccountOwner::CHAIN,
                recipient: Account::chain(recipient_chain_id),
                amount,
            }),
        }
    }

    /// Generates operations for a single block, randomizing destinations after each full cycle.
    fn generate_operations(
        owner: AccountOwner,
        transactions_per_block: usize,
        fungible_application_id: Option<ApplicationId>,
        destination_manager: &mut ChainDestinationManager,
        single_destination_per_block: bool,
    ) -> Vec<Operation> {
        let amount = Amount::from_attos(1);

        if single_destination_per_block {
            let recipient_chain_id = destination_manager.get_next_destination();

            (0..transactions_per_block)
                .map(|_| {
                    Self::create_operation(
                        fungible_application_id,
                        recipient_chain_id,
                        owner,
                        amount,
                    )
                })
                .collect()
        } else {
            let mut operations = Vec::with_capacity(transactions_per_block);
            for _ in 0..transactions_per_block {
                let recipient_chain_id = destination_manager.get_next_destination();

                operations.push(Self::create_operation(
                    fungible_application_id,
                    recipient_chain_id,
                    owner,
                    amount,
                ));
            }
            operations
        }
    }

    /// Closes the chain that was created for the benchmark.
    pub async fn close_benchmark_chain(
        chain_client: &ChainClient<Env>,
    ) -> Result<(), BenchmarkError> {
        let start = Instant::now();
        chain_client
            .execute_operation(Operation::system(SystemOperation::CloseChain))
            .await?
            .expect("Close chain operation should not fail!");

        debug!(
            "Closed chain {:?} in {} ms",
            chain_client.chain_id(),
            start.elapsed().as_millis()
        );

        Ok(())
    }

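    /// Returns the chains usable as transfer destinations: the ones listed in the config
    /// file if a path is given, otherwise the benchmark chains themselves.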
    pub fn get_all_chains(
        chains_config_path: Option<&Path>,
        benchmark_chains: &[(ChainId, AccountOwner)],
    ) -> Result<Vec<ChainId>, BenchmarkError> {
        let all_chains = if let Some(config_path) = chains_config_path {
            if !config_path.exists() {
                return Err(BenchmarkError::ConfigFileNotFound(
                    config_path.to_path_buf(),
                ));
            }
            let config = BenchmarkConfig::load_from_file(config_path)
                .map_err(BenchmarkError::ConfigLoadError)?;
            config.chain_ids
        } else {
            benchmark_chains.iter().map(|(id, _)| *id).collect()
        };

        Ok(all_chains)
    }

    /// Creates a fungible token transfer operation.
    pub fn fungible_transfer(
        application_id: ApplicationId,
        chain_id: ChainId,
        sender: AccountOwner,
        receiver: AccountOwner,
        amount: Amount,
    ) -> Operation {
        let target_account = Account {
            chain_id,
            owner: receiver,
        };
        let bytes = bcs::to_bytes(&FungibleOperation::Transfer {
            owner: sender,
            amount,
            target_account,
        })
        .expect("should serialize fungible token operation");
        Operation::User {
            application_id,
            bytes,
        }
    }
}

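/// Cycles through the destination chains in random order, reshuffling after each full
/// pass and skipping the source chain.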
struct ChainDestinationManager {
    source_chain_id: ChainId,
    destination_index: usize,
    destination_chains: Vec<ChainId>,
    rng: SmallRng,
}

impl ChainDestinationManager {
    fn new(
        source_chain_id: ChainId,
        mut destination_chains: Vec<ChainId>,
    ) -> Result<Self, BenchmarkError> {
        let mut rng = SmallRng::from_rng(thread_rng())?;
        destination_chains.shuffle(&mut rng);

        Ok(Self {
            source_chain_id,
            destination_index: 0,
            destination_chains,
            rng,
        })
    }

    fn get_next_destination(&mut self) -> ChainId {
        // Check if we've gone through all destinations
        if self.destination_index >= self.destination_chains.len() {
            // Reshuffle the destinations for the next cycle
            self.destination_chains.shuffle(&mut self.rng);
            self.destination_index = 0;
        }

        let destination_chain_id = self.destination_chains[self.destination_index];
        self.destination_index += 1;

        if destination_chain_id == self.source_chain_id {
            self.get_next_destination()
        } else {
            destination_chain_id
        }
    }
}