1use std::{
5 collections::HashMap,
6 path::Path,
7 sync::{
8 atomic::{AtomicUsize, Ordering},
9 Arc,
10 },
11};
12
13use linera_base::{
14 data_types::Amount,
15 identifiers::{Account, AccountOwner, ApplicationId, ChainId},
16 time::Instant,
17};
18use linera_core::{
19 client::{ChainClient, ChainClientError},
20 Environment,
21};
22use linera_execution::{system::SystemOperation, Operation};
23use linera_sdk::abis::fungible::FungibleOperation;
24use num_format::{Locale, ToFormattedString};
25use prometheus_parse::{HistogramCount, Scrape, Value};
26use rand::{rngs::SmallRng, seq::SliceRandom, thread_rng, SeedableRng};
27use serde::{Deserialize, Serialize};
28use tokio::{
29 sync::{mpsc, Barrier, Notify},
30 task, time,
31};
32use tokio_util::sync::CancellationToken;
33use tracing::{debug, error, info, warn, Instrument as _};
34
35use crate::chain_listener::{ChainListener, ClientContext};
36
/// Highest acceptable estimate of the proxy request latency p99. A validator whose
/// estimate exceeds this value is reported as unhealthy; the estimate is logged in
/// milliseconds.
const PROXY_LATENCY_P99_THRESHOLD: f64 = 400.0;

/// Name of the latency histogram metric. The matching `_count` and `_sum` samples are
/// looked up by appending those suffixes to this prefix.
const LATENCY_METRIC_PREFIX: &str = "linera_proxy_request_latency";
39
40#[derive(Debug, thiserror::Error)]
41pub enum BenchmarkError {
42 #[error("Failed to join task: {0}")]
43 JoinError(#[from] task::JoinError),
44 #[error("Chain client error: {0}")]
45 ChainClient(#[from] ChainClientError),
46 #[error("Current histogram count is less than previous histogram count")]
47 HistogramCountMismatch,
48 #[error("Expected histogram value, got {0:?}")]
49 ExpectedHistogramValue(Value),
50 #[error("Expected untyped value, got {0:?}")]
51 ExpectedUntypedValue(Value),
52 #[error("Incomplete histogram data")]
53 IncompleteHistogramData,
54 #[error("Could not compute quantile")]
55 CouldNotComputeQuantile,
56 #[error("Bucket boundaries do not match: {0} vs {1}")]
57 BucketBoundariesDoNotMatch(f64, f64),
58 #[error("Reqwest error: {0}")]
59 Reqwest(#[from] reqwest::Error),
60 #[error("Io error: {0}")]
61 IoError(#[from] std::io::Error),
62 #[error("Previous histogram snapshot does not exist: {0}")]
63 PreviousHistogramSnapshotDoesNotExist(String),
64 #[error("No data available yet to calculate p99")]
65 NoDataYetForP99Calculation,
66 #[error("Unexpected empty bucket")]
67 UnexpectedEmptyBucket,
68 #[error("Failed to send unit message: {0}")]
69 TokioSendUnitError(#[from] mpsc::error::SendError<()>),
70 #[error("Config file not found: {0}")]
71 ConfigFileNotFound(std::path::PathBuf),
72 #[error("Failed to load config file: {0}")]
73 ConfigLoadError(#[from] anyhow::Error),
74 #[error("Could not find enough chains in wallet alone: needed {0}, but only found {1}")]
75 NotEnoughChainsInWallet(usize, usize),
76 #[error("Random number generator error: {0}")]
77 RandError(#[from] rand::Error),
78 #[error("Chain listener startup error")]
79 ChainListenerStartupError,
80}
81
82#[derive(Debug)]
83struct HistogramSnapshot {
84 buckets: Vec<HistogramCount>,
85 count: f64,
86 sum: f64,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
90#[serde(rename_all = "kebab-case")]
91pub struct BenchmarkConfig {
92 pub chain_ids: Vec<ChainId>,
93}
94
95impl BenchmarkConfig {
96 pub fn load_from_file<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
97 let content = std::fs::read_to_string(path)?;
98 let config = serde_yaml::from_str(&content)?;
99 Ok(config)
100 }
101
102 pub fn save_to_file<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {
103 let content = serde_yaml::to_string(self)?;
104 std::fs::write(path, content)?;
105 Ok(())
106 }
107}
108
109pub struct Benchmark<Env: Environment> {
110 _phantom: std::marker::PhantomData<Env>,
111}
112
113impl<Env: Environment> Benchmark<Env> {
114 #[expect(clippy::too_many_arguments)]
115 pub async fn run_benchmark<C: ClientContext<Environment = Env> + 'static>(
116 bps: usize,
117 chain_clients: Vec<ChainClient<Env>>,
118 all_chains: Vec<ChainId>,
119 transactions_per_block: usize,
120 fungible_application_id: Option<ApplicationId>,
121 health_check_endpoints: Option<String>,
122 runtime_in_seconds: Option<u64>,
123 delay_between_chains_ms: Option<u64>,
124 chain_listener: ChainListener<C>,
125 shutdown_notifier: &CancellationToken,
126 single_destination_per_block: bool,
127 ) -> Result<(), BenchmarkError> {
128 let num_chains = chain_clients.len();
129 let bps_counts = (0..num_chains)
130 .map(|_| Arc::new(AtomicUsize::new(0)))
131 .collect::<Vec<_>>();
132 let notifier = Arc::new(Notify::new());
133 let barrier = Arc::new(Barrier::new(num_chains + 1));
134
135 let chain_listener_future = chain_listener
136 .run()
137 .await
138 .map_err(|_| BenchmarkError::ChainListenerStartupError)?;
139 let chain_listener_handle = tokio::spawn(chain_listener_future.in_current_span());
140
141 let bps_control_task = Self::bps_control_task(
142 &barrier,
143 shutdown_notifier,
144 &bps_counts,
145 ¬ifier,
146 transactions_per_block,
147 bps,
148 );
149
150 let (runtime_control_task, runtime_control_sender) =
151 Self::runtime_control_task(shutdown_notifier, runtime_in_seconds, num_chains);
152
153 let bps_initial_share = bps / num_chains;
154 let mut bps_remainder = bps % num_chains;
155 let mut join_set = task::JoinSet::<Result<(), BenchmarkError>>::new();
156 for (chain_idx, chain_client) in chain_clients.into_iter().enumerate() {
157 let chain_id = chain_client.chain_id();
158 let shutdown_notifier_clone = shutdown_notifier.clone();
159 let barrier_clone = barrier.clone();
160 let bps_count_clone = bps_counts[chain_idx].clone();
161 let notifier_clone = notifier.clone();
162 let runtime_control_sender_clone = runtime_control_sender.clone();
163 let all_chains_clone = all_chains.clone();
164 let bps_share = if bps_remainder > 0 {
165 bps_remainder -= 1;
166 bps_initial_share + 1
167 } else {
168 bps_initial_share
169 };
170 join_set.spawn(
171 async move {
172 Box::pin(Self::run_benchmark_internal(
173 chain_idx,
174 chain_id,
175 bps_share,
176 chain_client,
177 all_chains_clone,
178 transactions_per_block,
179 fungible_application_id,
180 shutdown_notifier_clone,
181 bps_count_clone,
182 barrier_clone,
183 notifier_clone,
184 runtime_control_sender_clone,
185 delay_between_chains_ms,
186 single_destination_per_block,
187 ))
188 .await?;
189
190 Ok(())
191 }
192 .instrument(tracing::info_span!("chain_id", chain_id = ?chain_id)),
193 );
194 }
195
196 let metrics_watcher =
197 Self::metrics_watcher(health_check_endpoints, shutdown_notifier).await?;
198
199 while let Some(result) = join_set.join_next().await {
201 let inner_result = result?;
202 if let Err(e) = inner_result {
203 error!("Benchmark task failed: {}", e);
204 shutdown_notifier.cancel();
205 join_set.abort_all();
206 return Err(e);
207 }
208 }
209 info!("All benchmark tasks completed successfully");
210
211 bps_control_task.await?;
212 if let Some(metrics_watcher) = metrics_watcher {
213 metrics_watcher.await??;
214 }
215 if let Some(runtime_control_task) = runtime_control_task {
216 runtime_control_task.await?;
217 }
218
219 if let Err(e) = chain_listener_handle.await? {
220 tracing::error!("chain listener error: {e}");
221 }
222
223 Ok(())
224 }
225
226 fn bps_control_task(
228 barrier: &Arc<Barrier>,
229 shutdown_notifier: &CancellationToken,
230 bps_counts: &[Arc<AtomicUsize>],
231 notifier: &Arc<Notify>,
232 transactions_per_block: usize,
233 bps: usize,
234 ) -> task::JoinHandle<()> {
235 let shutdown_notifier = shutdown_notifier.clone();
236 let bps_counts = bps_counts.to_vec();
237 let notifier = notifier.clone();
238 let barrier = barrier.clone();
239 task::spawn(
240 async move {
241 barrier.wait().await;
242 let mut one_second_interval = time::interval(time::Duration::from_secs(1));
243 loop {
244 if shutdown_notifier.is_cancelled() {
245 info!("Shutdown signal received in bps control task");
246 break;
247 }
248 one_second_interval.tick().await;
249 let current_bps_count: usize = bps_counts
250 .iter()
251 .map(|count| count.swap(0, Ordering::Relaxed))
252 .sum();
253 notifier.notify_waiters();
254 let formatted_current_bps = current_bps_count.to_formatted_string(&Locale::en);
255 let formatted_current_tps = (current_bps_count * transactions_per_block)
256 .to_formatted_string(&Locale::en);
257 let formatted_tps_goal =
258 (bps * transactions_per_block).to_formatted_string(&Locale::en);
259 let formatted_bps_goal = bps.to_formatted_string(&Locale::en);
260 if current_bps_count >= bps {
261 info!(
262 "Achieved {} BPS/{} TPS",
263 formatted_current_bps, formatted_current_tps
264 );
265 } else {
266 warn!(
267 "Failed to achieve {} BPS/{} TPS, only achieved {} BPS/{} TPS",
268 formatted_bps_goal,
269 formatted_tps_goal,
270 formatted_current_bps,
271 formatted_current_tps,
272 );
273 }
274 }
275
276 info!("Exiting bps control task");
277 }
278 .instrument(tracing::info_span!("bps_control")),
279 )
280 }
281
282 async fn metrics_watcher(
283 health_check_endpoints: Option<String>,
284 shutdown_notifier: &CancellationToken,
285 ) -> Result<Option<task::JoinHandle<Result<(), BenchmarkError>>>, BenchmarkError> {
286 if let Some(health_check_endpoints) = health_check_endpoints {
287 let metrics_addresses = health_check_endpoints
288 .split(',')
289 .map(|address| format!("http://{}/metrics", address.trim()))
290 .collect::<Vec<_>>();
291
292 let mut previous_histogram_snapshots: HashMap<String, HistogramSnapshot> =
293 HashMap::new();
294 let scrapes = Self::get_scrapes(&metrics_addresses).await?;
295 for (metrics_address, scrape) in scrapes {
296 previous_histogram_snapshots.insert(
297 metrics_address,
298 Self::parse_histogram(&scrape, LATENCY_METRIC_PREFIX)?,
299 );
300 }
301
302 let shutdown_notifier = shutdown_notifier.clone();
303 let metrics_watcher: task::JoinHandle<Result<(), BenchmarkError>> = tokio::spawn(
304 async move {
305 let mut health_interval = time::interval(time::Duration::from_secs(5));
306 let mut shutdown_interval = time::interval(time::Duration::from_secs(1));
307 loop {
308 tokio::select! {
309 biased;
310 _ = health_interval.tick() => {
311 let result = Self::validators_healthy(&metrics_addresses, &mut previous_histogram_snapshots).await;
312 if let Err(ref err) = result {
313 info!("Shutting down benchmark due to error: {}", err);
314 shutdown_notifier.cancel();
315 break;
316 } else if !result? {
317 info!("Shutting down benchmark due to unhealthy validators");
318 shutdown_notifier.cancel();
319 break;
320 }
321 }
322 _ = shutdown_interval.tick() => {
323 if shutdown_notifier.is_cancelled() {
324 info!("Shutdown signal received, stopping metrics watcher");
325 break;
326 }
327 }
328 }
329 }
330
331 Ok(())
332 }
333 .instrument(tracing::info_span!("metrics_watcher")),
334 );
335
336 Ok(Some(metrics_watcher))
337 } else {
338 Ok(None)
339 }
340 }
341
342 fn runtime_control_task(
343 shutdown_notifier: &CancellationToken,
344 runtime_in_seconds: Option<u64>,
345 num_chain_groups: usize,
346 ) -> (Option<task::JoinHandle<()>>, Option<mpsc::Sender<()>>) {
347 if let Some(runtime_in_seconds) = runtime_in_seconds {
348 let (runtime_control_sender, mut runtime_control_receiver) =
349 mpsc::channel(num_chain_groups);
350 let shutdown_notifier = shutdown_notifier.clone();
351 let runtime_control_task = task::spawn(
352 async move {
353 let mut chains_started = 0;
354 while runtime_control_receiver.recv().await.is_some() {
355 chains_started += 1;
356 if chains_started == num_chain_groups {
357 break;
358 }
359 }
360 time::sleep(time::Duration::from_secs(runtime_in_seconds)).await;
361 shutdown_notifier.cancel();
362 }
363 .instrument(tracing::info_span!("runtime_control")),
364 );
365 (Some(runtime_control_task), Some(runtime_control_sender))
366 } else {
367 (None, None)
368 }
369 }
370
371 async fn validators_healthy(
372 metrics_addresses: &[String],
373 previous_histogram_snapshots: &mut HashMap<String, HistogramSnapshot>,
374 ) -> Result<bool, BenchmarkError> {
375 let scrapes = Self::get_scrapes(metrics_addresses).await?;
376 for (metrics_address, scrape) in scrapes {
377 let histogram = Self::parse_histogram(&scrape, LATENCY_METRIC_PREFIX)?;
378 let diff = Self::diff_histograms(
379 previous_histogram_snapshots.get(&metrics_address).ok_or(
380 BenchmarkError::PreviousHistogramSnapshotDoesNotExist(metrics_address.clone()),
381 )?,
382 &histogram,
383 )?;
384 let p99 = match Self::compute_quantile(&diff.buckets, diff.count, 0.99) {
385 Ok(p99) => p99,
386 Err(BenchmarkError::NoDataYetForP99Calculation) => {
387 info!(
388 "No data available yet to calculate p99 for {}",
389 metrics_address
390 );
391 continue;
392 }
393 Err(e) => {
394 error!("Error computing p99 for {}: {}", metrics_address, e);
395 return Err(e);
396 }
397 };
398
399 let last_bucket_boundary = diff.buckets[diff.buckets.len() - 2].less_than;
400 if p99 == f64::INFINITY {
401 info!(
402 "{} -> Estimated p99 for {} is higher than the last bucket boundary of {:?} ms",
403 metrics_address, LATENCY_METRIC_PREFIX, last_bucket_boundary
404 );
405 } else {
406 info!(
407 "{} -> Estimated p99 for {}: {:.2} ms",
408 metrics_address, LATENCY_METRIC_PREFIX, p99
409 );
410 }
411 if p99 > PROXY_LATENCY_P99_THRESHOLD {
412 if p99 == f64::INFINITY {
413 error!(
414 "Proxy of validator {} unhealthy! Latency p99 is too high, it is higher than \
415 the last bucket boundary of {:.2} ms",
416 metrics_address, last_bucket_boundary
417 );
418 } else {
419 error!(
420 "Proxy of validator {} unhealthy! Latency p99 is too high: {:.2} ms",
421 metrics_address, p99
422 );
423 }
424 return Ok(false);
425 }
426 previous_histogram_snapshots.insert(metrics_address.clone(), histogram);
427 }
428
429 Ok(true)
430 }
431
432 fn diff_histograms(
433 previous: &HistogramSnapshot,
434 current: &HistogramSnapshot,
435 ) -> Result<HistogramSnapshot, BenchmarkError> {
436 if current.count < previous.count {
437 return Err(BenchmarkError::HistogramCountMismatch);
438 }
439 let total_diff = current.count - previous.count;
440 let mut buckets_diff: Vec<HistogramCount> = Vec::new();
441 for (before, after) in previous.buckets.iter().zip(current.buckets.iter()) {
442 let bound_before = before.less_than;
443 let bound_after = after.less_than;
444 let cumulative_before = before.count;
445 let cumulative_after = after.count;
446 if (bound_before - bound_after).abs() > f64::EPSILON {
447 return Err(BenchmarkError::BucketBoundariesDoNotMatch(
448 bound_before,
449 bound_after,
450 ));
451 }
452 let diff = (cumulative_after - cumulative_before).max(0.0);
453 buckets_diff.push(HistogramCount {
454 less_than: bound_after,
455 count: diff,
456 });
457 }
458 Ok(HistogramSnapshot {
459 buckets: buckets_diff,
460 count: total_diff,
461 sum: current.sum - previous.sum,
462 })
463 }
464
465 async fn get_scrapes(
466 metrics_addresses: &[String],
467 ) -> Result<Vec<(String, Scrape)>, BenchmarkError> {
468 let mut scrapes = Vec::new();
469 for metrics_address in metrics_addresses {
470 let response = reqwest::get(metrics_address)
471 .await
472 .map_err(BenchmarkError::Reqwest)?;
473 let metrics = response.text().await.map_err(BenchmarkError::Reqwest)?;
474 let scrape = Scrape::parse(metrics.lines().map(|line| Ok(line.to_owned())))
475 .map_err(BenchmarkError::IoError)?;
476 scrapes.push((metrics_address.clone(), scrape));
477 }
478 Ok(scrapes)
479 }
480
481 fn parse_histogram(
482 scrape: &Scrape,
483 metric_prefix: &str,
484 ) -> Result<HistogramSnapshot, BenchmarkError> {
485 let mut buckets: Vec<HistogramCount> = Vec::new();
486 let mut total_count: Option<f64> = None;
487 let mut total_sum: Option<f64> = None;
488
489 for sample in &scrape.samples {
491 if sample.metric == metric_prefix {
492 if let Value::Histogram(histogram) = &sample.value {
493 buckets.extend(histogram.iter().cloned());
494 } else {
495 return Err(BenchmarkError::ExpectedHistogramValue(sample.value.clone()));
496 }
497 } else if sample.metric == format!("{}_count", metric_prefix) {
498 if let Value::Untyped(count) = sample.value {
499 total_count = Some(count);
500 } else {
501 return Err(BenchmarkError::ExpectedUntypedValue(sample.value.clone()));
502 }
503 } else if sample.metric == format!("{}_sum", metric_prefix) {
504 if let Value::Untyped(sum) = sample.value {
505 total_sum = Some(sum);
506 } else {
507 return Err(BenchmarkError::ExpectedUntypedValue(sample.value.clone()));
508 }
509 }
510 }
511
512 match (total_count, total_sum) {
513 (Some(count), Some(sum)) if !buckets.is_empty() => {
514 buckets.sort_by(|a, b| {
515 a.less_than
516 .partial_cmp(&b.less_than)
517 .expect("Comparison should not fail")
518 });
519 Ok(HistogramSnapshot {
520 buckets,
521 count,
522 sum,
523 })
524 }
525 _ => Err(BenchmarkError::IncompleteHistogramData),
526 }
527 }
528
529 fn compute_quantile(
530 buckets: &[HistogramCount],
531 total_count: f64,
532 quantile: f64,
533 ) -> Result<f64, BenchmarkError> {
534 if total_count == 0.0 {
535 return Err(BenchmarkError::NoDataYetForP99Calculation);
537 }
538 let target = (quantile * total_count).ceil();
540 let mut prev_cumulative = 0.0;
541 let mut prev_bound = 0.0;
542 for bucket in buckets {
543 if bucket.count >= target {
544 let bucket_count = bucket.count - prev_cumulative;
545 if bucket_count == 0.0 {
546 return Err(BenchmarkError::UnexpectedEmptyBucket);
548 }
549 let fraction = (target - prev_cumulative) / bucket_count;
550 return Ok(prev_bound + (bucket.less_than - prev_bound) * fraction);
551 }
552 prev_cumulative = bucket.count;
553 prev_bound = bucket.less_than;
554 }
555 Err(BenchmarkError::CouldNotComputeQuantile)
556 }
557
558 #[expect(clippy::too_many_arguments)]
559 async fn run_benchmark_internal(
560 chain_idx: usize,
561 chain_id: ChainId,
562 bps: usize,
563 chain_client: ChainClient<Env>,
564 all_chains: Vec<ChainId>,
565 transactions_per_block: usize,
566 fungible_application_id: Option<ApplicationId>,
567 shutdown_notifier: CancellationToken,
568 bps_count: Arc<AtomicUsize>,
569 barrier: Arc<Barrier>,
570 notifier: Arc<Notify>,
571 runtime_control_sender: Option<mpsc::Sender<()>>,
572 delay_between_chains_ms: Option<u64>,
573 single_destination_per_block: bool,
574 ) -> Result<(), BenchmarkError> {
575 barrier.wait().await;
576 if let Some(delay_between_chains_ms) = delay_between_chains_ms {
577 time::sleep(time::Duration::from_millis(
578 (chain_idx as u64) * delay_between_chains_ms,
579 ))
580 .await;
581 }
582 info!("Starting benchmark for chain {:?}", chain_id);
583
584 if let Some(runtime_control_sender) = runtime_control_sender {
585 runtime_control_sender.send(()).await?;
586 }
587
588 let owner = chain_client
589 .identity()
590 .await
591 .map_err(BenchmarkError::ChainClient)?;
592 let mut destination_manager = ChainDestinationManager::new(chain_id, all_chains)?;
593
594 loop {
595 tokio::select! {
596 biased;
597
598 _ = shutdown_notifier.cancelled() => {
599 info!("Shutdown signal received, stopping benchmark");
600 break;
601 }
602 result = chain_client.execute_operations(
603 Self::generate_operations(
604 owner,
605 transactions_per_block,
606 fungible_application_id,
607 &mut destination_manager,
608 single_destination_per_block,
609 ),
610 vec![]
611 ) => {
612 result
613 .map_err(BenchmarkError::ChainClient)?
614 .expect("should execute block with operations");
615
616 let current_bps_count = bps_count.fetch_add(1, Ordering::Relaxed) + 1;
617 if current_bps_count >= bps {
618 notifier.notified().await;
619 }
620 }
621 }
622 }
623
624 info!("Exiting task...");
625 Ok(())
626 }
627
628 fn create_operation(
629 fungible_application_id: Option<ApplicationId>,
630 recipient_chain_id: ChainId,
631 owner: AccountOwner,
632 amount: Amount,
633 ) -> Operation {
634 match fungible_application_id {
635 Some(application_id) => {
636 Self::fungible_transfer(application_id, recipient_chain_id, owner, owner, amount)
637 }
638 None => Operation::system(SystemOperation::Transfer {
639 owner: AccountOwner::CHAIN,
640 recipient: Account::chain(recipient_chain_id),
641 amount,
642 }),
643 }
644 }
645
646 fn generate_operations(
648 owner: AccountOwner,
649 transactions_per_block: usize,
650 fungible_application_id: Option<ApplicationId>,
651 destination_manager: &mut ChainDestinationManager,
652 single_destination_per_block: bool,
653 ) -> Vec<Operation> {
654 let amount = Amount::from_attos(1);
655
656 if single_destination_per_block {
657 let recipient_chain_id = destination_manager.get_next_destination();
658
659 (0..transactions_per_block)
660 .map(|_| {
661 Self::create_operation(
662 fungible_application_id,
663 recipient_chain_id,
664 owner,
665 amount,
666 )
667 })
668 .collect()
669 } else {
670 let mut operations = Vec::with_capacity(transactions_per_block);
671 for _ in 0..transactions_per_block {
672 let recipient_chain_id = destination_manager.get_next_destination();
673
674 operations.push(Self::create_operation(
675 fungible_application_id,
676 recipient_chain_id,
677 owner,
678 amount,
679 ));
680 }
681 operations
682 }
683 }
684
685 pub async fn close_benchmark_chain(
687 chain_client: &ChainClient<Env>,
688 ) -> Result<(), BenchmarkError> {
689 let start = Instant::now();
690 chain_client
691 .execute_operation(Operation::system(SystemOperation::CloseChain))
692 .await?
693 .expect("Close chain operation should not fail!");
694
695 debug!(
696 "Closed chain {:?} in {} ms",
697 chain_client.chain_id(),
698 start.elapsed().as_millis()
699 );
700
701 Ok(())
702 }
703
704 pub fn get_all_chains(
705 chains_config_path: Option<&Path>,
706 benchmark_chains: &[(ChainId, AccountOwner)],
707 ) -> Result<Vec<ChainId>, BenchmarkError> {
708 let all_chains = if let Some(config_path) = chains_config_path {
709 if !config_path.exists() {
710 return Err(BenchmarkError::ConfigFileNotFound(
711 config_path.to_path_buf(),
712 ));
713 }
714 let config = BenchmarkConfig::load_from_file(config_path)
715 .map_err(BenchmarkError::ConfigLoadError)?;
716 config.chain_ids
717 } else {
718 benchmark_chains.iter().map(|(id, _)| *id).collect()
719 };
720
721 Ok(all_chains)
722 }
723
724 pub fn fungible_transfer(
726 application_id: ApplicationId,
727 chain_id: ChainId,
728 sender: AccountOwner,
729 receiver: AccountOwner,
730 amount: Amount,
731 ) -> Operation {
732 let target_account = Account {
733 chain_id,
734 owner: receiver,
735 };
736 let bytes = bcs::to_bytes(&FungibleOperation::Transfer {
737 owner: sender,
738 amount,
739 target_account,
740 })
741 .expect("should serialize fungible token operation");
742 Operation::User {
743 application_id,
744 bytes,
745 }
746 }
747}
748
749struct ChainDestinationManager {
750 source_chain_id: ChainId,
751 destination_index: usize,
752 destination_chains: Vec<ChainId>,
753 rng: SmallRng,
754}
755
756impl ChainDestinationManager {
757 fn new(
758 source_chain_id: ChainId,
759 mut destination_chains: Vec<ChainId>,
760 ) -> Result<Self, BenchmarkError> {
761 let mut rng = SmallRng::from_rng(thread_rng())?;
762 destination_chains.shuffle(&mut rng);
763
764 Ok(Self {
765 source_chain_id,
766 destination_index: 0,
767 destination_chains,
768 rng,
769 })
770 }
771
772 fn get_next_destination(&mut self) -> ChainId {
773 if self.destination_index >= self.destination_chains.len() {
775 self.destination_chains.shuffle(&mut self.rng);
777 self.destination_index = 0;
778 }
779
780 let destination_chain_id = self.destination_chains[self.destination_index];
781 self.destination_index += 1;
782
783 if destination_chain_id == self.source_chain_id {
784 self.get_next_destination()
785 } else {
786 destination_chain_id
787 }
788 }
789}