1use std::{
5 collections::HashMap,
6 path::Path,
7 sync::{
8 atomic::{AtomicUsize, Ordering},
9 Arc,
10 },
11};
12
13use linera_base::{
14 data_types::{Amount, Timestamp},
15 identifiers::{Account, AccountOwner, ApplicationId, ChainId},
16 time::Instant,
17};
18use linera_core::{
19 client::chain_client::{self, ChainClient},
20 data_types::ClientOutcome,
21 Environment,
22};
23use linera_execution::{system::SystemOperation, Operation};
24use linera_sdk::abis::fungible::FungibleOperation;
25use num_format::{Locale, ToFormattedString};
26use prometheus_parse::{HistogramCount, Scrape, Value};
27use rand::{rngs::SmallRng, seq::SliceRandom, thread_rng, SeedableRng};
28use serde::{Deserialize, Serialize};
29use tokio::{
30 sync::{mpsc, Barrier, Notify},
31 task, time,
32};
33use tokio_util::sync::CancellationToken;
34use tracing::{debug, error, info, warn, Instrument as _};
35
36use crate::chain_listener::{ChainListener, ClientContext};
37
38pub trait OperationGenerator: Send + 'static {
47 fn generate_operations(&mut self, owner: AccountOwner, count: usize) -> Vec<Operation>;
49}
50
51pub struct NativeFungibleTransferGenerator {
53 source_chain_id: ChainId,
54 destination_chains: Vec<ChainId>,
55 destination_index: usize,
56 rng: SmallRng,
57 single_destination_per_block: bool,
58}
59
60impl NativeFungibleTransferGenerator {
61 pub fn new(
62 source_chain_id: ChainId,
63 mut destination_chains: Vec<ChainId>,
64 single_destination_per_block: bool,
65 ) -> Result<Self, BenchmarkError> {
66 if destination_chains.is_empty() {
68 destination_chains.push(source_chain_id);
69 }
70 let mut rng = SmallRng::from_rng(thread_rng())?;
71 destination_chains.shuffle(&mut rng);
72 Ok(Self {
73 source_chain_id,
74 destination_chains,
75 destination_index: 0,
76 rng,
77 single_destination_per_block,
78 })
79 }
80
81 fn next_destination(&mut self) -> ChainId {
82 if self.destination_index >= self.destination_chains.len() {
83 self.destination_chains.shuffle(&mut self.rng);
84 self.destination_index = 0;
85 }
86 let destination_chain_id = self.destination_chains[self.destination_index];
87 self.destination_index += 1;
88 if destination_chain_id == self.source_chain_id && self.destination_chains.len() > 1 {
90 self.next_destination()
91 } else {
92 destination_chain_id
93 }
94 }
95}
96
97impl OperationGenerator for NativeFungibleTransferGenerator {
98 fn generate_operations(&mut self, _owner: AccountOwner, count: usize) -> Vec<Operation> {
99 let amount = Amount::from_attos(1);
100 if self.single_destination_per_block {
101 let recipient = self.next_destination();
102 (0..count)
103 .map(|_| {
104 Operation::system(SystemOperation::Transfer {
105 owner: AccountOwner::CHAIN,
106 recipient: Account::chain(recipient),
107 amount,
108 })
109 })
110 .collect()
111 } else {
112 (0..count)
113 .map(|_| {
114 let recipient = self.next_destination();
115 Operation::system(SystemOperation::Transfer {
116 owner: AccountOwner::CHAIN,
117 recipient: Account::chain(recipient),
118 amount,
119 })
120 })
121 .collect()
122 }
123 }
124}
125
126pub struct FungibleTransferGenerator {
128 application_id: ApplicationId,
129 source_chain_id: ChainId,
130 destination_chains: Vec<ChainId>,
131 destination_index: usize,
132 rng: SmallRng,
133 single_destination_per_block: bool,
134}
135
136impl FungibleTransferGenerator {
137 pub fn new(
138 application_id: ApplicationId,
139 source_chain_id: ChainId,
140 mut destination_chains: Vec<ChainId>,
141 single_destination_per_block: bool,
142 ) -> Result<Self, BenchmarkError> {
143 if destination_chains.is_empty() {
145 destination_chains.push(source_chain_id);
146 }
147 let mut rng = SmallRng::from_rng(thread_rng())?;
148 destination_chains.shuffle(&mut rng);
149 Ok(Self {
150 application_id,
151 source_chain_id,
152 destination_chains,
153 destination_index: 0,
154 rng,
155 single_destination_per_block,
156 })
157 }
158
159 fn next_destination(&mut self) -> ChainId {
160 if self.destination_index >= self.destination_chains.len() {
161 self.destination_chains.shuffle(&mut self.rng);
162 self.destination_index = 0;
163 }
164 let destination_chain_id = self.destination_chains[self.destination_index];
165 self.destination_index += 1;
166 if destination_chain_id == self.source_chain_id && self.destination_chains.len() > 1 {
168 self.next_destination()
169 } else {
170 destination_chain_id
171 }
172 }
173}
174
175impl OperationGenerator for FungibleTransferGenerator {
176 fn generate_operations(&mut self, owner: AccountOwner, count: usize) -> Vec<Operation> {
177 let amount = Amount::from_attos(1);
178 if self.single_destination_per_block {
179 let recipient = self.next_destination();
180 (0..count)
181 .map(|_| fungible_transfer(self.application_id, recipient, owner, owner, amount))
182 .collect()
183 } else {
184 (0..count)
185 .map(|_| {
186 let recipient = self.next_destination();
187 fungible_transfer(self.application_id, recipient, owner, owner, amount)
188 })
189 .collect()
190 }
191 }
192}
193
194const PROXY_LATENCY_P99_THRESHOLD: f64 = 400.0;
195const LATENCY_METRIC_PREFIX: &str = "linera_proxy_request_latency";
196
197#[derive(Debug, thiserror::Error)]
198pub enum BenchmarkError {
199 #[error("Failed to join task: {0}")]
200 JoinError(#[from] task::JoinError),
201 #[error("Chain client error: {0}")]
202 ChainClient(#[from] chain_client::Error),
203 #[error("Current histogram count is less than previous histogram count")]
204 HistogramCountMismatch,
205 #[error("Expected histogram value, got {0:?}")]
206 ExpectedHistogramValue(Value),
207 #[error("Expected untyped value, got {0:?}")]
208 ExpectedUntypedValue(Value),
209 #[error("Incomplete histogram data")]
210 IncompleteHistogramData,
211 #[error("Could not compute quantile")]
212 CouldNotComputeQuantile,
213 #[error("Bucket boundaries do not match: {0} vs {1}")]
214 BucketBoundariesDoNotMatch(f64, f64),
215 #[error("Reqwest error: {0}")]
216 Reqwest(#[from] reqwest::Error),
217 #[error("Io error: {0}")]
218 IoError(#[from] std::io::Error),
219 #[error("Previous histogram snapshot does not exist: {0}")]
220 PreviousHistogramSnapshotDoesNotExist(String),
221 #[error("No data available yet to calculate p99")]
222 NoDataYetForP99Calculation,
223 #[error("Unexpected empty bucket")]
224 UnexpectedEmptyBucket,
225 #[error("Failed to send unit message: {0}")]
226 TokioSendUnitError(#[from] mpsc::error::SendError<()>),
227 #[error("Config file not found: {0}")]
228 ConfigFileNotFound(std::path::PathBuf),
229 #[error("Failed to load config file: {0}")]
230 ConfigLoadError(#[from] anyhow::Error),
231 #[error("Could not find enough chains in wallet alone: needed {0}, but only found {1}")]
232 NotEnoughChainsInWallet(usize, usize),
233 #[error("Random number generator error: {0}")]
234 RandError(#[from] rand::Error),
235 #[error("Chain listener startup error")]
236 ChainListenerStartupError,
237}
238
239#[derive(Debug)]
240struct HistogramSnapshot {
241 buckets: Vec<HistogramCount>,
242 count: f64,
243 sum: f64,
244}
245
246#[derive(Debug, Clone, Serialize, Deserialize)]
247#[serde(rename_all = "kebab-case")]
248pub struct BenchmarkConfig {
249 pub chain_ids: Vec<ChainId>,
250}
251
252impl BenchmarkConfig {
253 pub fn load_from_file<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
254 let content = std::fs::read_to_string(path)?;
255 let config = serde_yaml::from_str(&content)?;
256 Ok(config)
257 }
258
259 pub fn save_to_file<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {
260 let content = serde_yaml::to_string(self)?;
261 std::fs::write(path, content)?;
262 Ok(())
263 }
264}
265
266pub struct Benchmark<Env: Environment> {
267 _phantom: std::marker::PhantomData<Env>,
268}
269
270impl<Env: Environment> Benchmark<Env> {
271 #[expect(clippy::too_many_arguments)]
276 pub async fn run_benchmark<C: ClientContext<Environment = Env> + 'static>(
277 bps: usize,
278 chain_clients: Vec<ChainClient<Env>>,
279 generators: Vec<Box<dyn OperationGenerator>>,
280 transactions_per_block: usize,
281 health_check_endpoints: Option<String>,
282 runtime_in_seconds: Option<u64>,
283 delay_between_chains_ms: Option<u64>,
284 chain_listener: ChainListener<C>,
285 shutdown_notifier: &CancellationToken,
286 ) -> Result<(), BenchmarkError> {
287 assert_eq!(
288 chain_clients.len(),
289 generators.len(),
290 "Must have one generator per chain client"
291 );
292 let num_chains = chain_clients.len();
293 let bps_counts = (0..num_chains)
294 .map(|_| Arc::new(AtomicUsize::new(0)))
295 .collect::<Vec<_>>();
296 let notifier = Arc::new(Notify::new());
297 let barrier = Arc::new(Barrier::new(num_chains + 1));
298
299 let chain_listener_future = chain_listener
300 .run()
301 .await
302 .map_err(|_| BenchmarkError::ChainListenerStartupError)?;
303 let chain_listener_handle = tokio::spawn(chain_listener_future.in_current_span());
304
305 let bps_control_task = Self::bps_control_task(
306 &barrier,
307 shutdown_notifier,
308 &bps_counts,
309 ¬ifier,
310 transactions_per_block,
311 bps,
312 );
313
314 let (runtime_control_task, runtime_control_sender) =
315 Self::runtime_control_task(shutdown_notifier, runtime_in_seconds, num_chains);
316
317 let bps_initial_share = bps / num_chains;
318 let mut bps_remainder = bps % num_chains;
319 let mut join_set = task::JoinSet::<Result<(), BenchmarkError>>::new();
320 for (chain_idx, (chain_client, generator)) in
321 chain_clients.into_iter().zip(generators).enumerate()
322 {
323 let chain_id = chain_client.chain_id();
324 let shutdown_notifier_clone = shutdown_notifier.clone();
325 let barrier_clone = barrier.clone();
326 let bps_count_clone = bps_counts[chain_idx].clone();
327 let notifier_clone = notifier.clone();
328 let runtime_control_sender_clone = runtime_control_sender.clone();
329 let bps_share = if bps_remainder > 0 {
330 bps_remainder -= 1;
331 bps_initial_share + 1
332 } else {
333 bps_initial_share
334 };
335 join_set.spawn(
336 async move {
337 Box::pin(Self::run_benchmark_internal(
338 chain_idx,
339 chain_id,
340 bps_share,
341 chain_client,
342 generator,
343 transactions_per_block,
344 shutdown_notifier_clone,
345 bps_count_clone,
346 barrier_clone,
347 notifier_clone,
348 runtime_control_sender_clone,
349 delay_between_chains_ms,
350 ))
351 .await?;
352
353 Ok(())
354 }
355 .instrument(tracing::info_span!("chain_id", chain_id = ?chain_id)),
356 );
357 }
358
359 let metrics_watcher =
360 Self::metrics_watcher(health_check_endpoints, shutdown_notifier).await?;
361
362 while let Some(result) = join_set.join_next().await {
364 let inner_result = result?;
365 if let Err(e) = inner_result {
366 error!("Benchmark task failed: {}", e);
367 shutdown_notifier.cancel();
368 join_set.abort_all();
369 return Err(e);
370 }
371 }
372 info!("All benchmark tasks completed successfully");
373
374 bps_control_task.await?;
375 if let Some(metrics_watcher) = metrics_watcher {
376 metrics_watcher.await??;
377 }
378 if let Some(runtime_control_task) = runtime_control_task {
379 runtime_control_task.await?;
380 }
381
382 if let Err(e) = chain_listener_handle.await? {
383 tracing::error!("chain listener error: {e}");
384 }
385
386 Ok(())
387 }
388
389 fn bps_control_task(
391 barrier: &Arc<Barrier>,
392 shutdown_notifier: &CancellationToken,
393 bps_counts: &[Arc<AtomicUsize>],
394 notifier: &Arc<Notify>,
395 transactions_per_block: usize,
396 bps: usize,
397 ) -> task::JoinHandle<()> {
398 let shutdown_notifier = shutdown_notifier.clone();
399 let bps_counts = bps_counts.to_vec();
400 let notifier = notifier.clone();
401 let barrier = barrier.clone();
402 task::spawn(
403 async move {
404 barrier.wait().await;
405 let mut one_second_interval = time::interval(time::Duration::from_secs(1));
406 loop {
407 if shutdown_notifier.is_cancelled() {
408 info!("Shutdown signal received in bps control task");
409 break;
410 }
411 one_second_interval.tick().await;
412 let current_bps_count: usize = bps_counts
413 .iter()
414 .map(|count| count.swap(0, Ordering::Relaxed))
415 .sum();
416 notifier.notify_waiters();
417 let formatted_current_bps = current_bps_count.to_formatted_string(&Locale::en);
418 let formatted_current_tps = (current_bps_count * transactions_per_block)
419 .to_formatted_string(&Locale::en);
420 let formatted_tps_goal =
421 (bps * transactions_per_block).to_formatted_string(&Locale::en);
422 let formatted_bps_goal = bps.to_formatted_string(&Locale::en);
423 if current_bps_count >= bps {
424 info!(
425 "Achieved {} BPS/{} TPS",
426 formatted_current_bps, formatted_current_tps
427 );
428 } else {
429 warn!(
430 "Failed to achieve {} BPS/{} TPS, only achieved {} BPS/{} TPS",
431 formatted_bps_goal,
432 formatted_tps_goal,
433 formatted_current_bps,
434 formatted_current_tps,
435 );
436 }
437 }
438
439 info!("Exiting bps control task");
440 }
441 .instrument(tracing::info_span!("bps_control")),
442 )
443 }
444
445 async fn metrics_watcher(
446 health_check_endpoints: Option<String>,
447 shutdown_notifier: &CancellationToken,
448 ) -> Result<Option<task::JoinHandle<Result<(), BenchmarkError>>>, BenchmarkError> {
449 if let Some(health_check_endpoints) = health_check_endpoints {
450 let metrics_addresses = health_check_endpoints
451 .split(',')
452 .map(|address| format!("http://{}/metrics", address.trim()))
453 .collect::<Vec<_>>();
454
455 let mut previous_histogram_snapshots: HashMap<String, HistogramSnapshot> =
456 HashMap::new();
457 let scrapes = Self::get_scrapes(&metrics_addresses).await?;
458 for (metrics_address, scrape) in scrapes {
459 previous_histogram_snapshots.insert(
460 metrics_address,
461 Self::parse_histogram(&scrape, LATENCY_METRIC_PREFIX)?,
462 );
463 }
464
465 let shutdown_notifier = shutdown_notifier.clone();
466 let metrics_watcher: task::JoinHandle<Result<(), BenchmarkError>> = tokio::spawn(
467 async move {
468 let mut health_interval = time::interval(time::Duration::from_secs(5));
469 let mut shutdown_interval = time::interval(time::Duration::from_secs(1));
470 loop {
471 tokio::select! {
472 biased;
473 _ = health_interval.tick() => {
474 let result = Self::validators_healthy(&metrics_addresses, &mut previous_histogram_snapshots).await;
475 if let Err(ref err) = result {
476 info!("Shutting down benchmark due to error: {}", err);
477 shutdown_notifier.cancel();
478 break;
479 } else if !result? {
480 info!("Shutting down benchmark due to unhealthy validators");
481 shutdown_notifier.cancel();
482 break;
483 }
484 }
485 _ = shutdown_interval.tick() => {
486 if shutdown_notifier.is_cancelled() {
487 info!("Shutdown signal received, stopping metrics watcher");
488 break;
489 }
490 }
491 }
492 }
493
494 Ok(())
495 }
496 .instrument(tracing::info_span!("metrics_watcher")),
497 );
498
499 Ok(Some(metrics_watcher))
500 } else {
501 Ok(None)
502 }
503 }
504
505 fn runtime_control_task(
506 shutdown_notifier: &CancellationToken,
507 runtime_in_seconds: Option<u64>,
508 num_chain_groups: usize,
509 ) -> (Option<task::JoinHandle<()>>, Option<mpsc::Sender<()>>) {
510 if let Some(runtime_in_seconds) = runtime_in_seconds {
511 let (runtime_control_sender, mut runtime_control_receiver) =
512 mpsc::channel(num_chain_groups);
513 let shutdown_notifier = shutdown_notifier.clone();
514 let runtime_control_task = task::spawn(
515 async move {
516 let mut chains_started = 0;
517 while runtime_control_receiver.recv().await.is_some() {
518 chains_started += 1;
519 if chains_started == num_chain_groups {
520 break;
521 }
522 }
523 time::sleep(time::Duration::from_secs(runtime_in_seconds)).await;
524 shutdown_notifier.cancel();
525 }
526 .instrument(tracing::info_span!("runtime_control")),
527 );
528 (Some(runtime_control_task), Some(runtime_control_sender))
529 } else {
530 (None, None)
531 }
532 }
533
534 async fn validators_healthy(
535 metrics_addresses: &[String],
536 previous_histogram_snapshots: &mut HashMap<String, HistogramSnapshot>,
537 ) -> Result<bool, BenchmarkError> {
538 let scrapes = Self::get_scrapes(metrics_addresses).await?;
539 for (metrics_address, scrape) in scrapes {
540 let histogram = Self::parse_histogram(&scrape, LATENCY_METRIC_PREFIX)?;
541 let diff = Self::diff_histograms(
542 previous_histogram_snapshots.get(&metrics_address).ok_or(
543 BenchmarkError::PreviousHistogramSnapshotDoesNotExist(metrics_address.clone()),
544 )?,
545 &histogram,
546 )?;
547 let p99 = match Self::compute_quantile(&diff.buckets, diff.count, 0.99) {
548 Ok(p99) => p99,
549 Err(BenchmarkError::NoDataYetForP99Calculation) => {
550 info!(
551 "No data available yet to calculate p99 for {}",
552 metrics_address
553 );
554 continue;
555 }
556 Err(e) => {
557 error!("Error computing p99 for {}: {}", metrics_address, e);
558 return Err(e);
559 }
560 };
561
562 let last_bucket_boundary = diff.buckets[diff.buckets.len() - 2].less_than;
563 if p99 == f64::INFINITY {
564 info!(
565 "{} -> Estimated p99 for {} is higher than the last bucket boundary of {:?} ms",
566 metrics_address, LATENCY_METRIC_PREFIX, last_bucket_boundary
567 );
568 } else {
569 info!(
570 "{} -> Estimated p99 for {}: {:.2} ms",
571 metrics_address, LATENCY_METRIC_PREFIX, p99
572 );
573 }
574 if p99 > PROXY_LATENCY_P99_THRESHOLD {
575 if p99 == f64::INFINITY {
576 error!(
577 "Proxy of validator {} unhealthy! Latency p99 is too high, it is higher than \
578 the last bucket boundary of {:.2} ms",
579 metrics_address, last_bucket_boundary
580 );
581 } else {
582 error!(
583 "Proxy of validator {} unhealthy! Latency p99 is too high: {:.2} ms",
584 metrics_address, p99
585 );
586 }
587 return Ok(false);
588 }
589 previous_histogram_snapshots.insert(metrics_address.clone(), histogram);
590 }
591
592 Ok(true)
593 }
594
595 fn diff_histograms(
596 previous: &HistogramSnapshot,
597 current: &HistogramSnapshot,
598 ) -> Result<HistogramSnapshot, BenchmarkError> {
599 if current.count < previous.count {
600 return Err(BenchmarkError::HistogramCountMismatch);
601 }
602 let total_diff = current.count - previous.count;
603 let mut buckets_diff: Vec<HistogramCount> = Vec::new();
604 for (before, after) in previous.buckets.iter().zip(current.buckets.iter()) {
605 let bound_before = before.less_than;
606 let bound_after = after.less_than;
607 let cumulative_before = before.count;
608 let cumulative_after = after.count;
609 if (bound_before - bound_after).abs() > f64::EPSILON {
610 return Err(BenchmarkError::BucketBoundariesDoNotMatch(
611 bound_before,
612 bound_after,
613 ));
614 }
615 let diff = (cumulative_after - cumulative_before).max(0.0);
616 buckets_diff.push(HistogramCount {
617 less_than: bound_after,
618 count: diff,
619 });
620 }
621 Ok(HistogramSnapshot {
622 buckets: buckets_diff,
623 count: total_diff,
624 sum: current.sum - previous.sum,
625 })
626 }
627
628 async fn get_scrapes(
629 metrics_addresses: &[String],
630 ) -> Result<Vec<(String, Scrape)>, BenchmarkError> {
631 let mut scrapes = Vec::new();
632 for metrics_address in metrics_addresses {
633 let response = reqwest::get(metrics_address)
634 .await
635 .map_err(BenchmarkError::Reqwest)?;
636 let metrics = response.text().await.map_err(BenchmarkError::Reqwest)?;
637 let scrape = Scrape::parse(metrics.lines().map(|line| Ok(line.to_owned())))
638 .map_err(BenchmarkError::IoError)?;
639 scrapes.push((metrics_address.clone(), scrape));
640 }
641 Ok(scrapes)
642 }
643
644 fn parse_histogram(
645 scrape: &Scrape,
646 metric_prefix: &str,
647 ) -> Result<HistogramSnapshot, BenchmarkError> {
648 let mut buckets: Vec<HistogramCount> = Vec::new();
649 let mut total_count: Option<f64> = None;
650 let mut total_sum: Option<f64> = None;
651
652 for sample in &scrape.samples {
654 if sample.metric == metric_prefix {
655 if let Value::Histogram(histogram) = &sample.value {
656 buckets.extend(histogram.iter().cloned());
657 } else {
658 return Err(BenchmarkError::ExpectedHistogramValue(sample.value.clone()));
659 }
660 } else if sample.metric == format!("{}_count", metric_prefix) {
661 if let Value::Untyped(count) = sample.value {
662 total_count = Some(count);
663 } else {
664 return Err(BenchmarkError::ExpectedUntypedValue(sample.value.clone()));
665 }
666 } else if sample.metric == format!("{}_sum", metric_prefix) {
667 if let Value::Untyped(sum) = sample.value {
668 total_sum = Some(sum);
669 } else {
670 return Err(BenchmarkError::ExpectedUntypedValue(sample.value.clone()));
671 }
672 }
673 }
674
675 match (total_count, total_sum) {
676 (Some(count), Some(sum)) if !buckets.is_empty() => {
677 buckets.sort_by(|a, b| {
678 a.less_than
679 .partial_cmp(&b.less_than)
680 .expect("Comparison should not fail")
681 });
682 Ok(HistogramSnapshot {
683 buckets,
684 count,
685 sum,
686 })
687 }
688 _ => Err(BenchmarkError::IncompleteHistogramData),
689 }
690 }
691
692 fn compute_quantile(
693 buckets: &[HistogramCount],
694 total_count: f64,
695 quantile: f64,
696 ) -> Result<f64, BenchmarkError> {
697 if total_count == 0.0 {
698 return Err(BenchmarkError::NoDataYetForP99Calculation);
700 }
701 let target = (quantile * total_count).ceil();
703 let mut prev_cumulative = 0.0;
704 let mut prev_bound = 0.0;
705 for bucket in buckets {
706 if bucket.count >= target {
707 let bucket_count = bucket.count - prev_cumulative;
708 if bucket_count == 0.0 {
709 return Err(BenchmarkError::UnexpectedEmptyBucket);
711 }
712 let fraction = (target - prev_cumulative) / bucket_count;
713 return Ok(prev_bound + (bucket.less_than - prev_bound) * fraction);
714 }
715 prev_cumulative = bucket.count;
716 prev_bound = bucket.less_than;
717 }
718 Err(BenchmarkError::CouldNotComputeQuantile)
719 }
720
721 #[expect(clippy::too_many_arguments)]
722 async fn run_benchmark_internal(
723 chain_idx: usize,
724 chain_id: ChainId,
725 bps: usize,
726 chain_client: ChainClient<Env>,
727 mut generator: Box<dyn OperationGenerator>,
728 transactions_per_block: usize,
729 shutdown_notifier: CancellationToken,
730 bps_count: Arc<AtomicUsize>,
731 barrier: Arc<Barrier>,
732 notifier: Arc<Notify>,
733 runtime_control_sender: Option<mpsc::Sender<()>>,
734 delay_between_chains_ms: Option<u64>,
735 ) -> Result<(), BenchmarkError> {
736 barrier.wait().await;
737 if let Some(delay_between_chains_ms) = delay_between_chains_ms {
738 time::sleep(time::Duration::from_millis(
739 (chain_idx as u64) * delay_between_chains_ms,
740 ))
741 .await;
742 }
743 info!("Starting benchmark for chain {:?}", chain_id);
744
745 if let Some(runtime_control_sender) = runtime_control_sender {
746 runtime_control_sender.send(()).await?;
747 }
748
749 let owner = chain_client
750 .identity()
751 .await
752 .map_err(BenchmarkError::ChainClient)?;
753
754 loop {
755 tokio::select! {
756 biased;
757
758 _ = shutdown_notifier.cancelled() => {
759 info!("Shutdown signal received, stopping benchmark");
760 break;
761 }
762 result = chain_client.execute_operations(
763 generator.generate_operations(owner, transactions_per_block),
764 vec![]
765 ) => {
766 result
767 .map_err(BenchmarkError::ChainClient)?
768 .expect("should execute block with operations");
769
770 let current_bps_count = bps_count.fetch_add(1, Ordering::Relaxed) + 1;
771 if current_bps_count >= bps {
772 notifier.notified().await;
773 }
774 }
775 }
776 }
777
778 info!("Exiting task...");
779 Ok(())
780 }
781
782 pub async fn close_benchmark_chain(
784 chain_client: &ChainClient<Env>,
785 ) -> Result<(), BenchmarkError> {
786 let start = Instant::now();
787 loop {
788 let result = chain_client
789 .execute_operation(Operation::system(SystemOperation::CloseChain))
790 .await?;
791 match result {
792 ClientOutcome::Committed(_) => break,
793 ClientOutcome::Conflict(certificate) => {
794 info!(
795 "Conflict while closing chain {:?}: {}. Retrying...",
796 chain_client.chain_id(),
797 certificate.hash()
798 );
799 }
800 ClientOutcome::WaitForTimeout(timeout) => {
801 info!(
802 "Waiting for timeout while closing chain {:?}: {}",
803 chain_client.chain_id(),
804 timeout
805 );
806 linera_base::time::timer::sleep(
807 timeout.timestamp.duration_since(Timestamp::now()),
808 )
809 .await;
810 }
811 }
812 }
813
814 debug!(
815 "Closed chain {:?} in {} ms",
816 chain_client.chain_id(),
817 start.elapsed().as_millis()
818 );
819
820 Ok(())
821 }
822
823 pub fn get_all_chains(
824 chains_config_path: Option<&Path>,
825 benchmark_chains: &[(ChainId, AccountOwner)],
826 ) -> Result<Vec<ChainId>, BenchmarkError> {
827 let all_chains = if let Some(config_path) = chains_config_path {
828 if !config_path.exists() {
829 return Err(BenchmarkError::ConfigFileNotFound(
830 config_path.to_path_buf(),
831 ));
832 }
833 let config = BenchmarkConfig::load_from_file(config_path)
834 .map_err(BenchmarkError::ConfigLoadError)?;
835 config.chain_ids
836 } else {
837 benchmark_chains.iter().map(|(id, _)| *id).collect()
838 };
839
840 Ok(all_chains)
841 }
842}
843
844pub fn fungible_transfer(
845 application_id: ApplicationId,
846 chain_id: ChainId,
847 sender: AccountOwner,
848 receiver: AccountOwner,
849 amount: Amount,
850) -> Operation {
851 let target_account = Account {
852 chain_id,
853 owner: receiver,
854 };
855 let bytes = bcs::to_bytes(&FungibleOperation::Transfer {
856 owner: sender,
857 amount,
858 target_account,
859 })
860 .expect("should serialize fungible token operation");
861 Operation::User {
862 application_id,
863 bytes,
864 }
865}