linera_core/client/requests_scheduler/scheduler.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::BTreeMap,
6    future::Future,
7    sync::{
8        atomic::{AtomicU32, Ordering},
9        Arc,
10    },
11};
12
13use custom_debug_derive::Debug;
14use futures::stream::{FuturesUnordered, StreamExt};
15use linera_base::{
16    crypto::ValidatorPublicKey,
17    data_types::{Blob, BlobContent, BlockHeight},
18    identifiers::{BlobId, ChainId},
19    time::{Duration, Instant},
20};
21use linera_chain::types::ConfirmedBlockCertificate;
22use rand::distributions::{Distribution, WeightedIndex};
23use tracing::instrument;
24
25use super::{
26    cache::{RequestsCache, SubsumingKey},
27    in_flight_tracker::{InFlightMatch, InFlightTracker},
28    node_info::NodeInfo,
29    request::{RequestKey, RequestResult},
30    scoring::ScoringWeights,
31};
32use crate::{
33    client::{
34        communicate_concurrently,
35        requests_scheduler::{in_flight_tracker::Subscribed, request::Cacheable},
36        RequestsSchedulerConfig,
37    },
38    environment::Environment,
39    node::{NodeError, ValidatorNode},
40    remote_node::RemoteNode,
41};
42
#[cfg(with_metrics)]
pub(super) mod metrics {
    //! Prometheus metrics for the requests scheduler, compiled in only when the
    //! `with_metrics` configuration flag is enabled.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_latencies, register_histogram_vec, register_int_counter,
        register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounter, IntCounterVec};

    /// Histogram of response times per validator (in milliseconds), with
    /// exponential buckets covering latencies up to 10 seconds.
    pub(super) static VALIDATOR_RESPONSE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "requests_scheduler_response_time_ms",
            "Response time for requests to validators in milliseconds",
            &["validator"],
            exponential_bucket_latencies(10000.0), // up to 10 seconds
        )
    });

    /// Counter of total requests made to each validator, labeled by validator key.
    pub(super) static VALIDATOR_REQUEST_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "requests_scheduler_request_total",
            "Total number of requests made to each validator",
            &["validator"],
        )
    });

    /// Counter of successful requests per validator; together with
    /// `VALIDATOR_REQUEST_TOTAL` this yields the per-validator success rate.
    pub(super) static VALIDATOR_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "requests_scheduler_request_success",
            "Number of successful requests to each validator",
            &["validator"],
        )
    });

    /// Counter for requests that were deduplicated by joining an in-flight
    /// request (exact or subsuming) instead of issuing a new one.
    pub(super) static REQUEST_CACHE_DEDUPLICATION: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "requests_scheduler_request_deduplication_total",
            "Number of requests that were deduplicated by finding the result in the cache.",
        )
    });

    /// Counter for requests that were served from the completed-requests cache.
    pub static REQUEST_CACHE_HIT: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "requests_scheduler_request_cache_hit_total",
            "Number of requests that were served from cache",
        )
    });
}
97
/// Manages a pool of validator nodes with intelligent load balancing and performance tracking.
///
/// The `RequestsScheduler` maintains performance metrics for each validator node using
/// Exponential Moving Averages (EMA) and uses these metrics to make intelligent routing
/// decisions. It prevents node overload through request capacity limits and automatically
/// retries failed requests on alternative nodes.
///
/// Cloning is cheap: all mutable state is behind shared `Arc`s, so clones observe the
/// same node metrics, in-flight tracker, and cache.
///
/// # Examples
///
/// ```ignore
/// // Create with default configuration (balanced scoring)
/// let manager = RequestsScheduler::new(validator_nodes);
///
/// // Create with custom configuration prioritizing low latency
/// let latency_weights = ScoringWeights {
///     latency: 0.6,
///     success: 0.3,
///     load: 0.1,
/// };
/// let manager = RequestsScheduler::with_config(
///     validator_nodes,
///     latency_weights,               // custom scoring weights
///     0.2,                           // higher alpha for faster adaptation
///     3000.0,                        // max expected latency (3 seconds)
///     Duration::from_secs(60),       // 60 second cache TTL
///     200,                           // cache up to 200 entries
///     Duration::from_millis(200),    // max request TTL
///     Duration::from_millis(150),    // retry delay
/// );
/// ```
#[derive(Debug, Clone)]
pub struct RequestsScheduler<Env: Environment> {
    /// Thread-safe map of validator nodes indexed by their public keys.
    /// Each node is wrapped with EMA-based performance tracking information.
    nodes: Arc<tokio::sync::RwLock<BTreeMap<ValidatorPublicKey, NodeInfo<Env>>>>,
    /// Default scoring weights applied to new nodes.
    weights: ScoringWeights,
    /// Default EMA smoothing factor for new nodes (must be strictly within (0, 1)).
    alpha: f64,
    /// Default maximum expected latency in milliseconds for score normalization.
    max_expected_latency: f64,
    /// Delay between starting staggered requests to alternative peers.
    retry_delay: Duration,
    /// Tracks in-flight requests to deduplicate concurrent requests for the same data.
    in_flight_tracker: InFlightTracker<RemoteNode<Env::ValidatorNode>>,
    /// Cache of recently completed requests with their results and timestamps.
    cache: RequestsCache<RequestKey, RequestResult>,
}
146
147impl<Env: Environment> RequestsScheduler<Env> {
148    /// Creates a new `RequestsScheduler` with the provided configuration.
149    pub fn new(
150        nodes: impl IntoIterator<Item = RemoteNode<Env::ValidatorNode>>,
151        config: &RequestsSchedulerConfig,
152    ) -> Self {
153        Self::with_config(
154            nodes,
155            ScoringWeights::default(),
156            config.alpha,
157            config.max_accepted_latency_ms,
158            Duration::from_millis(config.cache_ttl_ms),
159            config.cache_max_size,
160            Duration::from_millis(config.max_request_ttl_ms),
161            Duration::from_millis(config.retry_delay_ms),
162        )
163    }
164
165    /// Creates a new `RequestsScheduler` with custom configuration.
166    ///
167    /// # Arguments
168    /// - `nodes`: Initial set of validator nodes
169    /// - `max_requests_per_node`: Maximum concurrent requests per node
170    /// - `weights`: Scoring weights for performance metrics
171    /// - `alpha`: EMA smoothing factor (0 < alpha < 1)
172    /// - `max_expected_latency_ms`: Maximum expected latency for score normalization
173    /// - `cache_ttl`: Time-to-live for cached responses
174    /// - `max_cache_size`: Maximum number of entries in the cache
175    /// - `max_request_ttl`: Maximum latency for an in-flight request before we stop deduplicating it
176    /// - `retry_delay_ms`: Delay in milliseconds between starting requests to different peers.
177    #[expect(clippy::too_many_arguments)]
178    pub fn with_config(
179        nodes: impl IntoIterator<Item = RemoteNode<Env::ValidatorNode>>,
180        weights: ScoringWeights,
181        alpha: f64,
182        max_expected_latency_ms: f64,
183        cache_ttl: Duration,
184        max_cache_size: usize,
185        max_request_ttl: Duration,
186        retry_delay: Duration,
187    ) -> Self {
188        assert!(alpha > 0.0 && alpha < 1.0, "Alpha must be in (0, 1) range");
189        Self {
190            nodes: Arc::new(tokio::sync::RwLock::new(
191                nodes
192                    .into_iter()
193                    .map(|node| {
194                        (
195                            node.public_key,
196                            NodeInfo::with_config(node, weights, alpha, max_expected_latency_ms),
197                        )
198                    })
199                    .collect(),
200            )),
201            weights,
202            alpha,
203            max_expected_latency: max_expected_latency_ms,
204            retry_delay,
205            in_flight_tracker: InFlightTracker::new(max_request_ttl),
206            cache: RequestsCache::new(cache_ttl, max_cache_size),
207        }
208    }
209
210    /// Executes an operation with an automatically selected peer, handling deduplication,
211    /// tracking, and peer selection.
212    ///
213    /// This method provides a high-level API for executing operations against remote nodes
214    /// while leveraging the [`RequestsScheduler`]'s intelligent peer selection, performance tracking,
215    /// and request deduplication capabilities.
216    ///
217    /// # Type Parameters
218    /// - `R`: The inner result type (what the operation returns on success)
219    /// - `F`: The async closure type that takes a `RemoteNode` and returns a future
220    /// - `Fut`: The future type returned by the closure
221    ///
222    /// # Arguments
223    /// - `key`: Unique identifier for request deduplication
224    /// - `operation`: Async closure that takes a selected peer and performs the operation
225    ///
226    /// # Returns
227    /// The result from the operation, potentially from cache or a deduplicated in-flight request
228    ///
229    /// # Example
230    /// ```ignore
231    /// let result: Result<Vec<ConfirmedBlockCertificate>, NodeError> = requests_scheduler
232    ///     .with_best(
233    ///         RequestKey::Certificates { chain_id, start, limit },
234    ///         |peer| async move {
235    ///             peer.download_certificates_from(chain_id, start, limit).await
236    ///         }
237    ///     )
238    ///     .await;
239    /// ```
240    #[allow(unused)]
241    async fn with_best<R, F, Fut>(&self, key: RequestKey, operation: F) -> Result<R, NodeError>
242    where
243        R: Cacheable + Clone + Send + 'static,
244        F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
245        Fut: Future<Output = Result<R, NodeError>> + 'static,
246    {
247        // Select the best available peer
248        let peer = self
249            .select_best_peer()
250            .await
251            .ok_or_else(|| NodeError::WorkerError {
252                error: "No validators available".to_string(),
253            })?;
254        self.with_peer(key, peer, operation).await
255    }
256
257    /// Executes an operation with a specific peer.
258    ///
259    /// Similar to [`with_best`](Self::with_best), but uses the provided peer directly
260    /// instead of selecting the best available peer. This is useful when you need to
261    /// query a specific validator node.
262    ///
263    /// # Type Parameters
264    /// - `R`: The inner result type (what the operation returns on success)
265    /// - `F`: The async closure type that takes a `RemoteNode` and returns a future
266    /// - `Fut`: The future type returned by the closure
267    ///
268    /// # Arguments
269    /// - `key`: Unique identifier for request deduplication
270    /// - `peer`: The specific peer to use for the operation
271    /// - `operation`: Async closure that takes the peer and performs the operation
272    ///
273    /// # Returns
274    /// The result from the operation, potentially from cache or a deduplicated in-flight request
275    async fn with_peer<R, F, Fut>(
276        &self,
277        key: RequestKey,
278        peer: RemoteNode<Env::ValidatorNode>,
279        operation: F,
280    ) -> Result<R, NodeError>
281    where
282        R: Cacheable + Clone + Send + 'static,
283        F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
284        Fut: Future<Output = Result<R, NodeError>> + 'static,
285    {
286        self.add_peer(peer.clone()).await;
287        self.in_flight_tracker
288            .add_alternative_peer(&key, peer.clone())
289            .await;
290
291        // Clone the nodes Arc so we can move it into the closure
292        let nodes = self.nodes.clone();
293        self.deduplicated_request(key, peer, move |peer| {
294            let fut = operation(peer.clone());
295            let nodes = nodes.clone();
296            async move { Self::track_request(nodes, peer, fut).await }
297        })
298        .await
299    }
300
301    #[instrument(level = "trace", skip_all)]
302    async fn download_blob(
303        &self,
304        peers: &[RemoteNode<Env::ValidatorNode>],
305        blob_id: BlobId,
306        timeout: Duration,
307    ) -> Result<Option<Blob>, NodeError> {
308        let key = RequestKey::Blob(blob_id);
309        communicate_concurrently(
310            peers,
311            async move |peer| {
312                self.with_peer(key, peer, move |peer| async move {
313                    peer.download_blob(blob_id).await
314                })
315                .await
316            },
317            |errors| errors.last().cloned().unwrap(),
318            timeout,
319        )
320        .await
321        .map_err(|(_validator, error)| error)
322    }
323
324    /// Downloads the blobs with the given IDs. This is done in one concurrent task per blob.
325    /// Uses intelligent peer selection based on scores and load balancing.
326    /// Returns `None` if it couldn't find all blobs.
327    #[instrument(level = "trace", skip_all)]
328    pub async fn download_blobs(
329        &self,
330        peers: &[RemoteNode<Env::ValidatorNode>],
331        blob_ids: &[BlobId],
332        timeout: Duration,
333    ) -> Result<Option<Vec<Blob>>, NodeError> {
334        let mut stream = blob_ids
335            .iter()
336            .map(|blob_id| self.download_blob(peers, *blob_id, timeout))
337            .collect::<FuturesUnordered<_>>();
338
339        let mut blobs = Vec::new();
340        while let Some(maybe_blob) = stream.next().await {
341            blobs.push(maybe_blob?);
342        }
343        Ok(blobs.into_iter().collect::<Option<Vec<_>>>())
344    }
345
346    pub async fn download_certificates(
347        &self,
348        peer: &RemoteNode<Env::ValidatorNode>,
349        chain_id: ChainId,
350        start: BlockHeight,
351        limit: u64,
352    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
353        let heights = (start.0..start.0 + limit)
354            .map(BlockHeight)
355            .collect::<Vec<_>>();
356        self.with_peer(
357            RequestKey::Certificates {
358                chain_id,
359                heights: heights.clone(),
360            },
361            peer.clone(),
362            move |peer| {
363                let heights = heights.clone();
364                async move {
365                    Box::pin(peer.download_certificates_by_heights(chain_id, heights)).await
366                }
367            },
368        )
369        .await
370    }
371
372    /// Downloads certificates from any of the given validators, using staggered
373    /// concurrent requests so that slow validators are quickly bypassed.
374    pub async fn download_certificates_from_validators(
375        &self,
376        peers: &[RemoteNode<Env::ValidatorNode>],
377        chain_id: ChainId,
378        start: BlockHeight,
379        limit: u64,
380        timeout: Duration,
381    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
382        if peers.is_empty() {
383            return Err(NodeError::NoValidators);
384        }
385        let heights = (start.0..start.0 + limit)
386            .map(BlockHeight)
387            .collect::<Vec<_>>();
388        let key = RequestKey::Certificates {
389            chain_id,
390            heights: heights.clone(),
391        };
392        communicate_concurrently(
393            peers,
394            async move |peer| {
395                self.with_peer(key, peer, move |peer| {
396                    let heights = heights.clone();
397                    async move {
398                        Box::pin(peer.download_certificates_by_heights(chain_id, heights)).await
399                    }
400                })
401                .await
402            },
403            |errors| errors.last().cloned().unwrap(),
404            timeout,
405        )
406        .await
407        .map_err(|(_validator, error)| error)
408    }
409
410    pub async fn download_certificates_by_heights(
411        &self,
412        peer: &RemoteNode<Env::ValidatorNode>,
413        chain_id: ChainId,
414        heights: Vec<BlockHeight>,
415    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
416        self.with_peer(
417            RequestKey::Certificates {
418                chain_id,
419                heights: heights.clone(),
420            },
421            peer.clone(),
422            move |peer| {
423                let heights = heights.clone();
424                async move {
425                    peer.download_certificates_by_heights(chain_id, heights)
426                        .await
427                }
428            },
429        )
430        .await
431    }
432
433    pub async fn download_certificate_for_blob(
434        &self,
435        peer: &RemoteNode<Env::ValidatorNode>,
436        blob_id: BlobId,
437    ) -> Result<ConfirmedBlockCertificate, NodeError> {
438        self.with_peer(
439            RequestKey::CertificateForBlob(blob_id),
440            peer.clone(),
441            move |peer| async move { peer.download_certificate_for_blob(blob_id).await },
442        )
443        .await
444    }
445
446    pub async fn download_pending_blob(
447        &self,
448        peer: &RemoteNode<Env::ValidatorNode>,
449        chain_id: ChainId,
450        blob_id: BlobId,
451    ) -> Result<BlobContent, NodeError> {
452        self.with_peer(
453            RequestKey::PendingBlob { chain_id, blob_id },
454            peer.clone(),
455            move |peer| async move { peer.node.download_pending_blob(chain_id, blob_id).await },
456        )
457        .await
458    }
459
    /// Returns the alternative peers registered for an in-flight request, if any.
    ///
    /// This can be used to retry a failed request with alternative data sources
    /// that were registered during request deduplication. Returns `None` when no
    /// in-flight entry exists for `key`.
    pub async fn get_alternative_peers(
        &self,
        key: &RequestKey,
    ) -> Option<Vec<RemoteNode<Env::ValidatorNode>>> {
        self.in_flight_tracker.get_alternative_peers(key).await
    }
470
    /// Wraps a request operation with performance tracking.
    ///
    /// This method:
    /// 1. Measures the wall-clock response time of `operation`
    /// 2. Updates the peer's EMA metrics based on success/failure
    /// 3. Records Prometheus metrics when compiled with `with_metrics`
    ///
    /// # Arguments
    /// - `nodes`: Shared map of node metrics for updating, keyed by validator public key
    /// - `peer`: The remote node to track metrics for
    /// - `operation`: Future that performs the actual request
    ///
    /// # Behavior
    /// The operation is awaited to completion; metrics are updated afterwards
    /// whether it succeeded or failed. If the peer is no longer present in
    /// `nodes`, the metric update is silently skipped.
    async fn track_request<T, Fut>(
        nodes: Arc<tokio::sync::RwLock<BTreeMap<ValidatorPublicKey, NodeInfo<Env>>>>,
        peer: RemoteNode<Env::ValidatorNode>,
        operation: Fut,
    ) -> Result<T, NodeError>
    where
        Fut: Future<Output = Result<T, NodeError>> + 'static,
    {
        let start_time = Instant::now();
        let public_key = peer.public_key;

        // Execute the operation.
        let result = operation.await;

        // Update the node's EMA metrics with the outcome. The write lock is held
        // only for this scope, and is not held across any network awaits.
        let response_time_ms = start_time.elapsed().as_millis() as u64;
        let is_success = result.is_ok();
        {
            let mut nodes_guard = nodes.write().await;
            if let Some(info) = nodes_guard.get_mut(&public_key) {
                info.update_metrics(is_success, response_time_ms);
                let score = info.calculate_score().await;
                tracing::trace!(
                    node = %public_key,
                    address = %info.node.node.address(),
                    success = %is_success,
                    response_time_ms = %response_time_ms,
                    score = %score,
                    total_requests = %info.total_requests(),
                    "Request completed"
                );
            }
        }

        // Record Prometheus metrics (only compiled in with `with_metrics`).
        #[cfg(with_metrics)]
        {
            let validator_name = public_key.to_string();
            metrics::VALIDATOR_RESPONSE_TIME
                .with_label_values(&[&validator_name])
                .observe(response_time_ms as f64);
            metrics::VALIDATOR_REQUEST_TOTAL
                .with_label_values(&[&validator_name])
                .inc();
            if is_success {
                metrics::VALIDATOR_REQUEST_SUCCESS
                    .with_label_values(&[&validator_name])
                    .inc();
            }
        }

        result
    }
537
    /// Deduplicates concurrent requests for the same data.
    ///
    /// If a request for the same key is already in flight, this method waits for
    /// the existing request to complete and returns its result. Otherwise, it
    /// executes the operation and broadcasts the result to all waiting callers.
    ///
    /// This method also performs **subsumption-based deduplication**: if a larger
    /// request that contains all the data needed by this request is already cached
    /// or in flight, we can extract the subset result instead of making a new request.
    ///
    /// Joining an in-flight request is best-effort: if conversion or extraction of
    /// the shared result fails, the other request errored, or its sender was
    /// dropped, this method falls through and executes its own request.
    ///
    /// # Arguments
    /// - `key`: Unique identifier for the request
    /// - `peer`: The first peer to try when executing the request ourselves
    /// - `operation`: Async closure that performs the actual request
    ///
    /// # Returns
    /// The result from either the in-flight request or the newly executed operation
    async fn deduplicated_request<T, F, Fut>(
        &self,
        key: RequestKey,
        peer: RemoteNode<Env::ValidatorNode>,
        operation: F,
    ) -> Result<T, NodeError>
    where
        T: Cacheable + Clone + Send + 'static,
        F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
        Fut: Future<Output = Result<T, NodeError>> + 'static,
    {
        // Check cache for exact or subsuming match
        if let Some(result) = self.cache.get(&key).await {
            return Ok(result);
        }

        // Check if there's an in-flight request (exact or subsuming)
        if let Some(in_flight_match) = self.in_flight_tracker.try_subscribe(&key).await {
            match in_flight_match {
                InFlightMatch::Exact(Subscribed(mut receiver)) => {
                    tracing::trace!(
                        ?key,
                        "deduplicating request (exact match) - joining existing in-flight request"
                    );
                    #[cfg(with_metrics)]
                    metrics::REQUEST_CACHE_DEDUPLICATION.inc();
                    // Wait for result from existing request
                    match receiver.recv().await {
                        Ok(result) => match result.as_ref().clone() {
                            Ok(res) => match T::try_from(res) {
                                Ok(converted) => {
                                    tracing::trace!(
                                        ?key,
                                        "received result from deduplicated in-flight request"
                                    );
                                    return Ok(converted);
                                }
                                Err(_) => {
                                    tracing::warn!(
                                        ?key,
                                        "failed to convert result from deduplicated in-flight request, will execute independently"
                                    );
                                }
                            },
                            Err(error) => {
                                tracing::trace!(
                                    ?key,
                                    %error,
                                    "in-flight request failed",
                                );
                                // Fall through to execute a new request
                            }
                        },
                        Err(_) => {
                            tracing::trace!(?key, "in-flight request sender dropped");
                            // Fall through to execute a new request
                        }
                    }
                }
                InFlightMatch::Subsuming {
                    key: subsuming_key,
                    outcome: Subscribed(mut receiver),
                } => {
                    tracing::trace!(
                    ?key,
                    subsumed_by = ?subsuming_key,
                        "deduplicating request (subsumption) - joining larger in-flight request"
                    );
                    #[cfg(with_metrics)]
                    metrics::REQUEST_CACHE_DEDUPLICATION.inc();
                    // Wait for result from the subsuming request
                    match receiver.recv().await {
                        Ok(result) => {
                            match result.as_ref() {
                                Ok(res) => {
                                    // Try to carve our subset out of the larger result.
                                    if let Some(extracted) =
                                        key.try_extract_result(&subsuming_key, res)
                                    {
                                        tracing::trace!(
                                            ?key,
                                            "extracted subset result from larger in-flight request"
                                        );
                                        match T::try_from(extracted) {
                                            Ok(converted) => return Ok(converted),
                                            Err(_) => {
                                                tracing::trace!(
                                                    ?key,
                                                    "failed to convert extracted result, will execute independently"
                                                );
                                            }
                                        }
                                    } else {
                                        // Extraction failed, fall through to execute our own request
                                        tracing::trace!(
                                            ?key,
                                            "failed to extract from subsuming request, will execute independently"
                                        );
                                    }
                                }
                                Err(error) => {
                                    tracing::trace!(
                                        ?key,
                                        ?error,
                                        "subsuming in-flight request failed",
                                    );
                                    // Fall through to execute our own request
                                }
                            }
                        }
                        Err(_) => {
                            tracing::trace!(?key, "subsuming in-flight request sender dropped");
                        }
                    }
                }
            }
        };

        // Create new in-flight entry for this request
        self.in_flight_tracker.insert_new(key.clone()).await;

        // Remove the peer we're about to use from alternatives (it shouldn't retry with itself)
        self.in_flight_tracker
            .remove_alternative_peer(&key, &peer)
            .await;

        // Execute request with staggered parallel - first peer starts immediately,
        // alternatives are tried after stagger delays (even if first peer is slow but not failing)
        tracing::trace!(?key, ?peer, "executing staggered parallel request");
        let result = self
            .try_staggered_parallel(&key, peer, &operation, self.retry_delay)
            .await;

        // Convert into the shared `RequestResult` form before broadcasting, so all
        // subscribers (possibly of different concrete types) can consume it.
        let result_for_broadcast: Result<RequestResult, NodeError> = result.clone().map(Into::into);
        let shared_result = Arc::new(result_for_broadcast);

        // Broadcast result and clean up
        self.in_flight_tracker
            .complete_and_broadcast(&key, shared_result.clone())
            .await;

        // Only successful results are cached; errors are always retried.
        if let Ok(success) = shared_result.as_ref() {
            self.cache
                .store(key.clone(), Arc::new(success.clone()))
                .await;
        }
        result
    }
701
702    /// Tries alternative peers in staggered parallel fashion.
703    ///
704    /// Launches requests starting with the first peer, then dynamically pops alternative peers
705    /// with a stagger delay between each. Returns the first successful result. This provides
706    /// a balance between sequential (slow) and fully parallel (wasteful) approaches.
707    ///
708    /// # Arguments
709    /// - `key`: The request key (for logging and popping alternatives)
710    /// - `first_peer`: The initial peer to try first (at time 0)
711    /// - `operation`: The operation to execute on each peer
712    /// - `staggered_delay_ms`: Delay in milliseconds between starting each subsequent peer
713    ///
714    /// # Returns
715    /// The first successful result, or the last error if all fail
    async fn try_staggered_parallel<T, F, Fut>(
        &self,
        key: &RequestKey,
        first_peer: RemoteNode<Env::ValidatorNode>,
        operation: &F,
        staggered_delay: Duration,
    ) -> Result<T, NodeError>
    where
        T: 'static,
        F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
        Fut: Future<Output = Result<T, NodeError>> + 'static,
    {
        use futures::{
            future::{select, Either},
            stream::{FuturesUnordered, StreamExt},
        };
        use linera_base::time::timer::sleep;

        // All concurrently running request futures, polled as one stream.
        let mut futures: FuturesUnordered<Fut> = FuturesUnordered::new();
        // Count of peers started so far. Atomic only so the closure below can
        // update it through a shared borrow; this function runs in one task.
        let peer_index = AtomicU32::new(0);

        // Pushes a request future and bumps the started-peer count.
        let push_future = |futures: &mut FuturesUnordered<Fut>, fut: Fut| {
            futures.push(fut);
            peer_index.fetch_add(1, Ordering::SeqCst)
        };

        // Start the first peer immediately (no delay)
        push_future(&mut futures, operation(first_peer));

        // Returned if the loops below never record a more specific error.
        let mut last_error = NodeError::UnexpectedMessage;
        // Stagger timer: each reset sleeps `staggered_delay * <peers started>`,
        // so every additional peer is launched after a proportionally longer wait.
        let mut next_delay = Box::pin(sleep(staggered_delay * peer_index.load(Ordering::SeqCst)));

        // Phase 1: Race between futures completion and delays (while alternatives might exist)
        loop {
            // Exit condition: no futures running and can't start any more
            if futures.is_empty() {
                if let Some(peer) = self.in_flight_tracker.pop_alternative_peer(key).await {
                    push_future(&mut futures, operation(peer));
                    next_delay =
                        Box::pin(sleep(staggered_delay * peer_index.load(Ordering::SeqCst)));
                } else {
                    // No futures and no alternatives - we're done
                    break;
                }
            }

            let next_result = Box::pin(futures.next());

            // `select` returns ownership of the losing future, which is how the
            // delay survives across iterations in the `Either::Left` arms below.
            match select(next_result, next_delay).await {
                // A request completed
                Either::Left((Some(result), delay_fut)) => {
                    // Keep the delay future for next iteration
                    next_delay = delay_fut;

                    match result {
                        Ok(value) => {
                            tracing::trace!(?key, "staggered parallel request succeeded");
                            return Ok(value);
                        }
                        Err(error) => {
                            tracing::debug!(
                                ?key,
                                %error,
                                "staggered parallel request attempt failed"
                            );
                            last_error = error;

                            // Immediately try next alternative
                            if let Some(peer) =
                                self.in_flight_tracker.pop_alternative_peer(key).await
                            {
                                push_future(&mut futures, operation(peer));
                                next_delay = Box::pin(sleep(
                                    staggered_delay * peer_index.load(Ordering::SeqCst),
                                ));
                            }
                        }
                    }
                }
                // All running futures completed
                Either::Left((None, delay_fut)) => {
                    // Restore the delay future
                    next_delay = delay_fut;
                    // Will check at top of loop if we should try more alternatives
                    continue;
                }
                // Delay elapsed - try to start next peer
                Either::Right((_, _)) => {
                    if let Some(peer) = self.in_flight_tracker.pop_alternative_peer(key).await {
                        push_future(&mut futures, operation(peer));
                        next_delay =
                            Box::pin(sleep(staggered_delay * peer_index.load(Ordering::SeqCst)));
                    } else {
                        // No more alternatives - break out to phase 2
                        break;
                    }
                }
            }
        }

        // Phase 2: No more alternatives, just wait for remaining futures to complete
        while let Some(result) = futures.next().await {
            match result {
                Ok(value) => {
                    tracing::trace!(?key, "staggered parallel request succeeded");
                    return Ok(value);
                }
                Err(error) => {
                    tracing::debug!(
                        ?key,
                        %error,
                        "staggered parallel request attempt failed"
                    );
                    last_error = error;
                }
            }
        }

        // All attempts failed
        tracing::debug!(?key, "all staggered parallel retry attempts failed");
        Err(last_error)
    }
838
839    /// Returns all peers ordered by their score (highest first).
840    ///
841    /// Only includes peers that can currently accept requests. Each peer is paired
842    /// with its calculated score based on latency, success rate, and availability.
843    ///
844    /// # Returns
845    /// A vector of `(score, peer)` tuples sorted by score in descending order.
846    /// Returns an empty vector if no peers can accept requests.
847    async fn peers_by_score(&self) -> Vec<(f64, RemoteNode<Env::ValidatorNode>)> {
848        let nodes = self.nodes.read().await;
849
850        // Filter nodes that can accept requests and calculate their scores
851        let mut scored_nodes = Vec::new();
852        for info in nodes.values() {
853            let score = info.calculate_score().await;
854            scored_nodes.push((score, info.node.clone()));
855        }
856
857        // Sort by score (highest first)
858        scored_nodes.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
859
860        scored_nodes
861    }
862
863    /// Selects the best available peer using weighted random selection from top performers.
864    ///
865    /// This method:
866    /// 1. Sorts nodes by performance score
867    /// 2. Performs weighted random selection from the top 3 performers
868    ///
869    /// This approach balances between choosing high-performing nodes and distributing
870    /// load across multiple validators to avoid creating hotspots.
871    ///
872    /// Returns `None` if no nodes are available.
873    async fn select_best_peer(&self) -> Option<RemoteNode<Env::ValidatorNode>> {
874        let scored_nodes = self.peers_by_score().await;
875
876        if scored_nodes.is_empty() {
877            return None;
878        }
879
880        // Use weighted random selection from top performers (top 3 or all if less)
881        let top_count = scored_nodes.len().min(3);
882        let top_nodes = &scored_nodes[..top_count];
883
884        // Create weights based on normalized scores
885        // Add small epsilon to prevent zero weights
886        let weights: Vec<f64> = top_nodes.iter().map(|(score, _)| score.max(0.01)).collect();
887
888        if let Ok(dist) = WeightedIndex::new(&weights) {
889            let mut rng = rand::thread_rng();
890            let index = dist.sample(&mut rng);
891            Some(top_nodes[index].1.clone())
892        } else {
893            // Fallback to the best node if weights are invalid
894            tracing::warn!("failed to create weighted distribution, defaulting to best node");
895            Some(scored_nodes[0].1.clone())
896        }
897    }
898
899    /// Adds a new peer to the manager if it doesn't already exist.
900    async fn add_peer(&self, node: RemoteNode<Env::ValidatorNode>) {
901        let mut nodes = self.nodes.write().await;
902        let public_key = node.public_key;
903        nodes.entry(public_key).or_insert_with(|| {
904            NodeInfo::with_config(node, self.weights, self.alpha, self.max_expected_latency)
905        });
906    }
907}
908
909#[cfg(test)]
910mod tests {
911    use std::sync::{
912        atomic::{AtomicUsize, Ordering},
913        Arc,
914    };
915
916    use linera_base::{
917        crypto::{CryptoHash, InMemorySigner},
918        data_types::BlockHeight,
919        identifiers::ChainId,
920        time::Duration,
921    };
922    use linera_chain::types::ConfirmedBlockCertificate;
923    use tokio::sync::oneshot;
924
925    use super::{super::request::RequestKey, *};
926    use crate::{
927        client::requests_scheduler::{MAX_REQUEST_TTL_MS, STAGGERED_DELAY_MS},
928        node::NodeError,
929    };
930
    /// Concrete `Environment` implementation used throughout these tests.
    type TestEnvironment = crate::environment::Test;
932
933    /// Helper function to create a test RequestsScheduler with custom configuration
934    fn create_test_manager(
935        in_flight_timeout: Duration,
936        cache_ttl: Duration,
937    ) -> Arc<RequestsScheduler<TestEnvironment>> {
938        let mut manager = RequestsScheduler::with_config(
939            vec![], // No actual nodes needed for these tests
940            ScoringWeights::default(),
941            0.1,
942            1000.0,
943            cache_ttl,
944            100,
945            in_flight_timeout,
946            Duration::from_millis(STAGGERED_DELAY_MS),
947        );
948        // Replace the tracker with one using the custom timeout
949        manager.in_flight_tracker = InFlightTracker::new(in_flight_timeout);
950        Arc::new(manager)
951    }
952
953    /// Helper function to create a test request key
954    fn test_key() -> RequestKey {
955        RequestKey::Certificates {
956            chain_id: ChainId(CryptoHash::test_hash("test")),
957            heights: vec![BlockHeight(0), BlockHeight(1)],
958        }
959    }
960
    /// Helper function to create a dummy peer for testing
    fn dummy_peer() -> RemoteNode<<TestEnvironment as Environment>::ValidatorNode> {
        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};

        // Create a minimal test builder to get a validator node
        // NOTE(review): this uses `futures::executor::block_on` (not tokio's
        // `block_on`), so it doesn't panic inside a tokio test — but it would
        // deadlock if `TestBuilder::new` ever needed the tokio reactor; confirm.
        let mut builder = futures::executor::block_on(async {
            TestBuilder::new(
                MemoryStorageBuilder::default(),
                1,
                0,
                linera_base::crypto::InMemorySigner::new(None),
            )
            .await
            .unwrap()
        });

        let node = builder.node(0);
        let public_key = node.name();
        RemoteNode { public_key, node }
    }
981
982    #[tokio::test]
983    async fn test_cache_hit_returns_cached_result() {
984        // Create a manager with standard cache TTL
985        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
986        let key = test_key();
987        let peer = dummy_peer();
988
989        // Track how many times the operation is executed
990        let execution_count = Arc::new(AtomicUsize::new(0));
991        let execution_count_clone = execution_count.clone();
992
993        // First call - should execute the operation and cache the result
994        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
995            .deduplicated_request(key.clone(), peer.clone(), |_| {
996                let count = execution_count_clone.clone();
997                async move {
998                    count.fetch_add(1, Ordering::SeqCst);
999                    Ok(vec![])
1000                }
1001            })
1002            .await;
1003
1004        assert!(result1.is_ok());
1005        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
1006
1007        // Second call - should return cached result without executing the operation
1008        let execution_count_clone2 = execution_count.clone();
1009        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
1010            .deduplicated_request(key.clone(), peer.clone(), |_| {
1011                let count = execution_count_clone2.clone();
1012                async move {
1013                    count.fetch_add(1, Ordering::SeqCst);
1014                    Ok(vec![])
1015                }
1016            })
1017            .await;
1018
1019        assert_eq!(result1, result2);
1020        // Operation should still only have been executed once (cache hit)
1021        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
1022    }
1023
    /// Two concurrent requests for the same key share a single execution of
    /// the underlying operation.
    #[tokio::test]
    async fn test_in_flight_request_deduplication() {
        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
        let key = test_key();
        let peer = dummy_peer();

        // Track how many times the operation is executed
        let execution_count = Arc::new(AtomicUsize::new(0));

        // Create a channel to control when the first operation completes
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        // Start first request (will be slow - waits for signal)
        let manager_clone = Arc::clone(&manager);
        let key_clone = key.clone();
        let execution_count_clone = execution_count.clone();
        let rx_clone = Arc::clone(&rx);
        let peer_clone = peer.clone();
        let first_request = tokio::spawn(async move {
            manager_clone
                .deduplicated_request(key_clone, peer_clone, |_| {
                    let count = execution_count_clone.clone();
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        // Wait for signal before completing
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        Ok(vec![])
                    }
                })
                .await
        });

        // Start second request - should deduplicate and wait for the first
        // NOTE(review): nothing forces the first spawned task to register its
        // in-flight entry before this one runs; presumably task spawn order is
        // enough in practice, but this could be flaky under load — confirm.
        let execution_count_clone2 = execution_count.clone();
        let second_request = tokio::spawn(async move {
            manager
                .deduplicated_request(key, peer, |_| {
                    let count = execution_count_clone2.clone();
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        Ok(vec![])
                    }
                })
                .await
        });

        // Signal the first request to complete
        // (a oneshot buffers the value, so sending before the receiver awaits is fine)
        tx.send(()).unwrap();

        // Both requests should complete successfully
        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            first_request.await.unwrap();
        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            second_request.await.unwrap();

        assert!(result1.is_ok());
        assert_eq!(result1, result2);

        // Operation should only have been executed once (deduplication worked)
        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
    }
1089
    /// Many concurrent requests for one key all receive the single shared
    /// result once the in-flight operation finishes.
    #[tokio::test]
    async fn test_multiple_subscribers_all_notified() {
        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
        let key = test_key();
        let peer = dummy_peer();

        // Track how many times the operation is executed
        let execution_count = Arc::new(AtomicUsize::new(0));

        // Create a channel to control when the operation completes
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        // Start first request (will be slow - waits for signal)
        let manager_clone1 = Arc::clone(&manager);
        let key_clone1 = key.clone();
        let execution_count_clone = execution_count.clone();
        let rx_clone = Arc::clone(&rx);
        let peer_clone = peer.clone();
        let first_request = tokio::spawn(async move {
            manager_clone1
                .deduplicated_request(key_clone1, peer_clone, |_| {
                    let count = execution_count_clone.clone();
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        Ok(vec![])
                    }
                })
                .await
        });

        // Start multiple additional requests - all should deduplicate
        // NOTE(review): as in the dedup test above, this relies on the first
        // task registering in flight before the subscribers run — confirm.
        let mut handles = vec![];
        for _ in 0..5 {
            let manager_clone = Arc::clone(&manager);
            let key_clone = key.clone();
            let execution_count_clone = execution_count.clone();
            let peer_clone = peer.clone();
            let handle = tokio::spawn(async move {
                manager_clone
                    .deduplicated_request(key_clone, peer_clone, |_| {
                        let count = execution_count_clone.clone();
                        async move {
                            count.fetch_add(1, Ordering::SeqCst);
                            Ok(vec![])
                        }
                    })
                    .await
            });
            handles.push(handle);
        }

        // Signal the first request to complete
        tx.send(()).unwrap();

        // First request should complete successfully
        let result: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            first_request.await.unwrap();
        assert!(result.is_ok());

        // All subscriber requests should also complete successfully
        for handle in handles {
            assert_eq!(handle.await.unwrap(), result);
        }

        // Operation should only have been executed once (all requests were deduplicated)
        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
    }
1162
    /// Once a request has been in flight longer than the tracker's timeout,
    /// a later request for the same key is executed anew instead of waiting.
    #[tokio::test]
    async fn test_timeout_triggers_new_request() {
        // Create a manager with a very short in-flight timeout
        let manager = create_test_manager(Duration::from_millis(50), Duration::from_secs(60));

        let key = test_key();
        let peer = dummy_peer();

        // Track how many times the operation is executed
        let execution_count = Arc::new(AtomicUsize::new(0));

        // Create a channel to control when the first operation completes
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        // Start first request (will be slow - waits for signal)
        let manager_clone = Arc::clone(&manager);
        let key_clone = key.clone();
        let execution_count_clone = execution_count.clone();
        let rx_clone = Arc::clone(&rx);
        let peer_clone = peer.clone();
        let first_request = tokio::spawn(async move {
            manager_clone
                .deduplicated_request(key_clone, peer_clone, |_| {
                    let count = execution_count_clone.clone();
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        Ok(vec![])
                    }
                })
                .await
        });

        // Wait for the timeout to elapse
        // NOTE(review): this sleeps for the global `MAX_REQUEST_TTL_MS` rather
        // than the 50ms timeout configured above — presumably the constant is
        // at least as large as the tracker timeout; confirm.
        tokio::time::sleep(Duration::from_millis(MAX_REQUEST_TTL_MS + 1)).await;

        // Start second request - should NOT deduplicate because first request exceeded timeout
        let execution_count_clone2 = execution_count.clone();
        let second_request = tokio::spawn(async move {
            manager
                .deduplicated_request(key, peer, |_| {
                    let count = execution_count_clone2.clone();
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        Ok(vec![])
                    }
                })
                .await
        });

        // Wait for second request to complete
        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            second_request.await.unwrap();
        assert!(result2.is_ok());

        // Complete the first request
        tx.send(()).unwrap();
        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            first_request.await.unwrap();
        assert!(result1.is_ok());

        // Operation should have been executed twice (timeout triggered new request)
        assert_eq!(execution_count.load(Ordering::SeqCst), 2);
    }
1231
    /// Deduplicated requests arriving with different peers register those
    /// peers as alternatives, and the in-flight entry is cleaned up when the
    /// request completes.
    #[tokio::test]
    async fn test_alternative_peers_registered_on_deduplication() {
        use linera_base::identifiers::BlobType;

        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};

        // Create a test environment with three validators
        let mut builder = TestBuilder::new(
            MemoryStorageBuilder::default(),
            3,
            0,
            InMemorySigner::new(None),
        )
        .await
        .unwrap();

        // Get validator nodes
        let nodes: Vec<_> = (0..3)
            .map(|i| {
                let node = builder.node(i);
                let public_key = node.name();
                RemoteNode { public_key, node }
            })
            .collect();

        // Create a RequestsScheduler
        let manager: Arc<RequestsScheduler<TestEnvironment>> =
            Arc::new(RequestsScheduler::with_config(
                nodes.clone(),
                ScoringWeights::default(),
                0.1,
                1000.0,
                Duration::from_secs(60),
                100,
                Duration::from_millis(MAX_REQUEST_TTL_MS),
                Duration::from_millis(STAGGERED_DELAY_MS),
            ));

        let key = RequestKey::Blob(BlobId::new(
            CryptoHash::test_hash("test_blob"),
            BlobType::Data,
        ));

        // Create a channel to control when first request completes
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        // Start first request with node 0 (will block until signaled)
        let manager_clone = Arc::clone(&manager);
        let node_clone = nodes[0].clone();
        let key_clone = key.clone();
        let rx_clone = Arc::clone(&rx);
        let first_request = tokio::spawn(async move {
            manager_clone
                .with_peer(key_clone, node_clone, move |_peer| {
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        // Wait for signal
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        Ok(None) // Return Option<Blob>
                    }
                })
                .await
        });

        // Give first request time to start and become in-flight
        // NOTE(review): sleep-based synchronization — timing-sensitive on
        // heavily loaded CI machines.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Start second and third requests with different nodes
        // These should register as alternatives and wait for the first request
        let handles: Vec<_> = vec![nodes[1].clone(), nodes[2].clone()]
            .into_iter()
            .map(|node| {
                let manager_clone = Arc::clone(&manager);
                let key_clone = key.clone();
                tokio::spawn(async move {
                    manager_clone
                        .with_peer(key_clone, node, |_peer| async move {
                            Ok(None) // Return Option<Blob>
                        })
                        .await
                })
            })
            .collect();

        // Give time for alternative peers to register
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Alternatives are being popped as staggered parallel runs.
        // The first request is blocked waiting for the signal, so staggered parallel has started
        // and may have already popped one or both alternatives. We just verify that at least
        // one alternative was registered (before being popped).
        // This test primarily validates that alternatives can be registered during deduplication.

        // Signal first request to complete
        tx.send(()).unwrap();

        // Wait for all requests to complete
        let _result1 = first_request.await.unwrap();
        for handle in handles {
            handle.await.unwrap().ok();
        }

        // After completion, the in-flight entry should be removed
        tokio::time::sleep(Duration::from_millis(50)).await;
        let alt_peers = manager.get_alternative_peers(&key).await;
        assert!(
            alt_peers.is_none(),
            "Expected in-flight entry to be removed after completion"
        );
    }
1345
    /// Exercises `try_staggered_parallel` directly: after a fast failure the
    /// next alternative starts immediately, and a later peer's success is
    /// returned well before a sequential retry strategy would finish.
    #[tokio::test]
    async fn test_staggered_parallel_retry_on_failure() {
        use std::sync::atomic::{AtomicU64, Ordering};

        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};

        // Create a test environment with four validators
        let mut builder = TestBuilder::new(
            MemoryStorageBuilder::default(),
            4,
            0,
            InMemorySigner::new(None),
        )
        .await
        .unwrap();

        // Get validator nodes
        let nodes: Vec<_> = (0..4)
            .map(|i| {
                let node = builder.node(i);
                let public_key = node.name();
                RemoteNode { public_key, node }
            })
            .collect();

        let staggered_delay = Duration::from_millis(100);

        // Store public keys for comparison
        let node0_key = nodes[0].public_key;
        let node2_key = nodes[2].public_key;

        // Create a RequestsScheduler
        let manager: Arc<RequestsScheduler<TestEnvironment>> =
            Arc::new(RequestsScheduler::with_config(
                nodes.clone(),
                ScoringWeights::default(),
                0.1,
                1000.0,
                Duration::from_secs(60),
                100,
                Duration::from_millis(MAX_REQUEST_TTL_MS),
                staggered_delay,
            ));

        let key = test_key();

        // Track when each peer is called
        let call_times = Arc::new(tokio::sync::Mutex::new(Vec::new()));
        let start_time = Instant::now();

        // Track call count per peer
        let call_count = Arc::new(AtomicU64::new(0));

        let call_times_clone = Arc::clone(&call_times);
        let call_count_clone = Arc::clone(&call_count);

        // Test the staggered parallel retry logic directly
        // Behavior by peer: node 0 fails instantly, node 2 succeeds after half
        // a stagger delay, all others fail slowly.
        let operation = |peer: RemoteNode<<TestEnvironment as Environment>::ValidatorNode>| {
            let times = Arc::clone(&call_times_clone);
            let count = Arc::clone(&call_count_clone);
            let start = start_time;
            async move {
                let elapsed = Instant::now().duration_since(start);
                times.lock().await.push((peer.public_key, elapsed));
                count.fetch_add(1, Ordering::SeqCst);

                if peer.public_key == node0_key {
                    // Node 0 fails quickly
                    Err(NodeError::UnexpectedMessage)
                } else if peer.public_key == node2_key {
                    // Node 2 succeeds after a delay
                    tokio::time::sleep(staggered_delay / 2).await;
                    Ok(vec![])
                } else {
                    // Other nodes take longer or fail
                    tokio::time::sleep(staggered_delay * 2).await;
                    Err(NodeError::UnexpectedMessage)
                }
            }
        };

        // Setup: Insert in-flight entry and register alternative peers
        manager.in_flight_tracker.insert_new(key.clone()).await;
        // Register nodes 3, 2, 1 as alternatives (will be popped in reverse: 1, 2, 3)
        for node in nodes.iter().skip(1).rev() {
            manager
                .in_flight_tracker
                .add_alternative_peer(&key, node.clone())
                .await;
        }

        // Use node 0 as first peer, alternatives will be popped: node 1, then 2, then 3
        let result: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
            .try_staggered_parallel(&key, nodes[0].clone(), &operation, staggered_delay)
            .await;

        // Should succeed with result from node 2
        assert!(
            result.is_ok(),
            "Expected request to succeed with alternative peer"
        );

        // Verify timing: calls should be staggered, not sequential
        let times = call_times.lock().await;
        // Can't test exactly 2 b/c we sleep _inside_ the operation and increase right at the start of it.
        assert!(
            times.len() >= 2,
            "Should have tried at least 2 peers, got {}",
            times.len()
        );

        // First call should be at ~0ms
        // NOTE(review): wall-clock thresholds (50ms/500ms) may be flaky on
        // slow or heavily loaded machines — confirm acceptable for CI.
        assert!(
            times[0].1.as_millis() < 50,
            "First peer should be called immediately, was called at {}ms",
            times[0].1.as_millis()
        );

        // Second call should start immediately after first fails (aggressive retry)
        // When node 0 fails immediately, we immediately start node 1
        if times.len() > 1 {
            let delay = times[1].1.as_millis();
            assert!(
                delay < 50,
                "Second peer should be called immediately on first failure, got {delay}ms"
            );
        }

        // Total time should be significantly less than sequential (which would be
        // ~650ms: 200ms + 200ms + 50ms + 200ms). With parallel staggered retry:
        // node0 fails immediately, node1 starts immediately, the next delay is
        // 200ms (peer_index=2), node2 starts at ~200ms and succeeds at ~250ms.
        let total_time = Instant::now().duration_since(start_time).as_millis();
        assert!(
            total_time < 500,
            "Total time should be less than 500ms (sequential would be ~650ms), got {total_time}ms"
        );
    }
1484}