Skip to main content

linera_core/client/requests_scheduler/
scheduler.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::BTreeMap,
6    future::Future,
7    sync::{
8        atomic::{AtomicU32, Ordering},
9        Arc,
10    },
11};
12
13use custom_debug_derive::Debug;
14use futures::stream::{FuturesUnordered, StreamExt};
15use linera_base::{
16    crypto::ValidatorPublicKey,
17    data_types::{Blob, BlobContent, BlockHeight},
18    identifiers::{BlobId, ChainId},
19    time::{Duration, Instant},
20};
21use linera_chain::types::ConfirmedBlockCertificate;
22use rand::distributions::{Distribution, WeightedIndex};
23use tracing::{instrument, warn};
24
25use super::{
26    cache::{RequestsCache, SubsumingKey},
27    in_flight_tracker::{InFlightMatch, InFlightTracker},
28    node_info::NodeInfo,
29    request::{RequestKey, RequestResult},
30    scoring::ScoringWeights,
31};
32use crate::{
33    client::{
34        communicate_concurrently,
35        requests_scheduler::{in_flight_tracker::Subscribed, request::Cacheable},
36        RequestsSchedulerConfig,
37    },
38    environment::Environment,
39    node::{NodeError, ValidatorNode},
40    remote_node::RemoteNode,
41};
42
#[cfg(with_metrics)]
pub(super) mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util;
    use prometheus::{HistogramVec, IntCounter, IntCounterVec};

    /// Per-validator histogram of request response times, in milliseconds.
    pub(super) static VALIDATOR_RESPONSE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        prometheus_util::register_histogram_vec(
            "requests_scheduler_response_time_ms",
            "Response time for requests to validators in milliseconds",
            &["validator"],
            // Buckets go up to 10 seconds.
            prometheus_util::exponential_bucket_latencies(10000.0),
        )
    });

    /// Per-validator counter of all requests that were issued.
    pub(super) static VALIDATOR_REQUEST_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        prometheus_util::register_int_counter_vec(
            "requests_scheduler_request_total",
            "Total number of requests made to each validator",
            &["validator"],
        )
    });

    /// Per-validator counter of requests that completed successfully.
    pub(super) static VALIDATOR_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        prometheus_util::register_int_counter_vec(
            "requests_scheduler_request_success",
            "Number of successful requests to each validator",
            &["validator"],
        )
    });

    /// Counter of deduplicated requests.
    ///
    /// In the scheduler this is incremented when a caller joins an already
    /// in-flight request (exact or subsuming match) instead of issuing its own.
    pub(super) static REQUEST_CACHE_DEDUPLICATION: LazyLock<IntCounter> = LazyLock::new(|| {
        prometheus_util::register_int_counter(
            "requests_scheduler_request_deduplication_total",
            "Number of requests that were deduplicated by finding the result in the cache.",
        )
    });

    /// Counter of requests that were answered from the response cache.
    // NOTE(review): declared `pub` rather than `pub(super)`; presumably so the
    // cache module can increment it - confirm against the cache implementation.
    pub static REQUEST_CACHE_HIT: LazyLock<IntCounter> = LazyLock::new(|| {
        prometheus_util::register_int_counter(
            "requests_scheduler_request_cache_hit_total",
            "Number of requests that were served from cache",
        )
    });
}
97
/// Manages a pool of validator nodes with score-based routing and performance tracking.
///
/// The `RequestsScheduler` keeps one [`NodeInfo`] per validator (tracking request
/// outcomes and response times), deduplicates concurrent requests for the same
/// [`RequestKey`], caches recent results, and can fall back to alternative peers
/// when a request is slow or fails.
///
/// Cloning a scheduler is cheap with respect to the node map: the map is shared
/// behind an `Arc`, so all clones observe the same per-node metrics.
///
/// # Examples
///
/// ```ignore
/// // Create from a scheduler configuration (default scoring weights)
/// let manager = RequestsScheduler::new(validator_nodes, &config);
///
/// // Create with custom configuration prioritizing low latency
/// let latency_weights = ScoringWeights {
///     latency: 0.6,
///     success: 0.3,
///     load: 0.1,
/// };
/// let manager = RequestsScheduler::with_config(
///     validator_nodes,
///     latency_weights,               // custom scoring weights
///     0.2,                           // higher alpha for faster adaptation
///     3000.0,                        // max expected latency (3 seconds)
///     Duration::from_secs(60),       // 60 second cache TTL
///     200,                           // cache up to 200 entries
///     Duration::from_millis(200),    // max request TTL
///     Duration::from_millis(150),    // retry delay
/// );
/// ```
#[derive(Debug, Clone)]
pub struct RequestsScheduler<Env: Environment> {
    /// Shared, lock-protected map of validator nodes indexed by their public keys.
    /// Each node is wrapped in a `NodeInfo` that carries its performance metrics.
    nodes: Arc<tokio::sync::RwLock<BTreeMap<ValidatorPublicKey, NodeInfo<Env>>>>,
    /// Default scoring weights applied to new nodes.
    weights: ScoringWeights,
    /// Default EMA smoothing factor for new nodes; validated to lie in `(0, 1)`
    /// by `with_config`.
    alpha: f64,
    /// Default maximum expected latency in milliseconds for score normalization.
    max_expected_latency: f64,
    /// Delay between starting requests to alternative peers.
    retry_delay: Duration,
    /// Tracks in-flight requests to deduplicate concurrent requests for the same data.
    in_flight_tracker: InFlightTracker<RemoteNode<Env::ValidatorNode>>,
    /// Cache of recently completed requests and their results.
    cache: RequestsCache<RequestKey, RequestResult>,
}
146
147impl<Env: Environment> RequestsScheduler<Env> {
148    /// Creates a new `RequestsScheduler` with the provided configuration.
149    pub fn new(
150        nodes: impl IntoIterator<Item = RemoteNode<Env::ValidatorNode>>,
151        config: &RequestsSchedulerConfig,
152    ) -> Self {
153        Self::with_config(
154            nodes,
155            ScoringWeights::default(),
156            config.alpha,
157            config.max_accepted_latency_ms,
158            Duration::from_millis(config.cache_ttl_ms),
159            config.cache_max_size,
160            Duration::from_millis(config.max_request_ttl_ms),
161            Duration::from_millis(config.retry_delay_ms),
162        )
163    }
164
165    /// Creates a new `RequestsScheduler` with custom configuration.
166    ///
167    /// # Arguments
168    /// - `nodes`: Initial set of validator nodes
169    /// - `max_requests_per_node`: Maximum concurrent requests per node
170    /// - `weights`: Scoring weights for performance metrics
171    /// - `alpha`: EMA smoothing factor (0 < alpha < 1)
172    /// - `max_expected_latency_ms`: Maximum expected latency for score normalization
173    /// - `cache_ttl`: Time-to-live for cached responses
174    /// - `max_cache_size`: Maximum number of entries in the cache
175    /// - `max_request_ttl`: Maximum latency for an in-flight request before we stop deduplicating it
176    /// - `retry_delay_ms`: Delay in milliseconds between starting requests to different peers.
177    #[expect(clippy::too_many_arguments)]
178    pub fn with_config(
179        nodes: impl IntoIterator<Item = RemoteNode<Env::ValidatorNode>>,
180        weights: ScoringWeights,
181        alpha: f64,
182        max_expected_latency_ms: f64,
183        cache_ttl: Duration,
184        max_cache_size: usize,
185        max_request_ttl: Duration,
186        retry_delay: Duration,
187    ) -> Self {
188        assert!(alpha > 0.0 && alpha < 1.0, "Alpha must be in (0, 1) range");
189        Self {
190            nodes: Arc::new(tokio::sync::RwLock::new(
191                nodes
192                    .into_iter()
193                    .map(|node| {
194                        (
195                            node.public_key,
196                            NodeInfo::with_config(node, weights, alpha, max_expected_latency_ms),
197                        )
198                    })
199                    .collect(),
200            )),
201            weights,
202            alpha,
203            max_expected_latency: max_expected_latency_ms,
204            retry_delay,
205            in_flight_tracker: InFlightTracker::new(max_request_ttl),
206            cache: RequestsCache::new(cache_ttl, max_cache_size),
207        }
208    }
209
210    /// Executes an operation with an automatically selected peer, handling deduplication,
211    /// tracking, and peer selection.
212    ///
213    /// This method provides a high-level API for executing operations against remote nodes
214    /// while leveraging the [`RequestsScheduler`]'s intelligent peer selection, performance tracking,
215    /// and request deduplication capabilities.
216    ///
217    /// # Type Parameters
218    /// - `R`: The inner result type (what the operation returns on success)
219    /// - `F`: The async closure type that takes a `RemoteNode` and returns a future
220    /// - `Fut`: The future type returned by the closure
221    ///
222    /// # Arguments
223    /// - `key`: Unique identifier for request deduplication
224    /// - `operation`: Async closure that takes a selected peer and performs the operation
225    ///
226    /// # Returns
227    /// The result from the operation, potentially from cache or a deduplicated in-flight request
228    ///
229    /// # Example
230    /// ```ignore
231    /// let result: Result<Vec<ConfirmedBlockCertificate>, NodeError> = requests_scheduler
232    ///     .with_best(
233    ///         RequestKey::Certificates { chain_id, start, limit },
234    ///         |peer| async move {
235    ///             peer.download_certificates_from(chain_id, start, limit).await
236    ///         }
237    ///     )
238    ///     .await;
239    /// ```
240    #[allow(unused)]
241    async fn with_best<R, F, Fut>(&self, key: RequestKey, operation: F) -> Result<R, NodeError>
242    where
243        R: Cacheable + Clone + Send + 'static,
244        F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
245        Fut: Future<Output = Result<R, NodeError>> + 'static,
246    {
247        // Select the best available peer
248        let peer = self
249            .select_best_peer()
250            .await
251            .ok_or_else(|| NodeError::WorkerError {
252                error: "No validators available".to_string(),
253            })?;
254        self.with_peer(key, peer, operation).await
255    }
256
257    /// Executes an operation with a specific peer.
258    ///
259    /// Similar to [`with_best`](Self::with_best), but uses the provided peer directly
260    /// instead of selecting the best available peer. This is useful when you need to
261    /// query a specific validator node.
262    ///
263    /// # Type Parameters
264    /// - `R`: The inner result type (what the operation returns on success)
265    /// - `F`: The async closure type that takes a `RemoteNode` and returns a future
266    /// - `Fut`: The future type returned by the closure
267    ///
268    /// # Arguments
269    /// - `key`: Unique identifier for request deduplication
270    /// - `peer`: The specific peer to use for the operation
271    /// - `operation`: Async closure that takes the peer and performs the operation
272    ///
273    /// # Returns
274    /// The result from the operation, potentially from cache or a deduplicated in-flight request
275    async fn with_peer<R, F, Fut>(
276        &self,
277        key: RequestKey,
278        peer: RemoteNode<Env::ValidatorNode>,
279        operation: F,
280    ) -> Result<R, NodeError>
281    where
282        R: Cacheable + Clone + Send + 'static,
283        F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
284        Fut: Future<Output = Result<R, NodeError>> + 'static,
285    {
286        self.add_peer(peer.clone()).await;
287        self.in_flight_tracker
288            .add_alternative_peer(&key, peer.clone())
289            .await;
290
291        // Clone the nodes Arc so we can move it into the closure
292        let nodes = self.nodes.clone();
293        self.deduplicated_request(key, peer, move |peer| {
294            let fut = operation(peer.clone());
295            let nodes = nodes.clone();
296            async move { Self::track_request(nodes, peer, fut).await }
297        })
298        .await
299    }
300
301    #[instrument(level = "trace", skip_all)]
302    async fn download_blob(
303        &self,
304        peers: &[RemoteNode<Env::ValidatorNode>],
305        blob_id: BlobId,
306        timeout: Duration,
307    ) -> Result<Option<Blob>, NodeError> {
308        let key = RequestKey::Blob(blob_id);
309        communicate_concurrently(
310            peers,
311            async move |peer| {
312                self.with_peer(key, peer, move |peer| async move {
313                    peer.download_blob(blob_id).await
314                })
315                .await
316            },
317            timeout,
318        )
319        .await
320        .map_err(|errors| {
321            for (validator, error) in &errors {
322                warn!(
323                    %validator,
324                    %blob_id,
325                    %error,
326                    "failed to download blob from validator",
327                );
328            }
329            errors
330                .into_iter()
331                .last()
332                .map_or(NodeError::NoValidators, |(_, error)| error)
333        })
334    }
335
336    /// Downloads the blobs with the given IDs. This is done in one concurrent task per blob.
337    /// Uses intelligent peer selection based on scores and load balancing.
338    /// Returns `None` if it couldn't find all blobs.
339    #[instrument(level = "trace", skip_all)]
340    pub async fn download_blobs(
341        &self,
342        peers: &[RemoteNode<Env::ValidatorNode>],
343        blob_ids: &[BlobId],
344        timeout: Duration,
345    ) -> Result<Option<Vec<Blob>>, NodeError> {
346        let mut stream = blob_ids
347            .iter()
348            .map(|blob_id| self.download_blob(peers, *blob_id, timeout))
349            .collect::<FuturesUnordered<_>>();
350
351        let mut blobs = Vec::new();
352        while let Some(maybe_blob) = stream.next().await {
353            blobs.push(maybe_blob?);
354        }
355        Ok(blobs.into_iter().collect::<Option<Vec<_>>>())
356    }
357
358    pub async fn download_certificates(
359        &self,
360        peer: &RemoteNode<Env::ValidatorNode>,
361        chain_id: ChainId,
362        start: BlockHeight,
363        limit: u64,
364    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
365        let heights = (start.0..start.0 + limit)
366            .map(BlockHeight)
367            .collect::<Vec<_>>();
368        self.with_peer(
369            RequestKey::Certificates {
370                chain_id,
371                heights: heights.clone(),
372            },
373            peer.clone(),
374            move |peer| {
375                let heights = heights.clone();
376                async move {
377                    Box::pin(peer.download_certificates_by_heights(chain_id, heights)).await
378                }
379            },
380        )
381        .await
382    }
383
384    /// Downloads certificates from any of the given validators, using staggered
385    /// concurrent requests so that slow validators are quickly bypassed.
386    pub async fn download_certificates_from_validators(
387        &self,
388        peers: &[RemoteNode<Env::ValidatorNode>],
389        chain_id: ChainId,
390        start: BlockHeight,
391        limit: u64,
392        timeout: Duration,
393    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
394        if peers.is_empty() {
395            return Err(NodeError::NoValidators);
396        }
397        let heights = (start.0..start.0 + limit)
398            .map(BlockHeight)
399            .collect::<Vec<_>>();
400        let key = RequestKey::Certificates {
401            chain_id,
402            heights: heights.clone(),
403        };
404        communicate_concurrently(
405            peers,
406            async move |peer| {
407                self.with_peer(key, peer, move |peer| {
408                    let heights = heights.clone();
409                    async move {
410                        Box::pin(peer.download_certificates_by_heights(chain_id, heights)).await
411                    }
412                })
413                .await
414            },
415            timeout,
416        )
417        .await
418        .map_err(|errors| {
419            for (validator, error) in &errors {
420                warn!(
421                    %validator,
422                    %chain_id,
423                    %error,
424                    "failed to download certificates from validator",
425                );
426            }
427            errors
428                .into_iter()
429                .last()
430                .map_or(NodeError::NoValidators, |(_, error)| error)
431        })
432    }
433
434    pub async fn download_certificates_by_heights(
435        &self,
436        peer: &RemoteNode<Env::ValidatorNode>,
437        chain_id: ChainId,
438        heights: Vec<BlockHeight>,
439    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
440        self.with_peer(
441            RequestKey::Certificates {
442                chain_id,
443                heights: heights.clone(),
444            },
445            peer.clone(),
446            move |peer| {
447                let heights = heights.clone();
448                async move {
449                    peer.download_certificates_by_heights(chain_id, heights)
450                        .await
451                }
452            },
453        )
454        .await
455    }
456
457    pub async fn download_certificate_for_blob(
458        &self,
459        peer: &RemoteNode<Env::ValidatorNode>,
460        blob_id: BlobId,
461    ) -> Result<ConfirmedBlockCertificate, NodeError> {
462        self.with_peer(
463            RequestKey::CertificateForBlob(blob_id),
464            peer.clone(),
465            move |peer| async move { peer.download_certificate_for_blob(blob_id).await },
466        )
467        .await
468    }
469
470    pub async fn download_pending_blob(
471        &self,
472        peer: &RemoteNode<Env::ValidatorNode>,
473        chain_id: ChainId,
474        blob_id: BlobId,
475    ) -> Result<BlobContent, NodeError> {
476        self.with_peer(
477            RequestKey::PendingBlob { chain_id, blob_id },
478            peer.clone(),
479            move |peer| async move { peer.node.download_pending_blob(chain_id, blob_id).await },
480        )
481        .await
482    }
483
484    /// Returns the alternative peers registered for an in-flight request, if any.
485    ///
486    /// This can be used to retry a failed request with alternative data sources
487    /// that were registered during request deduplication.
488    pub async fn get_alternative_peers(
489        &self,
490        key: &RequestKey,
491    ) -> Option<Vec<RemoteNode<Env::ValidatorNode>>> {
492        self.in_flight_tracker.get_alternative_peers(key).await
493    }
494
495    /// Wraps a request operation with performance tracking and capacity management.
496    ///
497    /// This method:
498    /// 1. Measures response time
499    /// 2. Updates node metrics based on success/failure
500    ///
501    /// # Arguments
502    /// - `nodes`: Arc to the nodes map for updating metrics
503    /// - `peer`: The remote node to track metrics for
504    /// - `operation`: Future that performs the actual request
505    ///
506    /// # Behavior
507    /// Executes the provided future and tracks metrics for the given peer.
508    async fn track_request<T, Fut>(
509        nodes: Arc<tokio::sync::RwLock<BTreeMap<ValidatorPublicKey, NodeInfo<Env>>>>,
510        peer: RemoteNode<Env::ValidatorNode>,
511        operation: Fut,
512    ) -> Result<T, NodeError>
513    where
514        Fut: Future<Output = Result<T, NodeError>> + 'static,
515    {
516        let start_time = Instant::now();
517        let public_key = peer.public_key;
518
519        // Execute the operation
520        let result = operation.await;
521
522        // Update metrics and release slot
523        #[expect(
524            clippy::cast_possible_truncation,
525            reason = "elapsed millis fits in u64 for any realistic measurement window"
526        )]
527        let response_time_ms = start_time.elapsed().as_millis() as u64;
528        let is_success = result.is_ok();
529        {
530            let mut nodes_guard = nodes.write().await;
531            if let Some(info) = nodes_guard.get_mut(&public_key) {
532                info.update_metrics(is_success, response_time_ms);
533                let score = info.calculate_score().await;
534                tracing::trace!(
535                    node = %public_key,
536                    address = %info.node.node.address(),
537                    success = %is_success,
538                    response_time_ms = %response_time_ms,
539                    score = %score,
540                    total_requests = %info.total_requests(),
541                    "Request completed"
542                );
543            }
544        }
545
546        // Record Prometheus metrics
547        #[cfg(with_metrics)]
548        {
549            let validator_name = public_key.to_string();
550            metrics::VALIDATOR_RESPONSE_TIME
551                .with_label_values(&[&validator_name])
552                .observe(response_time_ms as f64);
553            metrics::VALIDATOR_REQUEST_TOTAL
554                .with_label_values(&[&validator_name])
555                .inc();
556            if is_success {
557                metrics::VALIDATOR_REQUEST_SUCCESS
558                    .with_label_values(&[&validator_name])
559                    .inc();
560            }
561        }
562
563        result
564    }
565
566    /// Deduplicates concurrent requests for the same data.
567    ///
568    /// If a request for the same key is already in flight, this method waits for
569    /// the existing request to complete and returns its result. Otherwise, it
570    /// executes the operation and broadcasts the result to all waiting callers.
571    ///
572    /// This method also performs **subsumption-based deduplication**: if a larger
573    /// request that contains all the data needed by this request is already cached
574    /// or in flight, we can extract the subset result instead of making a new request.
575    ///
576    /// # Arguments
577    /// - `key`: Unique identifier for the request
578    /// - `operation`: Async closure that performs the actual request
579    ///
580    /// # Returns
581    /// The result from either the in-flight request or the newly executed operation
582    async fn deduplicated_request<T, F, Fut>(
583        &self,
584        key: RequestKey,
585        peer: RemoteNode<Env::ValidatorNode>,
586        operation: F,
587    ) -> Result<T, NodeError>
588    where
589        T: Cacheable + Clone + Send + 'static,
590        F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
591        Fut: Future<Output = Result<T, NodeError>> + 'static,
592    {
593        // Check cache for exact or subsuming match
594        if let Some(result) = self.cache.get(&key).await {
595            return Ok(result);
596        }
597
598        // Check if there's an in-flight request (exact or subsuming)
599        if let Some(in_flight_match) = self.in_flight_tracker.try_subscribe(&key).await {
600            match in_flight_match {
601                InFlightMatch::Exact(Subscribed(mut receiver)) => {
602                    tracing::trace!(
603                        ?key,
604                        "deduplicating request (exact match) - joining existing in-flight request"
605                    );
606                    #[cfg(with_metrics)]
607                    metrics::REQUEST_CACHE_DEDUPLICATION.inc();
608                    // Wait for result from existing request
609                    match receiver.recv().await {
610                        Ok(result) => match result.as_ref().clone() {
611                            Ok(res) => match T::try_from(res) {
612                                Ok(converted) => {
613                                    tracing::trace!(
614                                        ?key,
615                                        "received result from deduplicated in-flight request"
616                                    );
617                                    return Ok(converted);
618                                }
619                                Err(_) => {
620                                    tracing::warn!(
621                                        ?key,
622                                        "failed to convert result from deduplicated in-flight request, will execute independently"
623                                    );
624                                }
625                            },
626                            Err(error) => {
627                                tracing::trace!(
628                                    ?key,
629                                    %error,
630                                    "in-flight request failed",
631                                );
632                                // Fall through to execute a new request
633                            }
634                        },
635                        Err(_) => {
636                            tracing::trace!(?key, "in-flight request sender dropped");
637                            // Fall through to execute a new request
638                        }
639                    }
640                }
641                InFlightMatch::Subsuming {
642                    key: subsuming_key,
643                    outcome: Subscribed(mut receiver),
644                } => {
645                    tracing::trace!(
646                    ?key,
647                    subsumed_by = ?subsuming_key,
648                        "deduplicating request (subsumption) - joining larger in-flight request"
649                    );
650                    #[cfg(with_metrics)]
651                    metrics::REQUEST_CACHE_DEDUPLICATION.inc();
652                    // Wait for result from the subsuming request
653                    match receiver.recv().await {
654                        Ok(result) => {
655                            match result.as_ref() {
656                                Ok(res) => {
657                                    if let Some(extracted) =
658                                        key.try_extract_result(&subsuming_key, res)
659                                    {
660                                        tracing::trace!(
661                                            ?key,
662                                            "extracted subset result from larger in-flight request"
663                                        );
664                                        match T::try_from(extracted) {
665                                            Ok(converted) => return Ok(converted),
666                                            Err(_) => {
667                                                tracing::trace!(
668                                                    ?key,
669                                                    "failed to convert extracted result, will execute independently"
670                                                );
671                                            }
672                                        }
673                                    } else {
674                                        // Extraction failed, fall through to execute our own request
675                                        tracing::trace!(
676                                            ?key,
677                                            "failed to extract from subsuming request, will execute independently"
678                                        );
679                                    }
680                                }
681                                Err(error) => {
682                                    tracing::trace!(
683                                        ?key,
684                                        ?error,
685                                        "subsuming in-flight request failed",
686                                    );
687                                    // Fall through to execute our own request
688                                }
689                            }
690                        }
691                        Err(_) => {
692                            tracing::trace!(?key, "subsuming in-flight request sender dropped");
693                        }
694                    }
695                }
696            }
697        };
698
699        // Create new in-flight entry for this request
700        self.in_flight_tracker.insert_new(key.clone()).await;
701
702        // Remove the peer we're about to use from alternatives (it shouldn't retry with itself)
703        self.in_flight_tracker
704            .remove_alternative_peer(&key, &peer)
705            .await;
706
707        // Execute request with staggered parallel - first peer starts immediately,
708        // alternatives are tried after stagger delays (even if first peer is slow but not failing)
709        tracing::trace!(?key, ?peer, "executing staggered parallel request");
710        let result = self
711            .try_staggered_parallel(&key, peer, &operation, self.retry_delay)
712            .await;
713
714        let result_for_broadcast: Result<RequestResult, NodeError> = result.clone().map(Into::into);
715        let shared_result = Arc::new(result_for_broadcast);
716
717        // Broadcast result and clean up
718        self.in_flight_tracker
719            .complete_and_broadcast(&key, shared_result.clone())
720            .await;
721
722        if let Ok(success) = shared_result.as_ref() {
723            self.cache
724                .store(key.clone(), Arc::new(success.clone()))
725                .await;
726        }
727        result
728    }
729
730    /// Tries alternative peers in staggered parallel fashion.
731    ///
732    /// Launches requests starting with the first peer, then dynamically pops alternative peers
733    /// with a stagger delay between each. Returns the first successful result. This provides
734    /// a balance between sequential (slow) and fully parallel (wasteful) approaches.
735    ///
736    /// # Arguments
737    /// - `key`: The request key (for logging and popping alternatives)
738    /// - `first_peer`: The initial peer to try first (at time 0)
739    /// - `operation`: The operation to execute on each peer
740    /// - `staggered_delay_ms`: Delay in milliseconds between starting each subsequent peer
741    ///
742    /// # Returns
743    /// The first successful result, or the last error if all fail
744    async fn try_staggered_parallel<T, F, Fut>(
745        &self,
746        key: &RequestKey,
747        first_peer: RemoteNode<Env::ValidatorNode>,
748        operation: &F,
749        staggered_delay: Duration,
750    ) -> Result<T, NodeError>
751    where
752        T: 'static,
753        F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
754        Fut: Future<Output = Result<T, NodeError>> + 'static,
755    {
756        use futures::{
757            future::{select, Either},
758            stream::{FuturesUnordered, StreamExt},
759        };
760        use linera_base::time::timer::sleep;
761
762        let mut futures: FuturesUnordered<Fut> = FuturesUnordered::new();
763        let peer_index = AtomicU32::new(0);
764
765        let push_future = |futures: &mut FuturesUnordered<Fut>, fut: Fut| {
766            futures.push(fut);
767            peer_index.fetch_add(1, Ordering::SeqCst)
768        };
769
770        // Start the first peer immediately (no delay)
771        push_future(&mut futures, operation(first_peer));
772
773        let mut last_error = NodeError::UnexpectedMessage;
774        let mut next_delay = Box::pin(sleep(staggered_delay * peer_index.load(Ordering::SeqCst)));
775
776        // Phase 1: Race between futures completion and delays (while alternatives might exist)
777        loop {
778            // Exit condition: no futures running and can't start any more
779            if futures.is_empty() {
780                if let Some(peer) = self.in_flight_tracker.pop_alternative_peer(key).await {
781                    push_future(&mut futures, operation(peer));
782                    next_delay =
783                        Box::pin(sleep(staggered_delay * peer_index.load(Ordering::SeqCst)));
784                } else {
785                    // No futures and no alternatives - we're done
786                    break;
787                }
788            }
789
790            let next_result = Box::pin(futures.next());
791
792            match select(next_result, next_delay).await {
793                // A request completed
794                Either::Left((Some(result), delay_fut)) => {
795                    // Keep the delay future for next iteration
796                    next_delay = delay_fut;
797
798                    match result {
799                        Ok(value) => {
800                            tracing::trace!(?key, "staggered parallel request succeeded");
801                            return Ok(value);
802                        }
803                        Err(error) => {
804                            tracing::debug!(
805                                ?key,
806                                %error,
807                                "staggered parallel request attempt failed"
808                            );
809                            last_error = error;
810
811                            // Immediately try next alternative
812                            if let Some(peer) =
813                                self.in_flight_tracker.pop_alternative_peer(key).await
814                            {
815                                push_future(&mut futures, operation(peer));
816                                next_delay = Box::pin(sleep(
817                                    staggered_delay * peer_index.load(Ordering::SeqCst),
818                                ));
819                            }
820                        }
821                    }
822                }
823                // All running futures completed
824                Either::Left((None, delay_fut)) => {
825                    // Restore the delay future
826                    next_delay = delay_fut;
827                    // Will check at top of loop if we should try more alternatives
828                    continue;
829                }
830                // Delay elapsed - try to start next peer
831                Either::Right((_, _)) => {
832                    if let Some(peer) = self.in_flight_tracker.pop_alternative_peer(key).await {
833                        push_future(&mut futures, operation(peer));
834                        next_delay =
835                            Box::pin(sleep(staggered_delay * peer_index.load(Ordering::SeqCst)));
836                    } else {
837                        // No more alternatives - break out to phase 2
838                        break;
839                    }
840                }
841            }
842        }
843
844        // Phase 2: No more alternatives, just wait for remaining futures to complete
845        while let Some(result) = futures.next().await {
846            match result {
847                Ok(value) => {
848                    tracing::trace!(?key, "staggered parallel request succeeded");
849                    return Ok(value);
850                }
851                Err(error) => {
852                    tracing::debug!(
853                        ?key,
854                        %error,
855                        "staggered parallel request attempt failed"
856                    );
857                    last_error = error;
858                }
859            }
860        }
861
862        // All attempts failed
863        tracing::debug!(?key, "all staggered parallel retry attempts failed");
864        Err(last_error)
865    }
866
867    /// Returns all peers ordered by their score (highest first).
868    ///
869    /// Only includes peers that can currently accept requests. Each peer is paired
870    /// with its calculated score based on latency, success rate, and availability.
871    ///
872    /// # Returns
873    /// A vector of `(score, peer)` tuples sorted by score in descending order.
874    /// Returns an empty vector if no peers can accept requests.
875    async fn peers_by_score(&self) -> Vec<(f64, RemoteNode<Env::ValidatorNode>)> {
876        let nodes = self.nodes.read().await;
877
878        // Filter nodes that can accept requests and calculate their scores
879        let mut scored_nodes = Vec::new();
880        for info in nodes.values() {
881            let score = info.calculate_score().await;
882            scored_nodes.push((score, info.node.clone()));
883        }
884
885        // Sort by score (highest first)
886        scored_nodes.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
887
888        scored_nodes
889    }
890
891    /// Selects the best available peer using weighted random selection from top performers.
892    ///
893    /// This method:
894    /// 1. Sorts nodes by performance score
895    /// 2. Performs weighted random selection from the top 3 performers
896    ///
897    /// This approach balances between choosing high-performing nodes and distributing
898    /// load across multiple validators to avoid creating hotspots.
899    ///
900    /// Returns `None` if no nodes are available.
901    async fn select_best_peer(&self) -> Option<RemoteNode<Env::ValidatorNode>> {
902        let scored_nodes = self.peers_by_score().await;
903
904        if scored_nodes.is_empty() {
905            return None;
906        }
907
908        // Use weighted random selection from top performers (top 3 or all if less)
909        let top_count = scored_nodes.len().min(3);
910        let top_nodes = &scored_nodes[..top_count];
911
912        // Create weights based on normalized scores
913        // Add small epsilon to prevent zero weights
914        let weights: Vec<f64> = top_nodes.iter().map(|(score, _)| score.max(0.01)).collect();
915
916        if let Ok(dist) = WeightedIndex::new(&weights) {
917            let mut rng = rand::thread_rng();
918            let index = dist.sample(&mut rng);
919            Some(top_nodes[index].1.clone())
920        } else {
921            // Fallback to the best node if weights are invalid
922            tracing::warn!("failed to create weighted distribution, defaulting to best node");
923            Some(scored_nodes[0].1.clone())
924        }
925    }
926
927    /// Adds a new peer to the manager if it doesn't already exist.
928    async fn add_peer(&self, node: RemoteNode<Env::ValidatorNode>) {
929        let mut nodes = self.nodes.write().await;
930        let public_key = node.public_key;
931        nodes.entry(public_key).or_insert_with(|| {
932            NodeInfo::with_config(node, self.weights, self.alpha, self.max_expected_latency)
933        });
934    }
935}
936
937#[cfg(test)]
938mod tests {
939    use std::sync::{
940        atomic::{AtomicUsize, Ordering},
941        Arc,
942    };
943
944    use linera_base::{
945        crypto::{CryptoHash, InMemorySigner},
946        data_types::BlockHeight,
947        identifiers::ChainId,
948        time::Duration,
949    };
950    use linera_chain::types::ConfirmedBlockCertificate;
951    use tokio::sync::oneshot;
952
953    use super::{super::request::RequestKey, *};
954    use crate::{
955        client::requests_scheduler::{MAX_REQUEST_TTL_MS, STAGGERED_DELAY_MS},
956        node::NodeError,
957    };
958
959    type TestEnvironment = crate::environment::Test;
960
961    /// Helper function to create a test RequestsScheduler with custom configuration
962    fn create_test_manager(
963        in_flight_timeout: Duration,
964        cache_ttl: Duration,
965    ) -> Arc<RequestsScheduler<TestEnvironment>> {
966        let mut manager = RequestsScheduler::with_config(
967            vec![], // No actual nodes needed for these tests
968            ScoringWeights::default(),
969            0.1,
970            1000.0,
971            cache_ttl,
972            100,
973            in_flight_timeout,
974            Duration::from_millis(STAGGERED_DELAY_MS),
975        );
976        // Replace the tracker with one using the custom timeout
977        manager.in_flight_tracker = InFlightTracker::new(in_flight_timeout);
978        Arc::new(manager)
979    }
980
981    /// Helper function to create a test request key
982    fn test_key() -> RequestKey {
983        RequestKey::Certificates {
984            chain_id: ChainId(CryptoHash::test_hash("test")),
985            heights: vec![BlockHeight(0), BlockHeight(1)],
986        }
987    }
988
989    /// Helper function to create a dummy peer for testing
990    fn dummy_peer() -> RemoteNode<<TestEnvironment as Environment>::ValidatorNode> {
991        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};
992
993        // Create a minimal test builder to get a validator node
994        let mut builder = futures::executor::block_on(async {
995            TestBuilder::new(
996                MemoryStorageBuilder::default(),
997                1,
998                0,
999                linera_base::crypto::InMemorySigner::new(None),
1000            )
1001            .await
1002            .unwrap()
1003        });
1004
1005        let node = builder.node(0);
1006        let public_key = node.name();
1007        RemoteNode { public_key, node }
1008    }
1009
1010    #[tokio::test]
1011    async fn test_cache_hit_returns_cached_result() {
1012        // Create a manager with standard cache TTL
1013        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
1014        let key = test_key();
1015        let peer = dummy_peer();
1016
1017        // Track how many times the operation is executed
1018        let execution_count = Arc::new(AtomicUsize::new(0));
1019        let execution_count_clone = execution_count.clone();
1020
1021        // First call - should execute the operation and cache the result
1022        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
1023            .deduplicated_request(key.clone(), peer.clone(), |_| {
1024                let count = execution_count_clone.clone();
1025                async move {
1026                    count.fetch_add(1, Ordering::SeqCst);
1027                    Ok(vec![])
1028                }
1029            })
1030            .await;
1031
1032        assert!(result1.is_ok());
1033        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
1034
1035        // Second call - should return cached result without executing the operation
1036        let execution_count_clone2 = execution_count.clone();
1037        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
1038            .deduplicated_request(key.clone(), peer.clone(), |_| {
1039                let count = execution_count_clone2.clone();
1040                async move {
1041                    count.fetch_add(1, Ordering::SeqCst);
1042                    Ok(vec![])
1043                }
1044            })
1045            .await;
1046
1047        assert_eq!(result1, result2);
1048        // Operation should still only have been executed once (cache hit)
1049        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
1050    }
1051
1052    #[tokio::test]
1053    async fn test_in_flight_request_deduplication() {
1054        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
1055        let key = test_key();
1056        let peer = dummy_peer();
1057
1058        // Track how many times the operation is executed
1059        let execution_count = Arc::new(AtomicUsize::new(0));
1060
1061        // Create a channel to control when the first operation completes
1062        let (tx, rx) = oneshot::channel();
1063        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));
1064
1065        // Start first request (will be slow - waits for signal)
1066        let manager_clone = Arc::clone(&manager);
1067        let key_clone = key.clone();
1068        let execution_count_clone = execution_count.clone();
1069        let rx_clone = Arc::clone(&rx);
1070        let peer_clone = peer.clone();
1071        let first_request = tokio::spawn(async move {
1072            manager_clone
1073                .deduplicated_request(key_clone, peer_clone, |_| {
1074                    let count = execution_count_clone.clone();
1075                    let rx = Arc::clone(&rx_clone);
1076                    async move {
1077                        count.fetch_add(1, Ordering::SeqCst);
1078                        // Wait for signal before completing
1079                        if let Some(receiver) = rx.lock().await.take() {
1080                            receiver.await.unwrap();
1081                        }
1082                        Ok(vec![])
1083                    }
1084                })
1085                .await
1086        });
1087
1088        // Start second request - should deduplicate and wait for the first
1089        let execution_count_clone2 = execution_count.clone();
1090        let second_request = tokio::spawn(async move {
1091            manager
1092                .deduplicated_request(key, peer, |_| {
1093                    let count = execution_count_clone2.clone();
1094                    async move {
1095                        count.fetch_add(1, Ordering::SeqCst);
1096                        Ok(vec![])
1097                    }
1098                })
1099                .await
1100        });
1101
1102        // Signal the first request to complete
1103        tx.send(()).unwrap();
1104
1105        // Both requests should complete successfully
1106        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
1107            first_request.await.unwrap();
1108        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
1109            second_request.await.unwrap();
1110
1111        assert!(result1.is_ok());
1112        assert_eq!(result1, result2);
1113
1114        // Operation should only have been executed once (deduplication worked)
1115        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
1116    }
1117
1118    #[tokio::test]
1119    async fn test_multiple_subscribers_all_notified() {
1120        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
1121        let key = test_key();
1122        let peer = dummy_peer();
1123
1124        // Track how many times the operation is executed
1125        let execution_count = Arc::new(AtomicUsize::new(0));
1126
1127        // Create a channel to control when the operation completes
1128        let (tx, rx) = oneshot::channel();
1129        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));
1130
1131        // Start first request (will be slow - waits for signal)
1132        let manager_clone1 = Arc::clone(&manager);
1133        let key_clone1 = key.clone();
1134        let execution_count_clone = execution_count.clone();
1135        let rx_clone = Arc::clone(&rx);
1136        let peer_clone = peer.clone();
1137        let first_request = tokio::spawn(async move {
1138            manager_clone1
1139                .deduplicated_request(key_clone1, peer_clone, |_| {
1140                    let count = execution_count_clone.clone();
1141                    let rx = Arc::clone(&rx_clone);
1142                    async move {
1143                        count.fetch_add(1, Ordering::SeqCst);
1144                        if let Some(receiver) = rx.lock().await.take() {
1145                            receiver.await.unwrap();
1146                        }
1147                        Ok(vec![])
1148                    }
1149                })
1150                .await
1151        });
1152
1153        // Start multiple additional requests - all should deduplicate
1154        let mut handles = vec![];
1155        for _ in 0..5 {
1156            let manager_clone = Arc::clone(&manager);
1157            let key_clone = key.clone();
1158            let execution_count_clone = execution_count.clone();
1159            let peer_clone = peer.clone();
1160            let handle = tokio::spawn(async move {
1161                manager_clone
1162                    .deduplicated_request(key_clone, peer_clone, |_| {
1163                        let count = execution_count_clone.clone();
1164                        async move {
1165                            count.fetch_add(1, Ordering::SeqCst);
1166                            Ok(vec![])
1167                        }
1168                    })
1169                    .await
1170            });
1171            handles.push(handle);
1172        }
1173
1174        // Signal the first request to complete
1175        tx.send(()).unwrap();
1176
1177        // First request should complete successfully
1178        let result: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
1179            first_request.await.unwrap();
1180        assert!(result.is_ok());
1181
1182        // All subscriber requests should also complete successfully
1183        for handle in handles {
1184            assert_eq!(handle.await.unwrap(), result);
1185        }
1186
1187        // Operation should only have been executed once (all requests were deduplicated)
1188        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
1189    }
1190
1191    #[tokio::test]
1192    async fn test_timeout_triggers_new_request() {
1193        // Create a manager with a very short in-flight timeout
1194        let manager = create_test_manager(Duration::from_millis(50), Duration::from_secs(60));
1195
1196        let key = test_key();
1197        let peer = dummy_peer();
1198
1199        // Track how many times the operation is executed
1200        let execution_count = Arc::new(AtomicUsize::new(0));
1201
1202        // Create a channel to control when the first operation completes
1203        let (tx, rx) = oneshot::channel();
1204        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));
1205
1206        // Start first request (will be slow - waits for signal)
1207        let manager_clone = Arc::clone(&manager);
1208        let key_clone = key.clone();
1209        let execution_count_clone = execution_count.clone();
1210        let rx_clone = Arc::clone(&rx);
1211        let peer_clone = peer.clone();
1212        let first_request = tokio::spawn(async move {
1213            manager_clone
1214                .deduplicated_request(key_clone, peer_clone, |_| {
1215                    let count = execution_count_clone.clone();
1216                    let rx = Arc::clone(&rx_clone);
1217                    async move {
1218                        count.fetch_add(1, Ordering::SeqCst);
1219                        if let Some(receiver) = rx.lock().await.take() {
1220                            receiver.await.unwrap();
1221                        }
1222                        Ok(vec![])
1223                    }
1224                })
1225                .await
1226        });
1227
1228        // Wait for the timeout to elapse
1229        tokio::time::sleep(Duration::from_millis(MAX_REQUEST_TTL_MS + 1)).await;
1230
1231        // Start second request - should NOT deduplicate because first request exceeded timeout
1232        let execution_count_clone2 = execution_count.clone();
1233        let second_request = tokio::spawn(async move {
1234            manager
1235                .deduplicated_request(key, peer, |_| {
1236                    let count = execution_count_clone2.clone();
1237                    async move {
1238                        count.fetch_add(1, Ordering::SeqCst);
1239                        Ok(vec![])
1240                    }
1241                })
1242                .await
1243        });
1244
1245        // Wait for second request to complete
1246        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
1247            second_request.await.unwrap();
1248        assert!(result2.is_ok());
1249
1250        // Complete the first request
1251        tx.send(()).unwrap();
1252        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
1253            first_request.await.unwrap();
1254        assert!(result1.is_ok());
1255
1256        // Operation should have been executed twice (timeout triggered new request)
1257        assert_eq!(execution_count.load(Ordering::SeqCst), 2);
1258    }
1259
1260    #[tokio::test]
1261    async fn test_alternative_peers_registered_on_deduplication() {
1262        use linera_base::identifiers::BlobType;
1263
1264        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};
1265
1266        // Create a test environment with three validators
1267        let mut builder = TestBuilder::new(
1268            MemoryStorageBuilder::default(),
1269            3,
1270            0,
1271            InMemorySigner::new(None),
1272        )
1273        .await
1274        .unwrap();
1275
1276        // Get validator nodes
1277        let nodes: Vec<_> = (0..3)
1278            .map(|i| {
1279                let node = builder.node(i);
1280                let public_key = node.name();
1281                RemoteNode { public_key, node }
1282            })
1283            .collect();
1284
1285        // Create a RequestsScheduler
1286        let manager: Arc<RequestsScheduler<TestEnvironment>> =
1287            Arc::new(RequestsScheduler::with_config(
1288                nodes.clone(),
1289                ScoringWeights::default(),
1290                0.1,
1291                1000.0,
1292                Duration::from_secs(60),
1293                100,
1294                Duration::from_millis(MAX_REQUEST_TTL_MS),
1295                Duration::from_millis(STAGGERED_DELAY_MS),
1296            ));
1297
1298        let key = RequestKey::Blob(BlobId::new(
1299            CryptoHash::test_hash("test_blob"),
1300            BlobType::Data,
1301        ));
1302
1303        // Create a channel to control when first request completes
1304        let (tx, rx) = oneshot::channel();
1305        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));
1306
1307        // Start first request with node 0 (will block until signaled)
1308        let manager_clone = Arc::clone(&manager);
1309        let node_clone = nodes[0].clone();
1310        let key_clone = key.clone();
1311        let rx_clone = Arc::clone(&rx);
1312        let first_request = tokio::spawn(async move {
1313            manager_clone
1314                .with_peer(key_clone, node_clone, move |_peer| {
1315                    let rx = Arc::clone(&rx_clone);
1316                    async move {
1317                        // Wait for signal
1318                        if let Some(receiver) = rx.lock().await.take() {
1319                            receiver.await.unwrap();
1320                        }
1321                        Ok(None) // Return Option<Blob>
1322                    }
1323                })
1324                .await
1325        });
1326
1327        // Give first request time to start and become in-flight
1328        tokio::time::sleep(Duration::from_millis(100)).await;
1329
1330        // Start second and third requests with different nodes
1331        // These should register as alternatives and wait for the first request
1332        let handles: Vec<_> = vec![nodes[1].clone(), nodes[2].clone()]
1333            .into_iter()
1334            .map(|node| {
1335                let manager_clone = Arc::clone(&manager);
1336                let key_clone = key.clone();
1337                tokio::spawn(async move {
1338                    manager_clone
1339                        .with_peer(key_clone, node, |_peer| async move {
1340                            Ok(None) // Return Option<Blob>
1341                        })
1342                        .await
1343                })
1344            })
1345            .collect();
1346
1347        // Give time for alternative peers to register
1348        tokio::time::sleep(Duration::from_millis(100)).await;
1349
1350        // Alternatives are being popped as staggered parallel runs.
1351        // The first request is blocked waiting for the signal, so staggered parallel has started
1352        // and may have already popped one or both alternatives. We just verify that at least
1353        // one alternative was registered (before being popped).
1354        // This test primarily validates that alternatives can be registered during deduplication.
1355
1356        // Signal first request to complete
1357        tx.send(()).unwrap();
1358
1359        // Wait for all requests to complete
1360        let _result1 = first_request.await.unwrap();
1361        for handle in handles {
1362            handle.await.unwrap().ok();
1363        }
1364
1365        // After completion, the in-flight entry should be removed
1366        tokio::time::sleep(Duration::from_millis(50)).await;
1367        let alt_peers = manager.get_alternative_peers(&key).await;
1368        assert!(
1369            alt_peers.is_none(),
1370            "Expected in-flight entry to be removed after completion"
1371        );
1372    }
1373
1374    #[tokio::test]
1375    async fn test_staggered_parallel_retry_on_failure() {
1376        use std::sync::atomic::{AtomicU64, Ordering};
1377
1378        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};
1379
1380        // Create a test environment with four validators
1381        let mut builder = TestBuilder::new(
1382            MemoryStorageBuilder::default(),
1383            4,
1384            0,
1385            InMemorySigner::new(None),
1386        )
1387        .await
1388        .unwrap();
1389
1390        // Get validator nodes
1391        let nodes: Vec<_> = (0..4)
1392            .map(|i| {
1393                let node = builder.node(i);
1394                let public_key = node.name();
1395                RemoteNode { public_key, node }
1396            })
1397            .collect();
1398
1399        let staggered_delay = Duration::from_millis(100);
1400
1401        // Store public keys for comparison
1402        let node0_key = nodes[0].public_key;
1403        let node2_key = nodes[2].public_key;
1404
1405        // Create a RequestsScheduler
1406        let manager: Arc<RequestsScheduler<TestEnvironment>> =
1407            Arc::new(RequestsScheduler::with_config(
1408                nodes.clone(),
1409                ScoringWeights::default(),
1410                0.1,
1411                1000.0,
1412                Duration::from_secs(60),
1413                100,
1414                Duration::from_millis(MAX_REQUEST_TTL_MS),
1415                staggered_delay,
1416            ));
1417
1418        let key = test_key();
1419
1420        // Track when each peer is called
1421        let call_times = Arc::new(tokio::sync::Mutex::new(Vec::new()));
1422        let start_time = Instant::now();
1423
1424        // Track call count per peer
1425        let call_count = Arc::new(AtomicU64::new(0));
1426
1427        let call_times_clone = Arc::clone(&call_times);
1428        let call_count_clone = Arc::clone(&call_count);
1429
1430        // Test the staggered parallel retry logic directly
1431        let operation = |peer: RemoteNode<<TestEnvironment as Environment>::ValidatorNode>| {
1432            let times = Arc::clone(&call_times_clone);
1433            let count = Arc::clone(&call_count_clone);
1434            let start = start_time;
1435            async move {
1436                let elapsed = Instant::now().duration_since(start);
1437                times.lock().await.push((peer.public_key, elapsed));
1438                count.fetch_add(1, Ordering::SeqCst);
1439
1440                if peer.public_key == node0_key {
1441                    // Node 0 fails quickly
1442                    Err(NodeError::UnexpectedMessage)
1443                } else if peer.public_key == node2_key {
1444                    // Node 2 succeeds after a delay
1445                    tokio::time::sleep(staggered_delay / 2).await;
1446                    Ok(vec![])
1447                } else {
1448                    // Other nodes take longer or fail
1449                    tokio::time::sleep(staggered_delay * 2).await;
1450                    Err(NodeError::UnexpectedMessage)
1451                }
1452            }
1453        };
1454
1455        // Setup: Insert in-flight entry and register alternative peers
1456        manager.in_flight_tracker.insert_new(key.clone()).await;
1457        // Register nodes 3, 2, 1 as alternatives (will be popped in reverse: 1, 2, 3)
1458        for node in nodes.iter().skip(1).rev() {
1459            manager
1460                .in_flight_tracker
1461                .add_alternative_peer(&key, node.clone())
1462                .await;
1463        }
1464
1465        // Use node 0 as first peer, alternatives will be popped: node 1, then 2, then 3
1466        let result: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
1467            .try_staggered_parallel(&key, nodes[0].clone(), &operation, staggered_delay)
1468            .await;
1469
1470        // Should succeed with result from node 2
1471        assert!(
1472            result.is_ok(),
1473            "Expected request to succeed with alternative peer"
1474        );
1475
1476        // Verify timing: calls should be staggered, not sequential
1477        let times = call_times.lock().await;
1478        // Can't test exactly 2 b/c we sleep _inside_ the operation and increase right at the start of it.
1479        assert!(
1480            times.len() >= 2,
1481            "Should have tried at least 2 peers, got {}",
1482            times.len()
1483        );
1484
1485        // First call should be at ~0ms
1486        assert!(
1487            times[0].1.as_millis() < 50,
1488            "First peer should be called immediately, was called at {}ms",
1489            times[0].1.as_millis()
1490        );
1491
1492        // Second call should start immediately after first fails (aggressive retry)
1493        // When node 0 fails immediately, we immediately start node 1
1494        if times.len() > 1 {
1495            let delay = times[1].1.as_millis();
1496            assert!(
1497                delay < 50,
1498                "Second peer should be called immediately on first failure, got {delay}ms"
1499            );
1500        }
1501
1502        // Total time should be significantly less than sequential (which would be
1503        // ~650ms: 200ms + 200ms + 50ms + 200ms). With parallel staggered retry:
1504        // node0 fails immediately, node1 starts immediately, the next delay is
1505        // 200ms (peer_index=2), node2 starts at ~200ms and succeeds at ~250ms.
1506        let total_time = Instant::now().duration_since(start_time).as_millis();
1507        assert!(
1508            total_time < 500,
1509            "Total time should be less than 500ms (sequential would be ~650ms), got {total_time}ms"
1510        );
1511    }
1512}