linera_core/client/requests_scheduler/
scheduler.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::BTreeMap,
    future::Future,
    sync::{
        atomic::{AtomicU32, Ordering},
        Arc,
    },
};

use custom_debug_derive::Debug;
use futures::stream::{FuturesUnordered, StreamExt};
use linera_base::{
    crypto::ValidatorPublicKey,
    data_types::{Blob, BlobContent, BlockHeight},
    identifiers::{BlobId, ChainId},
    time::{Duration, Instant},
};
use linera_chain::types::ConfirmedBlockCertificate;
use rand::{
    distributions::{Distribution, WeightedIndex},
    prelude::SliceRandom as _,
};
use tracing::instrument;

use super::{
    cache::{RequestsCache, SubsumingKey},
    in_flight_tracker::{InFlightMatch, InFlightTracker},
    node_info::NodeInfo,
    request::{RequestKey, RequestResult},
    scoring::ScoringWeights,
};
use crate::{
    client::{
        communicate_concurrently,
        requests_scheduler::{in_flight_tracker::Subscribed, request::Cacheable},
        RequestsSchedulerConfig,
    },
    environment::Environment,
    node::{NodeError, ValidatorNode},
    remote_node::RemoteNode,
};

#[cfg(with_metrics)]
pub(super) mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_latencies, register_histogram_vec, register_int_counter,
        register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounter, IntCounterVec};

    /// Histogram of response times per validator (in milliseconds)
    pub(super) static VALIDATOR_RESPONSE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "requests_scheduler_response_time_ms",
            "Response time for requests to validators in milliseconds",
            &["validator"],
            exponential_bucket_latencies(10000.0), // up to 10 seconds
        )
    });

    /// Counter of total requests made to each validator
    pub(super) static VALIDATOR_REQUEST_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "requests_scheduler_request_total",
            "Total number of requests made to each validator",
            &["validator"],
        )
    });

    /// Counter of successful requests per validator
    pub(super) static VALIDATOR_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "requests_scheduler_request_success",
            "Number of successful requests to each validator",
            &["validator"],
        )
    });

    /// Counter for requests that were deduplicated by joining an existing in-flight
    /// request.
    pub(super) static REQUEST_CACHE_DEDUPLICATION: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "requests_scheduler_request_deduplication_total",
            "Number of requests that were deduplicated by joining an existing in-flight request.",
        )
    });

    /// Counter for requests that were served from cache
    pub static REQUEST_CACHE_HIT: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "requests_scheduler_request_cache_hit_total",
            "Number of requests that were served from cache",
        )
    });
}

/// Manages a pool of validator nodes with intelligent load balancing and performance tracking.
///
/// The `RequestsScheduler` maintains performance metrics for each validator node using
/// Exponential Moving Averages (EMA) and uses these metrics to make intelligent routing
/// decisions. It deduplicates concurrent requests for the same data and automatically
/// retries failed requests on alternative nodes.
///
/// # Examples
///
/// ```ignore
/// // Create from a `RequestsSchedulerConfig` (default scoring weights).
/// let scheduler = RequestsScheduler::new(validator_nodes, config);
///
/// // Create with custom configuration prioritizing low latency.
/// let latency_weights = ScoringWeights {
///     latency: 0.6,
///     success: 0.3,
///     load: 0.1,
/// };
/// let scheduler = RequestsScheduler::with_config(
///     validator_nodes,
///     latency_weights,            // custom scoring weights
///     0.2,                        // higher alpha for faster adaptation
///     3000.0,                     // max expected latency (3 seconds)
///     Duration::from_secs(60),    // 60 second cache TTL
///     200,                        // cache up to 200 entries
///     Duration::from_secs(10),    // stop deduplicating in-flight requests after 10s
///     Duration::from_millis(100), // stagger delay between alternative peers
/// );
/// ```
#[derive(Debug, Clone)]
pub struct RequestsScheduler<Env: Environment> {
    /// Thread-safe map of validator nodes indexed by their public keys.
    /// Each node is wrapped with EMA-based performance tracking information.
    nodes: Arc<tokio::sync::RwLock<BTreeMap<ValidatorPublicKey, NodeInfo<Env>>>>,
    /// Default scoring weights applied to new nodes.
    weights: ScoringWeights,
    /// Default EMA smoothing factor for new nodes.
    alpha: f64,
    /// Default maximum expected latency in milliseconds for score normalization.
    max_expected_latency: f64,
    /// Delay between starting requests to alternative peers.
    retry_delay: Duration,
    /// Tracks in-flight requests to deduplicate concurrent requests for the same data.
    in_flight_tracker: InFlightTracker<RemoteNode<Env::ValidatorNode>>,
    /// Cache of recently completed requests with their results and timestamps.
    cache: RequestsCache<RequestKey, RequestResult>,
}

impl<Env: Environment> RequestsScheduler<Env> {
    /// Creates a new `RequestsScheduler` with the provided configuration.
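    ///
    /// # Example
    /// ```ignore
    /// // A minimal sketch; assumes `validator_nodes` and a `RequestsSchedulerConfig`
    /// // value named `config` are in scope.
    /// let scheduler = RequestsScheduler::new(validator_nodes, config);
    /// ```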
    pub fn new(
        nodes: impl IntoIterator<Item = RemoteNode<Env::ValidatorNode>>,
        config: RequestsSchedulerConfig,
    ) -> Self {
        Self::with_config(
            nodes,
            ScoringWeights::default(),
            config.alpha,
            config.max_accepted_latency_ms,
            Duration::from_millis(config.cache_ttl_ms),
            config.cache_max_size,
            Duration::from_millis(config.max_request_ttl_ms),
            Duration::from_millis(config.retry_delay_ms),
        )
    }

    /// Creates a new `RequestsScheduler` with custom configuration.
    ///
    /// # Arguments
    /// - `nodes`: Initial set of validator nodes
    /// - `weights`: Scoring weights for performance metrics
    /// - `alpha`: EMA smoothing factor (0 < alpha < 1)
    /// - `max_expected_latency_ms`: Maximum expected latency in milliseconds for score normalization
    /// - `cache_ttl`: Time-to-live for cached responses
    /// - `max_cache_size`: Maximum number of entries in the cache
    /// - `max_request_ttl`: Maximum age of an in-flight request before we stop deduplicating it
    /// - `retry_delay`: Delay between starting requests to alternative peers
    #[allow(clippy::too_many_arguments)]
    pub fn with_config(
        nodes: impl IntoIterator<Item = RemoteNode<Env::ValidatorNode>>,
        weights: ScoringWeights,
        alpha: f64,
        max_expected_latency_ms: f64,
        cache_ttl: Duration,
        max_cache_size: usize,
        max_request_ttl: Duration,
        retry_delay: Duration,
    ) -> Self {
        assert!(alpha > 0.0 && alpha < 1.0, "Alpha must be in (0, 1) range");
        Self {
            nodes: Arc::new(tokio::sync::RwLock::new(
                nodes
                    .into_iter()
                    .map(|node| {
                        (
                            node.public_key,
                            NodeInfo::with_config(node, weights, alpha, max_expected_latency_ms),
                        )
                    })
                    .collect(),
            )),
            weights,
            alpha,
            max_expected_latency: max_expected_latency_ms,
            retry_delay,
            in_flight_tracker: InFlightTracker::new(max_request_ttl),
            cache: RequestsCache::new(cache_ttl, max_cache_size),
        }
    }

    /// Executes an operation with an automatically selected peer, handling deduplication,
    /// tracking, and peer selection.
    ///
    /// This method provides a high-level API for executing operations against remote nodes
    /// while leveraging the [`RequestsScheduler`]'s intelligent peer selection, performance
    /// tracking, and request deduplication capabilities.
    ///
    /// # Type Parameters
    /// - `R`: The inner result type (what the operation returns on success)
    /// - `F`: The async closure type that takes a `RemoteNode` and returns a future
    /// - `Fut`: The future type returned by the closure
    ///
    /// # Arguments
    /// - `key`: Unique identifier for request deduplication
    /// - `operation`: Async closure that takes a selected peer and performs the operation
    ///
    /// # Returns
    /// The result from the operation, potentially from cache or a deduplicated in-flight request
    ///
    /// # Example
    /// ```ignore
    /// let result: Result<Vec<ConfirmedBlockCertificate>, NodeError> = requests_scheduler
    ///     .with_best(
    ///         RequestKey::Certificates { chain_id, heights: heights.clone() },
    ///         move |peer| {
    ///             let heights = heights.clone();
    ///             async move { peer.download_certificates_by_heights(chain_id, heights).await }
    ///         },
    ///     )
    ///     .await;
    /// ```
    #[allow(unused)]
    async fn with_best<R, F, Fut>(&self, key: RequestKey, operation: F) -> Result<R, NodeError>
    where
        R: Cacheable + Clone + Send + 'static,
        F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
        Fut: Future<Output = Result<R, NodeError>> + 'static,
    {
        // Select the best available peer
        let peer = self
            .select_best_peer()
            .await
            .ok_or_else(|| NodeError::WorkerError {
                error: "No validators available".to_string(),
            })?;
        self.with_peer(key, peer, operation).await
    }

    /// Executes an operation with a specific peer.
    ///
    /// Similar to [`with_best`](Self::with_best), but uses the provided peer directly
    /// instead of selecting the best available peer. This is useful when you need to
    /// query a specific validator node.
    ///
    /// # Type Parameters
    /// - `R`: The inner result type (what the operation returns on success)
    /// - `F`: The async closure type that takes a `RemoteNode` and returns a future
    /// - `Fut`: The future type returned by the closure
    ///
    /// # Arguments
    /// - `key`: Unique identifier for request deduplication
    /// - `peer`: The specific peer to use for the operation
    /// - `operation`: Async closure that takes the peer and performs the operation
    ///
    /// # Returns
    /// The result from the operation, potentially from cache or a deduplicated in-flight request
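    ///
    /// # Example
    /// ```ignore
    /// // A minimal sketch: fetch a blob from one specific validator.
    /// let blob = scheduler
    ///     .with_peer(RequestKey::Blob(blob_id), peer.clone(), |peer| async move {
    ///         peer.download_blob(blob_id).await
    ///     })
    ///     .await?;
    /// ```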
    async fn with_peer<R, F, Fut>(
        &self,
        key: RequestKey,
        peer: RemoteNode<Env::ValidatorNode>,
        operation: F,
    ) -> Result<R, NodeError>
    where
        R: Cacheable + Clone + Send + 'static,
        F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
        Fut: Future<Output = Result<R, NodeError>> + 'static,
    {
        self.add_peer(peer.clone()).await;
        self.in_flight_tracker
            .add_alternative_peer(&key, peer.clone())
            .await;

        // Clone the nodes Arc so we can move it into the closure
        let nodes = self.nodes.clone();
        self.deduplicated_request(key, peer, move |peer| {
            let fut = operation(peer.clone());
            let nodes = nodes.clone();
            async move { Self::track_request(nodes, peer, fut).await }
        })
        .await
    }

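    /// Downloads a blob from one of the given peers.
    ///
    /// The peers are shuffled and then queried concurrently via
    /// [`communicate_concurrently`]; the first successful response wins, and the last
    /// error is returned if all peers fail or the timeout elapses.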
    #[instrument(level = "trace", skip_all)]
    async fn download_blob(
        &self,
        peers: &[RemoteNode<Env::ValidatorNode>],
        blob_id: BlobId,
        timeout: Duration,
    ) -> Result<Option<Blob>, NodeError> {
        let key = RequestKey::Blob(blob_id);
        let mut peers = peers.to_vec();
        peers.shuffle(&mut rand::thread_rng());
        communicate_concurrently(
            &peers,
            async move |peer| {
                self.with_peer(key, peer, move |peer| async move {
                    peer.download_blob(blob_id).await
                })
                .await
            },
            |errors| errors.last().cloned().unwrap(),
            timeout,
        )
        .await
        .map_err(|(_validator, error)| error)
    }

    /// Downloads the blobs with the given IDs, using one concurrent future per blob.
    /// Uses intelligent peer selection based on scores and load balancing.
    /// Returns `None` if it couldn't find all blobs.
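    ///
    /// # Example
    /// ```ignore
    /// // A minimal sketch; assumes `peers` and `blob_ids` are in scope.
    /// if let Some(blobs) = scheduler
    ///     .download_blobs(&peers, &blob_ids, Duration::from_secs(10))
    ///     .await?
    /// {
    ///     assert_eq!(blobs.len(), blob_ids.len());
    /// }
    /// ```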
    #[instrument(level = "trace", skip_all)]
    pub async fn download_blobs(
        &self,
        peers: &[RemoteNode<Env::ValidatorNode>],
        blob_ids: &[BlobId],
        timeout: Duration,
    ) -> Result<Option<Vec<Blob>>, NodeError> {
        let mut stream = blob_ids
            .iter()
            .map(|blob_id| self.download_blob(peers, *blob_id, timeout))
            .collect::<FuturesUnordered<_>>();

        let mut blobs = Vec::new();
        while let Some(maybe_blob) = stream.next().await {
            blobs.push(maybe_blob?);
        }
        Ok(blobs.into_iter().collect::<Option<Vec<_>>>())
    }

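    /// Downloads `limit` consecutive certificates for `chain_id` starting at height
    /// `start` from the given peer, deduplicating with any matching in-flight request.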
    pub async fn download_certificates(
        &self,
        peer: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
        start: BlockHeight,
        limit: u64,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
        let heights = (start.0..start.0 + limit)
            .map(BlockHeight)
            .collect::<Vec<_>>();
        self.with_peer(
            RequestKey::Certificates {
                chain_id,
                heights: heights.clone(),
            },
            peer.clone(),
            move |peer| {
                let heights = heights.clone();
                async move {
                    Box::pin(peer.download_certificates_by_heights(chain_id, heights)).await
                }
            },
        )
        .await
    }

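    /// Downloads the certificates at the given heights for `chain_id` from the given
    /// peer, deduplicating with any matching in-flight request.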
    pub async fn download_certificates_by_heights(
        &self,
        peer: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
        heights: Vec<BlockHeight>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
        self.with_peer(
            RequestKey::Certificates {
                chain_id,
                heights: heights.clone(),
            },
            peer.clone(),
            move |peer| {
                let heights = heights.clone();
                async move {
                    peer.download_certificates_by_heights(chain_id, heights)
                        .await
                }
            },
        )
        .await
    }

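    /// Downloads the confirmed block certificate associated with the given blob from
    /// the given peer.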
    pub async fn download_certificate_for_blob(
        &self,
        peer: &RemoteNode<Env::ValidatorNode>,
        blob_id: BlobId,
    ) -> Result<ConfirmedBlockCertificate, NodeError> {
        self.with_peer(
            RequestKey::CertificateForBlob(blob_id),
            peer.clone(),
            move |peer| async move { peer.download_certificate_for_blob(blob_id).await },
        )
        .await
    }

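    /// Downloads the content of a pending blob on the given chain from the given peer.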
    pub async fn download_pending_blob(
        &self,
        peer: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
        blob_id: BlobId,
    ) -> Result<BlobContent, NodeError> {
        self.with_peer(
            RequestKey::PendingBlob { chain_id, blob_id },
            peer.clone(),
            move |peer| async move { peer.node.download_pending_blob(chain_id, blob_id).await },
        )
        .await
    }

    /// Returns the alternative peers registered for an in-flight request, if any.
    ///
    /// This can be used to retry a failed request with alternative data sources
    /// that were registered during request deduplication.
    pub async fn get_alternative_peers(
        &self,
        key: &RequestKey,
    ) -> Option<Vec<RemoteNode<Env::ValidatorNode>>> {
        self.in_flight_tracker.get_alternative_peers(key).await
    }

    /// Returns current performance metrics for all managed nodes.
    ///
    /// Each entry contains:
    /// - Performance score (f64, normalized 0.0-1.0)
    /// - EMA success rate (f64, 0.0-1.0)
    /// - Total requests processed (u64)
    ///
    /// Useful for monitoring and debugging node performance.
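    ///
    /// # Example
    /// ```ignore
    /// // A minimal sketch: log each validator's current standing.
    /// for (validator, (score, success_rate, total_requests)) in
    ///     scheduler.get_node_scores().await
    /// {
    ///     tracing::info!(%validator, score, success_rate, total_requests, "node stats");
    /// }
    /// ```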
    pub async fn get_node_scores(&self) -> BTreeMap<ValidatorPublicKey, (f64, f64, u64)> {
        let nodes = self.nodes.read().await;
        let mut result = BTreeMap::new();

        for (key, info) in nodes.iter() {
            let score = info.calculate_score().await;
            result.insert(
                *key,
                (score, info.ema_success_rate(), info.total_requests()),
            );
        }

        result
    }

    /// Wraps a request operation with performance tracking.
    ///
    /// This method:
    /// 1. Measures the response time of the provided future
    /// 2. Updates the node's metrics based on success/failure
    ///
    /// # Arguments
    /// - `nodes`: Arc to the nodes map for updating metrics
    /// - `peer`: The remote node to track metrics for
    /// - `operation`: Future that performs the actual request
    async fn track_request<T, Fut>(
        nodes: Arc<tokio::sync::RwLock<BTreeMap<ValidatorPublicKey, NodeInfo<Env>>>>,
        peer: RemoteNode<Env::ValidatorNode>,
        operation: Fut,
    ) -> Result<T, NodeError>
    where
        Fut: Future<Output = Result<T, NodeError>> + 'static,
    {
        let start_time = Instant::now();
        let public_key = peer.public_key;

        // Execute the operation
        let result = operation.await;

        // Update the node's metrics
        let response_time_ms = start_time.elapsed().as_millis() as u64;
        let is_success = result.is_ok();
        {
            let mut nodes_guard = nodes.write().await;
            if let Some(info) = nodes_guard.get_mut(&public_key) {
                info.update_metrics(is_success, response_time_ms);
                let score = info.calculate_score().await;
                tracing::trace!(
                    node = %public_key,
                    address = %info.node.node.address(),
                    success = %is_success,
                    response_time_ms = %response_time_ms,
                    score = %score,
                    total_requests = %info.total_requests(),
                    "Request completed"
                );
            }
        }

        // Record Prometheus metrics
        #[cfg(with_metrics)]
        {
            let validator_name = public_key.to_string();
            metrics::VALIDATOR_RESPONSE_TIME
                .with_label_values(&[&validator_name])
                .observe(response_time_ms as f64);
            metrics::VALIDATOR_REQUEST_TOTAL
                .with_label_values(&[&validator_name])
                .inc();
            if is_success {
                metrics::VALIDATOR_REQUEST_SUCCESS
                    .with_label_values(&[&validator_name])
                    .inc();
            }
        }

        result
    }

    /// Deduplicates concurrent requests for the same data.
    ///
    /// If a request for the same key is already in flight, this method waits for
    /// the existing request to complete and returns its result. Otherwise, it
    /// executes the operation and broadcasts the result to all waiting callers.
    ///
    /// This method also performs **subsumption-based deduplication**: if a larger
    /// request that contains all the data needed by this request is already cached
    /// or in flight, we can extract the subset result instead of making a new request.
    ///
    /// # Arguments
    /// - `key`: Unique identifier for the request
    /// - `peer`: The peer to try first when a new request must be executed
    /// - `operation`: Async closure that performs the actual request
    ///
    /// # Returns
    /// The result from either the in-flight request or the newly executed operation
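    ///
    /// # Example
    /// ```ignore
    /// // A minimal sketch: two concurrent calls with the same key share a single
    /// // network request; the second caller receives the broadcast result.
    /// let (a, b) = futures::join!(
    ///     scheduler.deduplicated_request(key.clone(), peer.clone(), &operation),
    ///     scheduler.deduplicated_request(key, peer, &operation),
    /// );
    /// ```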
    async fn deduplicated_request<T, F, Fut>(
        &self,
        key: RequestKey,
        peer: RemoteNode<Env::ValidatorNode>,
        operation: F,
    ) -> Result<T, NodeError>
    where
        T: Cacheable + Clone + Send + 'static,
        F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
        Fut: Future<Output = Result<T, NodeError>> + 'static,
    {
        // Check cache for exact or subsuming match
        if let Some(result) = self.cache.get(&key).await {
            return Ok(result);
        }

        // Check if there's an in-flight request (exact or subsuming)
        if let Some(in_flight_match) = self.in_flight_tracker.try_subscribe(&key).await {
            match in_flight_match {
                InFlightMatch::Exact(Subscribed(mut receiver)) => {
                    tracing::trace!(
                        ?key,
                        "deduplicating request (exact match) - joining existing in-flight request"
                    );
                    #[cfg(with_metrics)]
                    metrics::REQUEST_CACHE_DEDUPLICATION.inc();
                    // Wait for result from existing request
                    match receiver.recv().await {
                        Ok(result) => match result.as_ref().clone() {
                            Ok(res) => match T::try_from(res) {
                                Ok(converted) => {
                                    tracing::trace!(
                                        ?key,
                                        "received result from deduplicated in-flight request"
                                    );
                                    return Ok(converted);
                                }
                                Err(_) => {
                                    tracing::warn!(
                                        ?key,
                                        "failed to convert result from deduplicated in-flight request, will execute independently"
                                    );
                                }
                            },
                            Err(error) => {
                                tracing::trace!(
                                    ?key,
                                    %error,
                                    "in-flight request failed",
                                );
                                // Fall through to execute a new request
                            }
                        },
                        Err(_) => {
                            tracing::trace!(?key, "in-flight request sender dropped");
                            // Fall through to execute a new request
                        }
                    }
                }
                InFlightMatch::Subsuming {
                    key: subsuming_key,
                    outcome: Subscribed(mut receiver),
                } => {
                    tracing::trace!(
                        ?key,
                        subsumed_by = ?subsuming_key,
                        "deduplicating request (subsumption) - joining larger in-flight request"
                    );
                    #[cfg(with_metrics)]
                    metrics::REQUEST_CACHE_DEDUPLICATION.inc();
                    // Wait for result from the subsuming request
                    match receiver.recv().await {
                        Ok(result) => {
                            match result.as_ref() {
                                Ok(res) => {
                                    if let Some(extracted) =
                                        key.try_extract_result(&subsuming_key, res)
                                    {
                                        tracing::trace!(
                                            ?key,
                                            "extracted subset result from larger in-flight request"
                                        );
                                        match T::try_from(extracted) {
                                            Ok(converted) => return Ok(converted),
                                            Err(_) => {
                                                tracing::trace!(
                                                    ?key,
                                                    "failed to convert extracted result, will execute independently"
                                                );
                                            }
                                        }
                                    } else {
                                        // Extraction failed, fall through to execute our own request
                                        tracing::trace!(
                                            ?key,
                                            "failed to extract from subsuming request, will execute independently"
                                        );
                                    }
                                }
                                Err(error) => {
                                    tracing::trace!(
                                        ?key,
                                        ?error,
                                        "subsuming in-flight request failed",
                                    );
                                    // Fall through to execute our own request
                                }
                            }
                        }
                        Err(_) => {
                            tracing::trace!(?key, "subsuming in-flight request sender dropped");
                        }
                    }
                }
            }
        }

        // Create a new in-flight entry for this request
        self.in_flight_tracker.insert_new(key.clone()).await;

        // Remove the peer we're about to use from the alternatives, so that retries
        // don't go back to the same peer.
        self.in_flight_tracker
            .remove_alternative_peer(&key, &peer)
            .await;

        // Execute the request with staggered parallelism: the first peer starts
        // immediately, and alternatives are started after stagger delays, even if
        // earlier attempts are slow but have not failed yet.
        tracing::trace!(?key, ?peer, "executing staggered parallel request");
        let result = self
            .try_staggered_parallel(&key, peer, &operation, self.retry_delay)
            .await;

        let result_for_broadcast: Result<RequestResult, NodeError> = result.clone().map(Into::into);
        let shared_result = Arc::new(result_for_broadcast);

        // Broadcast result and clean up
        self.in_flight_tracker
            .complete_and_broadcast(&key, shared_result.clone())
            .await;

        if let Ok(success) = shared_result.as_ref() {
            self.cache
                .store(key.clone(), Arc::new(success.clone()))
                .await;
        }
        result
    }

    /// Tries alternative peers in staggered parallel fashion.
    ///
    /// Launches requests starting with the first peer, then dynamically pops alternative peers
    /// with a stagger delay between each. Returns the first successful result. This provides
    /// a balance between sequential (slow) and fully parallel (wasteful) approaches.
    ///
    /// # Arguments
    /// - `key`: The request key (for logging and popping alternatives)
    /// - `first_peer`: The initial peer to try first (at time 0)
    /// - `operation`: The operation to execute on each peer
    /// - `staggered_delay`: Delay between starting each subsequent peer
    ///
    /// # Returns
    /// The first successful result, or the last error if all fail
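    ///
    /// The intended timeline, assuming no attempt fails early:
    ///
    /// ```text
    /// t = 0 * delay   first_peer starts
    /// t = 1 * delay   first alternative starts (first_peer still pending)
    /// t = 2 * delay   second alternative starts
    /// ...
    /// ```
    ///
    /// The first `Ok(_)` wins; any still-pending futures are dropped when this method
    /// returns. On a failure, the next alternative is started immediately instead of
    /// waiting for its scheduled delay.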
    async fn try_staggered_parallel<T, F, Fut>(
        &self,
        key: &RequestKey,
        first_peer: RemoteNode<Env::ValidatorNode>,
        operation: &F,
        staggered_delay: Duration,
    ) -> Result<T, NodeError>
    where
        T: 'static,
        F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
        Fut: Future<Output = Result<T, NodeError>> + 'static,
    {
        use futures::future::{select, Either};
        use linera_base::time::timer::sleep;

        let mut futures: FuturesUnordered<Fut> = FuturesUnordered::new();
        let peer_index = AtomicU32::new(0);

        let push_future = |futures: &mut FuturesUnordered<Fut>, fut: Fut| {
            futures.push(fut);
            peer_index.fetch_add(1, Ordering::SeqCst)
        };

        // Start the first peer immediately (no delay)
        push_future(&mut futures, operation(first_peer));

        let mut last_error = NodeError::UnexpectedMessage;
        let mut next_delay = Box::pin(sleep(staggered_delay * peer_index.load(Ordering::SeqCst)));

        // Phase 1: Race between futures completion and delays (while alternatives might exist)
        loop {
            // Exit condition: no futures running and can't start any more
            if futures.is_empty() {
                if let Some(peer) = self.in_flight_tracker.pop_alternative_peer(key).await {
                    push_future(&mut futures, operation(peer));
                    next_delay =
                        Box::pin(sleep(staggered_delay * peer_index.load(Ordering::SeqCst)));
                } else {
                    // No futures and no alternatives - we're done
                    break;
                }
            }

            let next_result = Box::pin(futures.next());

            match select(next_result, next_delay).await {
                // A request completed
                Either::Left((Some(result), delay_fut)) => {
                    // Keep the delay future for next iteration
                    next_delay = delay_fut;

                    match result {
                        Ok(value) => {
                            tracing::trace!(?key, "staggered parallel request succeeded");
                            return Ok(value);
                        }
                        Err(error) => {
                            tracing::debug!(
                                ?key,
                                %error,
                                "staggered parallel request attempt failed"
                            );
                            last_error = error;

                            // Immediately try next alternative
                            if let Some(peer) =
                                self.in_flight_tracker.pop_alternative_peer(key).await
                            {
                                push_future(&mut futures, operation(peer));
                                next_delay = Box::pin(sleep(
                                    staggered_delay * peer_index.load(Ordering::SeqCst),
                                ));
                            }
                        }
                    }
                }
                // All running futures completed
                Either::Left((None, delay_fut)) => {
                    // Restore the delay future
                    next_delay = delay_fut;
                    // Will check at top of loop if we should try more alternatives
                    continue;
                }
                // Delay elapsed - try to start next peer
                Either::Right((_, _)) => {
                    if let Some(peer) = self.in_flight_tracker.pop_alternative_peer(key).await {
                        push_future(&mut futures, operation(peer));
                        next_delay =
                            Box::pin(sleep(staggered_delay * peer_index.load(Ordering::SeqCst)));
                    } else {
                        // No more alternatives - break out to phase 2
                        break;
                    }
                }
            }
        }

        // Phase 2: No more alternatives, just wait for remaining futures to complete
        while let Some(result) = futures.next().await {
            match result {
                Ok(value) => {
                    tracing::trace!(?key, "staggered parallel request succeeded");
                    return Ok(value);
                }
                Err(error) => {
                    tracing::debug!(
                        ?key,
                        %error,
                        "staggered parallel request attempt failed"
                    );
                    last_error = error;
                }
            }
        }

        // All attempts failed
        tracing::debug!(?key, "all staggered parallel retry attempts failed");
        Err(last_error)
    }

    /// Returns all peers ordered by their score (highest first).
    ///
    /// Each peer is paired with its calculated score based on latency, success rate,
    /// and load.
    ///
    /// # Returns
    /// A vector of `(score, peer)` tuples sorted by score in descending order.
    /// Returns an empty vector if no peers are known.
    async fn peers_by_score(&self) -> Vec<(f64, RemoteNode<Env::ValidatorNode>)> {
        let nodes = self.nodes.read().await;

        // Calculate a score for every known node
        let mut scored_nodes = Vec::new();
        for info in nodes.values() {
            let score = info.calculate_score().await;
            scored_nodes.push((score, info.node.clone()));
        }

        // Sort by score (highest first)
        scored_nodes.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));

        scored_nodes
    }

    /// Selects the best available peer using weighted random selection from top performers.
    ///
    /// This method:
    /// 1. Scores all known nodes
    /// 2. Sorts them by performance score
    /// 3. Performs weighted random selection from the top 3 performers
    ///
    /// This approach balances between choosing high-performing nodes and distributing
    /// load across multiple validators to avoid creating hotspots.
    ///
    /// Returns `None` if no nodes are known.
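    ///
    /// For example, if the top three scores are `[0.9, 0.5, 0.1]`, the selection
    /// weights are the scores themselves (clamped below at `0.01`), giving selection
    /// probabilities of `0.9 / 1.5 = 60%`, about 33%, and about 7% respectively.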
    async fn select_best_peer(&self) -> Option<RemoteNode<Env::ValidatorNode>> {
        let scored_nodes = self.peers_by_score().await;

        if scored_nodes.is_empty() {
            return None;
        }

        // Use weighted random selection from top performers (top 3, or all if fewer)
        let top_count = scored_nodes.len().min(3);
        let top_nodes = &scored_nodes[..top_count];

        // Use the scores as weights, clamped below at a small minimum to prevent
        // zero weights
        let weights: Vec<f64> = top_nodes.iter().map(|(score, _)| score.max(0.01)).collect();

        if let Ok(dist) = WeightedIndex::new(&weights) {
            let mut rng = rand::thread_rng();
            let index = dist.sample(&mut rng);
            Some(top_nodes[index].1.clone())
        } else {
            // Fall back to the best node if the weights are invalid
            tracing::warn!("failed to create weighted distribution, defaulting to best node");
            Some(scored_nodes[0].1.clone())
        }
    }

    /// Adds a new peer to the manager if it doesn't already exist.
    async fn add_peer(&self, node: RemoteNode<Env::ValidatorNode>) {
        let mut nodes = self.nodes.write().await;
        let public_key = node.public_key;
        nodes.entry(public_key).or_insert_with(|| {
            NodeInfo::with_config(node, self.weights, self.alpha, self.max_expected_latency)
        });
    }
}

#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    use linera_base::{
        crypto::{CryptoHash, InMemorySigner},
        data_types::BlockHeight,
        identifiers::ChainId,
        time::Duration,
    };
    use linera_chain::types::ConfirmedBlockCertificate;
    use tokio::sync::oneshot;

    use super::{super::request::RequestKey, *};
    use crate::{
        client::requests_scheduler::{MAX_REQUEST_TTL_MS, STAGGERED_DELAY_MS},
        node::NodeError,
    };

    type TestEnvironment = crate::environment::Test;

    /// Helper function to create a test `RequestsScheduler` with custom configuration
    fn create_test_manager(
        in_flight_timeout: Duration,
        cache_ttl: Duration,
    ) -> Arc<RequestsScheduler<TestEnvironment>> {
        // `with_config` already builds the in-flight tracker from `in_flight_timeout`.
        Arc::new(RequestsScheduler::with_config(
            vec![], // No actual nodes needed for these tests
            ScoringWeights::default(),
            0.1,
            1000.0,
            cache_ttl,
            100,
            in_flight_timeout,
            Duration::from_millis(STAGGERED_DELAY_MS),
        ))
    }

    /// Helper function to create a test result
    fn test_result_ok() -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
        Ok(vec![])
    }

    /// Helper function to create a test request key
    fn test_key() -> RequestKey {
        RequestKey::Certificates {
            chain_id: ChainId(CryptoHash::test_hash("test")),
            heights: vec![BlockHeight(0), BlockHeight(1)],
        }
    }

    /// Helper function to create a dummy peer for testing
    fn dummy_peer() -> RemoteNode<<TestEnvironment as Environment>::ValidatorNode> {
        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};

        // Create a minimal test builder to get a validator node
        let mut builder = futures::executor::block_on(async {
            TestBuilder::new(
                MemoryStorageBuilder::default(),
                1,
                0,
                InMemorySigner::new(None),
            )
            .await
            .unwrap()
        });

        let node = builder.node(0);
        let public_key = node.name();
        RemoteNode { public_key, node }
    }

    #[tokio::test]
    async fn test_cache_hit_returns_cached_result() {
        // Create a manager with standard cache TTL
        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
        let key = test_key();
        let peer = dummy_peer();

        // Track how many times the operation is executed
        let execution_count = Arc::new(AtomicUsize::new(0));
        let execution_count_clone = execution_count.clone();

        // First call - should execute the operation and cache the result
        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
            .deduplicated_request(key.clone(), peer.clone(), |_| {
                let count = execution_count_clone.clone();
                async move {
                    count.fetch_add(1, Ordering::SeqCst);
                    test_result_ok()
                }
            })
            .await;

        assert!(result1.is_ok());
        assert_eq!(execution_count.load(Ordering::SeqCst), 1);

        // Second call - should return cached result without executing the operation
        let execution_count_clone2 = execution_count.clone();
        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
            .deduplicated_request(key.clone(), peer.clone(), |_| {
                let count = execution_count_clone2.clone();
                async move {
                    count.fetch_add(1, Ordering::SeqCst);
                    test_result_ok()
                }
            })
            .await;

        assert_eq!(result1, result2);
        // Operation should still only have been executed once (cache hit)
        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_in_flight_request_deduplication() {
        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
        let key = test_key();
        let peer = dummy_peer();

        // Track how many times the operation is executed
        let execution_count = Arc::new(AtomicUsize::new(0));

        // Create a channel to control when the first operation completes
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        // Start first request (will be slow - waits for signal)
        let manager_clone = Arc::clone(&manager);
        let key_clone = key.clone();
        let execution_count_clone = execution_count.clone();
        let rx_clone = Arc::clone(&rx);
        let peer_clone = peer.clone();
        let first_request = tokio::spawn(async move {
            manager_clone
                .deduplicated_request(key_clone, peer_clone, |_| {
                    let count = execution_count_clone.clone();
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        // Wait for signal before completing
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        test_result_ok()
                    }
                })
                .await
        });

        // Start second request - should deduplicate and wait for the first
        let execution_count_clone2 = execution_count.clone();
        let second_request = tokio::spawn(async move {
            manager
                .deduplicated_request(key, peer, |_| {
                    let count = execution_count_clone2.clone();
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        test_result_ok()
                    }
                })
                .await
        });

        // Signal the first request to complete
        tx.send(()).unwrap();

        // Both requests should complete successfully
        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            first_request.await.unwrap();
        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            second_request.await.unwrap();

        assert!(result1.is_ok());
        assert_eq!(result1, result2);

        // Operation should only have been executed once (deduplication worked)
        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_multiple_subscribers_all_notified() {
        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
        let key = test_key();
        let peer = dummy_peer();

        // Track how many times the operation is executed
        let execution_count = Arc::new(AtomicUsize::new(0));

        // Create a channel to control when the operation completes
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        // Start first request (will be slow - waits for signal)
        let manager_clone1 = Arc::clone(&manager);
        let key_clone1 = key.clone();
        let execution_count_clone = execution_count.clone();
        let rx_clone = Arc::clone(&rx);
        let peer_clone = peer.clone();
        let first_request = tokio::spawn(async move {
            manager_clone1
                .deduplicated_request(key_clone1, peer_clone, |_| {
                    let count = execution_count_clone.clone();
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        test_result_ok()
                    }
                })
                .await
        });

        // Start multiple additional requests - all should deduplicate
        let mut handles = vec![];
        for _ in 0..5 {
            let manager_clone = Arc::clone(&manager);
            let key_clone = key.clone();
            let execution_count_clone = execution_count.clone();
            let peer_clone = peer.clone();
            let handle = tokio::spawn(async move {
                manager_clone
                    .deduplicated_request(key_clone, peer_clone, |_| {
                        let count = execution_count_clone.clone();
                        async move {
                            count.fetch_add(1, Ordering::SeqCst);
                            test_result_ok()
                        }
                    })
                    .await
            });
            handles.push(handle);
        }

        // Signal the first request to complete
        tx.send(()).unwrap();

        // First request should complete successfully
        let result: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            first_request.await.unwrap();
        assert!(result.is_ok());

        // All subscriber requests should also complete successfully
        for handle in handles {
            assert_eq!(handle.await.unwrap(), result);
        }

        // Operation should only have been executed once (all requests were deduplicated)
        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_timeout_triggers_new_request() {
        // Create a manager with a very short in-flight timeout
        let manager = create_test_manager(Duration::from_millis(50), Duration::from_secs(60));

        let key = test_key();
        let peer = dummy_peer();

        // Track how many times the operation is executed
        let execution_count = Arc::new(AtomicUsize::new(0));

        // Create a channel to control when the first operation completes
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        // Start first request (will be slow - waits for signal)
        let manager_clone = Arc::clone(&manager);
        let key_clone = key.clone();
        let execution_count_clone = execution_count.clone();
        let rx_clone = Arc::clone(&rx);
        let peer_clone = peer.clone();
        let first_request = tokio::spawn(async move {
            manager_clone
                .deduplicated_request(key_clone, peer_clone, |_| {
                    let count = execution_count_clone.clone();
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        test_result_ok()
                    }
                })
                .await
        });

        // Wait for the configured 50ms in-flight timeout to elapse
        tokio::time::sleep(Duration::from_millis(60)).await;

        // Start second request - should NOT deduplicate because the first request exceeded the timeout
        let execution_count_clone2 = execution_count.clone();
        let second_request = tokio::spawn(async move {
            manager
                .deduplicated_request(key, peer, |_| {
                    let count = execution_count_clone2.clone();
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        test_result_ok()
                    }
                })
                .await
        });

        // Wait for second request to complete
        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            second_request.await.unwrap();
        assert!(result2.is_ok());

        // Complete the first request
        tx.send(()).unwrap();
        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            first_request.await.unwrap();
        assert!(result1.is_ok());

        // Operation should have been executed twice (timeout triggered new request)
        assert_eq!(execution_count.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn test_alternative_peers_registered_on_deduplication() {
        use linera_base::identifiers::BlobType;

        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};

        // Create a test environment with three validators
        let mut builder = TestBuilder::new(
            MemoryStorageBuilder::default(),
            3,
            0,
            InMemorySigner::new(None),
        )
        .await
        .unwrap();

        // Get validator nodes
        let nodes: Vec<_> = (0..3)
            .map(|i| {
                let node = builder.node(i);
                let public_key = node.name();
                RemoteNode { public_key, node }
            })
            .collect();

        // Create a RequestsScheduler
        let manager: Arc<RequestsScheduler<TestEnvironment>> =
            Arc::new(RequestsScheduler::with_config(
                nodes.clone(),
                ScoringWeights::default(),
                0.1,
                1000.0,
                Duration::from_secs(60),
                100,
                Duration::from_millis(MAX_REQUEST_TTL_MS),
                Duration::from_millis(STAGGERED_DELAY_MS),
            ));

        let key = RequestKey::Blob(BlobId::new(
            CryptoHash::test_hash("test_blob"),
            BlobType::Data,
        ));

        // Create a channel to control when first request completes
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        // Start first request with node 0 (will block until signaled)
        let manager_clone = Arc::clone(&manager);
        let node_clone = nodes[0].clone();
        let key_clone = key.clone();
        let rx_clone = Arc::clone(&rx);
        let first_request = tokio::spawn(async move {
            manager_clone
                .with_peer(key_clone, node_clone, move |_peer| {
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        // Wait for signal
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        Ok(None) // Return Option<Blob>
                    }
                })
                .await
        });

        // Give first request time to start and become in-flight
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Start second and third requests with different nodes
        // These should register as alternatives and wait for the first request
        let handles: Vec<_> = vec![nodes[1].clone(), nodes[2].clone()]
            .into_iter()
            .map(|node| {
                let manager_clone = Arc::clone(&manager);
                let key_clone = key.clone();
                tokio::spawn(async move {
                    manager_clone
                        .with_peer(key_clone, node, |_peer| async move {
                            Ok(None) // Return Option<Blob>
                        })
                        .await
                })
            })
            .collect();

        // Give time for alternative peers to register
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Alternatives are being popped as staggered parallel runs.
        // The first request is blocked waiting for the signal, so staggered parallel has started
        // and may have already popped one or both alternatives. We just verify that at least
        // one alternative was registered (before being popped).
        // This test primarily validates that alternatives can be registered during deduplication.

        // Signal first request to complete
        tx.send(()).unwrap();

        // Wait for all requests to complete
        let _result1 = first_request.await.unwrap();
        for handle in handles {
            let _ = handle.await.unwrap();
        }

        // After completion, the in-flight entry should be removed
        tokio::time::sleep(Duration::from_millis(50)).await;
        let alt_peers = manager.get_alternative_peers(&key).await;
        assert!(
            alt_peers.is_none(),
            "Expected in-flight entry to be removed after completion"
        );
    }

    #[tokio::test]
    async fn test_staggered_parallel_retry_on_failure() {
        use std::sync::atomic::{AtomicU64, Ordering};

        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};

        // Create a test environment with four validators
        let mut builder = TestBuilder::new(
            MemoryStorageBuilder::default(),
            4,
            0,
            InMemorySigner::new(None),
        )
        .await
        .unwrap();

        // Get validator nodes
        let nodes: Vec<_> = (0..4)
            .map(|i| {
                let node = builder.node(i);
                let public_key = node.name();
                RemoteNode { public_key, node }
            })
            .collect();

        let staggered_delay = Duration::from_millis(10);

        // Store public keys for comparison
        let node0_key = nodes[0].public_key;
        let node2_key = nodes[2].public_key;

        // Create a RequestsScheduler
        let manager: Arc<RequestsScheduler<TestEnvironment>> =
            Arc::new(RequestsScheduler::with_config(
                nodes.clone(),
                ScoringWeights::default(),
                0.1,
                1000.0,
                Duration::from_secs(60),
                100,
                Duration::from_millis(MAX_REQUEST_TTL_MS),
                staggered_delay,
            ));

        let key = test_key();

        // Track when each peer is called
        let call_times = Arc::new(tokio::sync::Mutex::new(Vec::new()));
        let start_time = Instant::now();

        // Track call count per peer
        let call_count = Arc::new(AtomicU64::new(0));

        let call_times_clone = Arc::clone(&call_times);
        let call_count_clone = Arc::clone(&call_count);

        // Test the staggered parallel retry logic directly
        let operation = |peer: RemoteNode<<TestEnvironment as Environment>::ValidatorNode>| {
            let times = Arc::clone(&call_times_clone);
            let count = Arc::clone(&call_count_clone);
            let start = start_time;
            async move {
                let elapsed = Instant::now().duration_since(start);
                times.lock().await.push((peer.public_key, elapsed));
                count.fetch_add(1, Ordering::SeqCst);

                if peer.public_key == node0_key {
                    // Node 0 fails quickly
                    Err(NodeError::UnexpectedMessage)
                } else if peer.public_key == node2_key {
                    // Node 2 succeeds after a delay
                    tokio::time::sleep(staggered_delay / 2).await;
                    Ok(vec![])
                } else {
                    // Other nodes take longer or fail
                    tokio::time::sleep(staggered_delay * 2).await;
                    Err(NodeError::UnexpectedMessage)
                }
            }
        };

        // Setup: Insert in-flight entry and register alternative peers
        manager.in_flight_tracker.insert_new(key.clone()).await;
        // Register nodes 3, 2, 1 as alternatives (will be popped in reverse: 1, 2, 3)
        for node in nodes.iter().skip(1).rev() {
            manager
                .in_flight_tracker
                .add_alternative_peer(&key, node.clone())
                .await;
        }

        // Use node 0 as first peer, alternatives will be popped: node 1, then 2, then 3
        let result: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
            .try_staggered_parallel(&key, nodes[0].clone(), &operation, staggered_delay)
            .await;

        // Should succeed with result from node 2
        assert!(
            result.is_ok(),
            "Expected request to succeed with alternative peer"
        );

        // Verify timing: calls should be staggered, not sequential
        let times = call_times.lock().await;
        // We can't assert on exactly 2 attempts: the counter is incremented at the start
        // of each operation and peers sleep inside the operation, so further attempts may
        // have started before the winning one completed.
        assert!(
            times.len() >= 2,
            "Should have tried at least 2 peers, got {}",
            times.len()
        );

        // First call should be at ~0ms
        assert!(
            times[0].1.as_millis() < 10,
            "First peer should be called immediately, was called at {}ms",
            times[0].1.as_millis()
        );

        // Second call should start immediately after first fails (aggressive retry)
        // When node 0 fails immediately, we immediately start node 1
        if times.len() > 1 {
            let delay = times[1].1.as_millis();
            assert!(
                delay < 10,
                "Second peer should be called immediately on first failure, got {}ms",
                delay
            );
        }

        // Total time should be significantly less than sequential.
        // With aggressive retry: node0 fails immediately (~0ms), node1 starts immediately,
        // node1 takes 20ms (and might fail or timeout), node2 starts at ~10ms (scheduled delay)
        // and succeeds at ~15ms total (10ms + 5ms internal delay)
        let total_time = Instant::now().duration_since(start_time).as_millis();
        assert!(
            total_time < 50,
            "Total time should be less than 50ms, got {}ms",
            total_time
        );
    }
}