linera_core/client/requests_scheduler/scheduler.rs

// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{cmp::Ordering, collections::BTreeMap, future::Future, sync::Arc};

use custom_debug_derive::Debug;
use futures::stream::{FuturesUnordered, StreamExt};
use linera_base::{
    crypto::ValidatorPublicKey,
    data_types::{Blob, BlobContent, BlockHeight},
    identifiers::{BlobId, ChainId},
    time::{Duration, Instant},
};
use linera_chain::types::ConfirmedBlockCertificate;
use rand::{
    distributions::{Distribution, WeightedIndex},
    prelude::SliceRandom as _,
};
use tracing::instrument;

use super::{
    cache::{RequestsCache, SubsumingKey},
    in_flight_tracker::{InFlightMatch, InFlightTracker},
    node_info::NodeInfo,
    request::{RequestKey, RequestResult},
    scoring::ScoringWeights,
};
use crate::{
    client::{
        communicate_concurrently,
        requests_scheduler::{in_flight_tracker::Subscribed, request::Cacheable},
        RequestsSchedulerConfig,
    },
    environment::Environment,
    node::{NodeError, ValidatorNode},
    remote_node::RemoteNode,
};

#[cfg(with_metrics)]
pub(super) mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_latencies, register_histogram_vec, register_int_counter,
        register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounter, IntCounterVec};

    /// Histogram of response times per validator (in milliseconds).
    pub(super) static VALIDATOR_RESPONSE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "requests_scheduler_response_time_ms",
            "Response time for requests to validators in milliseconds",
            &["validator"],
            exponential_bucket_latencies(10000.0), // up to 10 seconds
        )
    });

    /// Counter of total requests made to each validator.
    pub(super) static VALIDATOR_REQUEST_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "requests_scheduler_request_total",
            "Total number of requests made to each validator",
            &["validator"],
        )
    });

    /// Counter of successful requests per validator.
    pub(super) static VALIDATOR_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "requests_scheduler_request_success",
            "Number of successful requests to each validator",
            &["validator"],
        )
    });
    /// Counter for requests that were deduplicated by joining an existing in-flight request.
    pub(super) static REQUEST_CACHE_DEDUPLICATION: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "requests_scheduler_request_deduplication_total",
            "Number of requests that were deduplicated by finding the result in the cache.",
        )
    });

    /// Counter for requests that were served from cache.
    pub static REQUEST_CACHE_HIT: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "requests_scheduler_request_cache_hit_total",
            "Number of requests that were served from cache",
        )
    });
}

/// Manages a pool of validator nodes with intelligent load balancing and performance tracking.
///
/// The `RequestsScheduler` maintains performance metrics for each validator node using
/// Exponential Moving Averages (EMA) and uses these metrics to make intelligent routing
/// decisions. It spreads load across validators through weighted peer selection and
/// deduplicates concurrent requests for the same data.
///
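/// Conceptually, each completed request folds its sample into the tracked metrics
/// with the standard EMA recurrence (a sketch; the actual bookkeeping lives in
/// [`NodeInfo`]):
///
/// ```ignore
/// // alpha in (0, 1); larger values adapt faster to recent samples.
/// ema_latency = alpha * response_time_ms + (1.0 - alpha) * ema_latency;
/// ema_success = alpha * success_sample + (1.0 - alpha) * ema_success;
/// ```
///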
/// # Examples
///
/// ```ignore
/// // Create from a `RequestsSchedulerConfig` (default scoring weights).
/// let scheduler = RequestsScheduler::new(validator_nodes, config);
///
/// // Or configure everything explicitly, prioritizing low latency.
/// let scheduler = RequestsScheduler::with_config(
///     validator_nodes,
///     ScoringWeights { latency: 0.6, success: 0.3, load: 0.1 },
///     0.2,                     // higher alpha for faster adaptation
///     3000.0,                  // max expected latency (3 seconds)
///     Duration::from_secs(60), // 60 second cache TTL
///     200,                     // cache up to 200 entries
///     Duration::from_secs(10), // in-flight deduplication TTL
/// );
/// ```
#[derive(Debug, Clone)]
pub struct RequestsScheduler<Env: Environment> {
    /// Thread-safe map of validator nodes indexed by their public keys.
    /// Each node is wrapped with EMA-based performance tracking information.
    nodes: Arc<tokio::sync::RwLock<BTreeMap<ValidatorPublicKey, NodeInfo<Env>>>>,
    /// Default scoring weights applied to new nodes.
    weights: ScoringWeights,
    /// Default EMA smoothing factor for new nodes.
    alpha: f64,
    /// Default maximum expected latency in milliseconds for score normalization.
    max_expected_latency: f64,
    /// Tracks in-flight requests to deduplicate concurrent requests for the same data.
    in_flight_tracker: InFlightTracker<RemoteNode<Env::ValidatorNode>>,
    /// Cache of recently completed requests with their results and timestamps.
    cache: RequestsCache<RequestKey, RequestResult>,
}

impl<Env: Environment> RequestsScheduler<Env> {
    /// Creates a new `RequestsScheduler` with the provided configuration.
    pub fn new(
        nodes: impl IntoIterator<Item = RemoteNode<Env::ValidatorNode>>,
        config: RequestsSchedulerConfig,
    ) -> Self {
        Self::with_config(
            nodes,
            ScoringWeights::default(),
            config.alpha,
            config.max_accepted_latency_ms,
            Duration::from_millis(config.cache_ttl_ms),
            config.cache_max_size,
            Duration::from_millis(config.max_request_ttl_ms),
        )
    }

    /// Creates a new `RequestsScheduler` with custom configuration.
    ///
    /// # Arguments
    /// - `nodes`: Initial set of validator nodes
    /// - `weights`: Scoring weights for performance metrics
    /// - `alpha`: EMA smoothing factor (0 < alpha < 1)
    /// - `max_expected_latency_ms`: Maximum expected latency for score normalization
    /// - `cache_ttl`: Time-to-live for cached responses
    /// - `max_cache_size`: Maximum number of entries in the cache
    /// - `max_request_ttl`: Maximum latency for an in-flight request before we stop deduplicating it
    pub fn with_config(
        nodes: impl IntoIterator<Item = RemoteNode<Env::ValidatorNode>>,
        weights: ScoringWeights,
        alpha: f64,
        max_expected_latency_ms: f64,
        cache_ttl: Duration,
        max_cache_size: usize,
        max_request_ttl: Duration,
    ) -> Self {
        assert!(alpha > 0.0 && alpha < 1.0, "Alpha must be in (0, 1) range");
        Self {
            nodes: Arc::new(tokio::sync::RwLock::new(
                nodes
                    .into_iter()
                    .map(|node| {
                        (
                            node.public_key,
                            NodeInfo::with_config(node, weights, alpha, max_expected_latency_ms),
                        )
                    })
                    .collect(),
            )),
            weights,
            alpha,
            max_expected_latency: max_expected_latency_ms,
            in_flight_tracker: InFlightTracker::new(max_request_ttl),
            cache: RequestsCache::new(cache_ttl, max_cache_size),
        }
    }

    /// Executes an operation with an automatically selected peer, handling deduplication,
    /// tracking, and peer selection.
    ///
    /// This method provides a high-level API for executing operations against remote nodes
    /// while leveraging the [`RequestsScheduler`]'s intelligent peer selection, performance
    /// tracking, and request deduplication capabilities.
    ///
    /// # Type Parameters
    /// - `R`: The inner result type (what the operation returns on success)
    /// - `F`: The async closure type that takes a `RemoteNode` and returns a future
    /// - `Fut`: The future type returned by the closure
    ///
    /// # Arguments
    /// - `key`: Unique identifier for request deduplication
    /// - `operation`: Async closure that takes a selected peer and performs the operation
    ///
    /// # Returns
    /// The result from the operation, potentially from cache or a deduplicated in-flight request
    ///
    /// # Example
    /// ```ignore
    /// let result: Result<Vec<ConfirmedBlockCertificate>, NodeError> = requests_scheduler
    ///     .with_best(
    ///         RequestKey::Certificates { chain_id, heights: heights.clone() },
    ///         |peer| async move {
    ///             peer.download_certificates_by_heights(chain_id, heights).await
    ///         },
    ///     )
    ///     .await;
    /// ```
    #[allow(unused)]
    async fn with_best<R, F, Fut>(&self, key: RequestKey, operation: F) -> Result<R, NodeError>
    where
        R: Cacheable + Clone + Send + 'static,
        F: FnOnce(RemoteNode<Env::ValidatorNode>) -> Fut,
        Fut: Future<Output = Result<R, NodeError>>,
    {
        // Select the best available peer
        let peer = self
            .select_best_peer()
            .await
            .ok_or_else(|| NodeError::WorkerError {
                error: "No validators available".to_string(),
            })?;
        self.with_peer(key, peer, operation).await
    }

    /// Executes an operation with a specific peer.
    ///
    /// Similar to [`with_best`](Self::with_best), but uses the provided peer directly
    /// instead of selecting the best available peer. This is useful when you need to
    /// query a specific validator node.
    ///
    /// # Type Parameters
    /// - `R`: The inner result type (what the operation returns on success)
    /// - `F`: The async closure type that takes a `RemoteNode` and returns a future
    /// - `Fut`: The future type returned by the closure
    ///
    /// # Arguments
    /// - `key`: Unique identifier for request deduplication
    /// - `peer`: The specific peer to use for the operation
    /// - `operation`: Async closure that takes the peer and performs the operation
    ///
    /// # Returns
    /// The result from the operation, potentially from cache or a deduplicated in-flight request
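    ///
    /// # Example
    /// ```ignore
    /// // A sketch pinned to one validator (mirrors how `download_blob` uses it;
    /// // `scheduler` stands for any `RequestsScheduler` handle):
    /// let blob = scheduler
    ///     .with_peer(RequestKey::Blob(blob_id), peer.clone(), |peer| async move {
    ///         peer.download_blob(blob_id).await
    ///     })
    ///     .await?;
    /// ```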
    async fn with_peer<R, F, Fut>(
        &self,
        key: RequestKey,
        peer: RemoteNode<Env::ValidatorNode>,
        operation: F,
    ) -> Result<R, NodeError>
    where
        R: Cacheable + Clone + Send + 'static,
        F: FnOnce(RemoteNode<Env::ValidatorNode>) -> Fut,
        Fut: Future<Output = Result<R, NodeError>>,
    {
        self.add_peer(peer.clone()).await;
        self.in_flight_tracker
            .add_alternative_peer(&key, peer.clone())
            .await;
        self.deduplicated_request(key, peer, |peer| async {
            self.track_request(peer, operation).await
        })
        .await
    }

    /// Downloads a single blob, trying the given peers concurrently in random order.
    #[instrument(level = "trace", skip_all)]
    async fn download_blob(
        &self,
        peers: &[RemoteNode<Env::ValidatorNode>],
        blob_id: BlobId,
        timeout: Duration,
    ) -> Result<Option<Blob>, NodeError> {
        let key = RequestKey::Blob(blob_id);
        let mut peers = peers.to_vec();
        peers.shuffle(&mut rand::thread_rng());
        communicate_concurrently(
            &peers,
            async move |peer| {
                self.with_peer(key, peer, |peer| async move {
                    peer.download_blob(blob_id).await
                })
                .await
            },
            |errors| errors.last().cloned().unwrap(),
            timeout,
        )
        .await
        .map_err(|(_validator, error)| error)
    }
    /// Downloads the blobs with the given IDs, in one concurrent task per blob.
    /// Each blob is requested from the given peers in random order.
    /// Returns `None` if it couldn't find all blobs.
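    ///
    /// For example (a sketch; `scheduler` stands for any `RequestsScheduler` handle):
    /// ```ignore
    /// let maybe_blobs = scheduler
    ///     .download_blobs(&peers, &blob_ids, Duration::from_secs(10))
    ///     .await?;
    /// ```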
    #[instrument(level = "trace", skip_all)]
    pub async fn download_blobs(
        &self,
        peers: &[RemoteNode<Env::ValidatorNode>],
        blob_ids: &[BlobId],
        timeout: Duration,
    ) -> Result<Option<Vec<Blob>>, NodeError> {
        let mut stream = blob_ids
            .iter()
            .map(|blob_id| self.download_blob(peers, *blob_id, timeout))
            .collect::<FuturesUnordered<_>>();

        let mut blobs = Vec::new();
        while let Some(maybe_blob) = stream.next().await {
            blobs.push(maybe_blob?);
        }
        Ok(blobs.into_iter().collect::<Option<Vec<_>>>())
    }

    /// Downloads `limit` consecutive confirmed block certificates for `chain_id`,
    /// starting at height `start`.
    pub async fn download_certificates(
        &self,
        peer: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
        start: BlockHeight,
        limit: u64,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
        let heights = (start.0..start.0 + limit)
            .map(BlockHeight)
            .collect::<Vec<_>>();
        self.with_peer(
            RequestKey::Certificates {
                chain_id,
                heights: heights.clone(),
            },
            peer.clone(),
            |peer| async move {
                Box::pin(peer.download_certificates_by_heights(chain_id, heights)).await
            },
        )
        .await
    }

    /// Downloads the confirmed block certificates for `chain_id` at the given heights.
    pub async fn download_certificates_by_heights(
        &self,
        peer: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
        heights: Vec<BlockHeight>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
        self.with_peer(
            RequestKey::Certificates {
                chain_id,
                heights: heights.clone(),
            },
            peer.clone(),
            |peer| async move {
                peer.download_certificates_by_heights(chain_id, heights)
                    .await
            },
        )
        .await
    }

    /// Downloads the confirmed block certificate associated with the given blob.
    pub async fn download_certificate_for_blob(
        &self,
        peer: &RemoteNode<Env::ValidatorNode>,
        blob_id: BlobId,
    ) -> Result<ConfirmedBlockCertificate, NodeError> {
        self.with_peer(
            RequestKey::CertificateForBlob(blob_id),
            peer.clone(),
            |peer| async move { peer.download_certificate_for_blob(blob_id).await },
        )
        .await
    }

    /// Downloads the content of a pending blob from the given chain.
    pub async fn download_pending_blob(
        &self,
        peer: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
        blob_id: BlobId,
    ) -> Result<BlobContent, NodeError> {
        self.with_peer(
            RequestKey::PendingBlob { chain_id, blob_id },
            peer.clone(),
            |peer| async move { peer.node.download_pending_blob(chain_id, blob_id).await },
        )
        .await
    }

    /// Returns the alternative peers registered for an in-flight request, if any.
    ///
    /// This can be used to retry a failed request with alternative data sources
    /// that were registered during request deduplication.
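    ///
    /// For example (a sketch; `scheduler` stands for any `RequestsScheduler` handle):
    /// ```ignore
    /// if let Some(peers) = scheduler.get_alternative_peers(&key).await {
    ///     for peer in peers {
    ///         // Retry the failed request with `peer`...
    ///     }
    /// }
    /// ```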
    pub async fn get_alternative_peers(
        &self,
        key: &RequestKey,
    ) -> Option<Vec<RemoteNode<Env::ValidatorNode>>> {
        self.in_flight_tracker.get_alternative_peers(key).await
    }

    /// Returns current performance metrics for all managed nodes.
    ///
    /// Each entry contains:
    /// - Performance score (f64, normalized 0.0-1.0)
    /// - EMA success rate (f64, 0.0-1.0)
    /// - Total requests processed (u64)
    ///
    /// Useful for monitoring and debugging node performance.
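    ///
    /// For example (a sketch; `scheduler` stands for any `RequestsScheduler` handle):
    /// ```ignore
    /// for (key, (score, success_rate, total)) in scheduler.get_node_scores().await {
    ///     println!("{key}: score={score:.2}, success={success_rate:.2}, requests={total}");
    /// }
    /// ```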
    pub async fn get_node_scores(&self) -> BTreeMap<ValidatorPublicKey, (f64, f64, u64)> {
        let nodes = self.nodes.read().await;
        let mut result = BTreeMap::new();

        for (key, info) in nodes.iter() {
            let score = info.calculate_score().await;
            result.insert(
                *key,
                (score, info.ema_success_rate(), info.total_requests()),
            );
        }

        result
    }

    /// Wraps a request operation with performance tracking.
    ///
    /// This method:
    /// 1. Executes the provided operation with the selected peer
    /// 2. Measures the response time
    /// 3. Updates the node's EMA metrics based on success/failure
    /// 4. Records Prometheus metrics (when compiled with metrics support)
    ///
    /// # Arguments
    /// - `peer`: The remote node to execute the operation on
    /// - `operation`: Async closure that performs the actual request with the selected peer
    async fn track_request<T, F, Fut>(
        &self,
        peer: RemoteNode<Env::ValidatorNode>,
        operation: F,
    ) -> Result<T, NodeError>
    where
        F: FnOnce(RemoteNode<Env::ValidatorNode>) -> Fut,
        Fut: Future<Output = Result<T, NodeError>>,
    {
        let start_time = Instant::now();
        let public_key = peer.public_key;

        // Execute the operation
        let result = operation(peer).await;

        // Update the node's metrics
        let response_time_ms = start_time.elapsed().as_millis() as u64;
        let is_success = result.is_ok();
        {
            let mut nodes = self.nodes.write().await;
            if let Some(info) = nodes.get_mut(&public_key) {
                info.update_metrics(is_success, response_time_ms);
                let score = info.calculate_score().await;
                tracing::trace!(
                    node = %public_key,
                    address = %info.node.node.address(),
                    success = %is_success,
                    response_time_ms = %response_time_ms,
                    score = %score,
                    total_requests = %info.total_requests(),
                    "Request completed"
                );
            }
        }

        // Record Prometheus metrics
        #[cfg(with_metrics)]
        {
            let validator_name = public_key.to_string();
            metrics::VALIDATOR_RESPONSE_TIME
                .with_label_values(&[&validator_name])
                .observe(response_time_ms as f64);
            metrics::VALIDATOR_REQUEST_TOTAL
                .with_label_values(&[&validator_name])
                .inc();
            if is_success {
                metrics::VALIDATOR_REQUEST_SUCCESS
                    .with_label_values(&[&validator_name])
                    .inc();
            }
        }

        result
    }

    /// Deduplicates concurrent requests for the same data.
    ///
    /// If a request for the same key is already in flight, this method waits for
    /// the existing request to complete and returns its result. Otherwise, it
    /// executes the operation and broadcasts the result to all waiting callers.
    ///
    /// This method also performs **subsumption-based deduplication**: if a larger
    /// request that contains all the data needed by this request is already cached
    /// or in flight, we can extract the subset result instead of making a new request.
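    ///
    /// For example (a sketch; the actual subsumption rules live in the
    /// [`SubsumingKey`] implementation for [`RequestKey`]):
    ///
    /// ```ignore
    /// // In flight: RequestKey::Certificates { chain_id, heights: [1, 2, 3, 4] }
    /// // New:       RequestKey::Certificates { chain_id, heights: [2, 3] }
    /// // The new request subscribes to the in-flight one and extracts the
    /// // certificates for heights 2 and 3 from its result via `try_extract_result`.
    /// ```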
    ///
    /// # Arguments
    /// - `key`: Unique identifier for the request
    /// - `peer`: The peer that is handed to `operation`
    /// - `operation`: Async closure that performs the actual request
    ///
    /// # Returns
    /// The result from either the in-flight request or the newly executed operation
    async fn deduplicated_request<T, F, Fut, N>(
        &self,
        key: RequestKey,
        peer: N,
        operation: F,
    ) -> Result<T, NodeError>
    where
        T: Cacheable + Clone + Send + 'static,
        F: FnOnce(N) -> Fut,
        Fut: Future<Output = Result<T, NodeError>>,
    {
        // Check cache for exact or subsuming match
        if let Some(result) = self.cache.get(&key).await {
            return Ok(result);
        }

        // Check if there's an in-flight request (exact or subsuming)
        if let Some(in_flight_match) = self.in_flight_tracker.try_subscribe(&key).await {
            match in_flight_match {
                InFlightMatch::Exact(Subscribed(mut receiver)) => {
                    tracing::trace!(
                        key = ?key,
                        "deduplicating request (exact match) - joining existing in-flight request"
                    );
                    #[cfg(with_metrics)]
                    metrics::REQUEST_CACHE_DEDUPLICATION.inc();
                    // Wait for result from existing request
                    match receiver.recv().await {
                        Ok(result) => match result.as_ref().clone() {
                            Ok(res) => match T::try_from(res) {
                                Ok(converted) => {
                                    tracing::trace!(
                                        key = ?key,
                                        "received result from deduplicated in-flight request"
                                    );
                                    return Ok(converted);
                                }
                                Err(_) => {
                                    tracing::trace!(
                                        key = ?key,
                                        "failed to convert result from deduplicated in-flight request, will execute independently"
                                    );
                                }
                            },
                            Err(e) => {
                                tracing::trace!(
                                    key = ?key,
                                    error = %e,
                                    "in-flight request failed",
                                );
                                // Fall through to execute a new request
                            }
                        },
                        Err(_) => {
                            tracing::trace!(
                                key = ?key,
                                "in-flight request sender dropped"
                            );
                            // Fall through to execute a new request
                        }
                    }
                }
                InFlightMatch::Subsuming {
                    key: subsuming_key,
                    outcome: Subscribed(mut receiver),
                } => {
                    tracing::trace!(
                        key = ?key,
                        subsumed_by = ?subsuming_key,
                        "deduplicating request (subsumption) - joining larger in-flight request"
                    );
                    #[cfg(with_metrics)]
                    metrics::REQUEST_CACHE_DEDUPLICATION.inc();
                    // Wait for result from the subsuming request
                    match receiver.recv().await {
                        Ok(result) => {
                            match result.as_ref() {
                                Ok(res) => {
                                    if let Some(extracted) =
                                        key.try_extract_result(&subsuming_key, res)
                                    {
                                        tracing::trace!(
                                            key = ?key,
                                            "extracted subset result from larger in-flight request"
                                        );
                                        match T::try_from(extracted) {
                                            Ok(converted) => return Ok(converted),
                                            Err(_) => {
                                                tracing::trace!(
                                                    key = ?key,
                                                    "failed to convert extracted result, will execute independently"
                                                );
                                            }
                                        }
                                    } else {
                                        // Extraction failed, fall through to execute our own request
                                        tracing::trace!(
                                            key = ?key,
                                            "failed to extract from subsuming request, will execute independently"
                                        );
                                    }
                                }
                                Err(e) => {
                                    tracing::trace!(
                                        key = ?key,
                                        error = %e,
                                        "subsuming in-flight request failed",
                                    );
                                    // Fall through to execute our own request
                                }
                            }
                        }
                        Err(_) => {
                            tracing::trace!(
                                key = ?key,
                                "subsuming in-flight request sender dropped"
                            );
                        }
                    }
                }
            }
        }

        // Create new in-flight entry for this request
        self.in_flight_tracker.insert_new(key.clone()).await;

        // Execute the actual request
        tracing::trace!(key = ?key, "executing new request");
        let result = operation(peer).await;
        let result_for_broadcast: Result<RequestResult, NodeError> = result.clone().map(Into::into);
        let shared_result = Arc::new(result_for_broadcast);

        // Broadcast result and clean up
        self.in_flight_tracker
            .complete_and_broadcast(&key, shared_result.clone())
            .await;

        if let Ok(success) = shared_result.as_ref() {
            self.cache
                .store(key.clone(), Arc::new(success.clone()))
                .await;
        }
        result
    }

    /// Returns all peers ordered by their score (highest first).
    ///
    /// Each peer is paired with its calculated score, which is based on latency,
    /// success rate, and load.
    ///
    /// # Returns
    /// A vector of `(score, peer)` tuples sorted by score in descending order.
    /// Returns an empty vector if no peers are registered.
    async fn peers_by_score(&self) -> Vec<(f64, RemoteNode<Env::ValidatorNode>)> {
        let nodes = self.nodes.read().await;

        // Calculate a score for every known node
        let mut scored_nodes = Vec::new();
        for info in nodes.values() {
            let score = info.calculate_score().await;
            scored_nodes.push((score, info.node.clone()));
        }

        // Sort by score (highest first)
        scored_nodes.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(Ordering::Equal));

        scored_nodes
    }

    /// Selects the best available peer using weighted random selection from top performers.
    ///
    /// This method:
    /// 1. Scores all known nodes and sorts them by performance score
    /// 2. Performs weighted random selection from the top 3 performers
    ///
    /// This approach balances between choosing high-performing nodes and distributing
    /// load across multiple validators to avoid creating hotspots.
    ///
    /// Returns `None` if no nodes are registered.
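    ///
    /// For example (a sketch of the selection math, not a fixed guarantee): with
    /// top-3 scores of 0.9, 0.5, and 0.2, the clamped scores serve directly as
    /// weights, so the best node is picked with probability 0.9 / 1.6 (about 56%),
    /// the second with about 31%, and the third with about 13%.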
    async fn select_best_peer(&self) -> Option<RemoteNode<Env::ValidatorNode>> {
        let scored_nodes = self.peers_by_score().await;

        if scored_nodes.is_empty() {
            return None;
        }

        // Use weighted random selection from top performers (top 3, or all if fewer)
        let top_count = scored_nodes.len().min(3);
        let top_nodes = &scored_nodes[..top_count];

        // Use the scores as weights, clamped to a small minimum to prevent zero weights
        let weights: Vec<f64> = top_nodes.iter().map(|(score, _)| score.max(0.01)).collect();

        if let Ok(dist) = WeightedIndex::new(&weights) {
            let mut rng = rand::thread_rng();
            let index = dist.sample(&mut rng);
            Some(top_nodes[index].1.clone())
        } else {
            // Fall back to the best node if the weights are invalid
            tracing::warn!("failed to create weighted distribution, defaulting to best node");
            Some(scored_nodes[0].1.clone())
        }
    }

    /// Adds a new peer to the manager if it doesn't already exist.
    async fn add_peer(&self, node: RemoteNode<Env::ValidatorNode>) {
        let mut nodes = self.nodes.write().await;
        let public_key = node.public_key;
        nodes.entry(public_key).or_insert_with(|| {
            NodeInfo::with_config(node, self.weights, self.alpha, self.max_expected_latency)
        });
    }
}

#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    use linera_base::{
        crypto::{CryptoHash, InMemorySigner},
        data_types::BlockHeight,
        identifiers::ChainId,
        time::Duration,
    };
    use linera_chain::types::ConfirmedBlockCertificate;
    use tokio::sync::oneshot;

    use super::{super::request::RequestKey, *};
    use crate::{client::requests_scheduler::MAX_REQUEST_TTL_MS, node::NodeError};

    type TestEnvironment = crate::environment::Test;

    /// Helper function to create a test `RequestsScheduler` with custom configuration
    fn create_test_manager(
        in_flight_timeout: Duration,
        cache_ttl: Duration,
    ) -> Arc<RequestsScheduler<TestEnvironment>> {
        let manager = RequestsScheduler::with_config(
            vec![], // No actual nodes needed for these tests
            ScoringWeights::default(),
            0.1,
            1000.0,
            cache_ttl,
            100,
            in_flight_timeout, // `with_config` uses this as the in-flight request TTL
        );
        Arc::new(manager)
    }

    /// Helper function to create a test result
    fn test_result_ok() -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
        Ok(vec![])
    }

    /// Helper function to create a test request key
    fn test_key() -> RequestKey {
        RequestKey::Certificates {
            chain_id: ChainId(CryptoHash::test_hash("test")),
            heights: vec![BlockHeight(0), BlockHeight(1)],
        }
    }

    #[tokio::test]
    async fn test_cache_hit_returns_cached_result() {
        // Create a manager with standard cache TTL
        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
        let key = test_key();

        // Track how many times the operation is executed
        let execution_count = Arc::new(AtomicUsize::new(0));
        let execution_count_clone = execution_count.clone();

        // First call - should execute the operation and cache the result
        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
            .deduplicated_request(key.clone(), (), |_| async move {
                execution_count_clone.fetch_add(1, Ordering::SeqCst);
                test_result_ok()
            })
            .await;

        assert!(result1.is_ok());
        assert_eq!(execution_count.load(Ordering::SeqCst), 1);

        // Second call - should return cached result without executing the operation
        let execution_count_clone2 = execution_count.clone();
        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
            .deduplicated_request(key.clone(), (), |_| async move {
                execution_count_clone2.fetch_add(1, Ordering::SeqCst);
                test_result_ok()
            })
            .await;

        assert_eq!(result1, result2);
        // Operation should still only have been executed once (cache hit)
        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_in_flight_request_deduplication() {
        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
        let key = test_key();

        // Track how many times the operation is executed
        let execution_count = Arc::new(AtomicUsize::new(0));

        // Create a channel to control when the first operation completes
        let (tx, rx) = oneshot::channel();

        // Start first request (will be slow - waits for signal)
        let manager_clone = Arc::clone(&manager);
        let key_clone = key.clone();
        let execution_count_clone = execution_count.clone();
        let first_request = tokio::spawn(async move {
            manager_clone
                .deduplicated_request(key_clone, (), |_| async move {
                    execution_count_clone.fetch_add(1, Ordering::SeqCst);
                    // Wait for signal before completing
                    rx.await.unwrap();
                    test_result_ok()
                })
                .await
        });

        // Start second request - should deduplicate and wait for the first
        let execution_count_clone2 = execution_count.clone();
        let second_request = tokio::spawn(async move {
            manager
                .deduplicated_request(key, (), |_| async move {
                    execution_count_clone2.fetch_add(1, Ordering::SeqCst);
                    test_result_ok()
                })
                .await
        });

        // Signal the first request to complete
        tx.send(()).unwrap();

        // Both requests should complete successfully
        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            first_request.await.unwrap();
        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            second_request.await.unwrap();

        assert!(result1.is_ok());
        assert_eq!(result1, result2);

        // Operation should only have been executed once (deduplication worked)
        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_multiple_subscribers_all_notified() {
        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
        let key = test_key();

        // Track how many times the operation is executed
        let execution_count = Arc::new(AtomicUsize::new(0));

        // Create a channel to control when the operation completes
        let (tx, rx) = oneshot::channel();

        // Start first request (will be slow - waits for signal)
        let manager_clone1 = Arc::clone(&manager);
        let key_clone1 = key.clone();
        let execution_count_clone = execution_count.clone();
        let first_request = tokio::spawn(async move {
            manager_clone1
                .deduplicated_request(key_clone1, (), |_| async move {
                    execution_count_clone.fetch_add(1, Ordering::SeqCst);
                    rx.await.unwrap();
                    test_result_ok()
                })
                .await
        });

        // Start multiple additional requests - all should deduplicate
        let mut handles = vec![];
        for _ in 0..5 {
            let manager_clone = Arc::clone(&manager);
            let key_clone = key.clone();
            let execution_count_clone = execution_count.clone();
            let handle = tokio::spawn(async move {
                manager_clone
                    .deduplicated_request(key_clone, (), |_| async move {
                        execution_count_clone.fetch_add(1, Ordering::SeqCst);
                        test_result_ok()
                    })
                    .await
            });
            handles.push(handle);
        }

        // Signal the first request to complete
        tx.send(()).unwrap();

        // First request should complete successfully
        let result: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            first_request.await.unwrap();
        assert!(result.is_ok());

        // All subscriber requests should also complete successfully
        for handle in handles {
            assert_eq!(handle.await.unwrap(), result);
        }

        // Operation should only have been executed once (all requests were deduplicated)
        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_timeout_triggers_new_request() {
        // Create a manager with a very short in-flight timeout
        let manager = create_test_manager(Duration::from_millis(50), Duration::from_secs(60));

        let key = test_key();

        // Track how many times the operation is executed
        let execution_count = Arc::new(AtomicUsize::new(0));

        // Create a channel to control when the first operation completes
        let (tx, rx) = oneshot::channel();

        // Start first request (will be slow - waits for signal)
        let manager_clone = Arc::clone(&manager);
        let key_clone = key.clone();
        let execution_count_clone = execution_count.clone();
        let first_request = tokio::spawn(async move {
            manager_clone
                .deduplicated_request(key_clone, (), |_| async move {
                    execution_count_clone.fetch_add(1, Ordering::SeqCst);
                    rx.await.unwrap();
                    test_result_ok()
                })
                .await
        });

        // Wait for the configured 50ms in-flight timeout to elapse
        tokio::time::sleep(Duration::from_millis(60)).await;

        // Start second request - should NOT deduplicate because the first request exceeded the timeout
        let execution_count_clone2 = execution_count.clone();
        let second_request = tokio::spawn(async move {
            manager
                .deduplicated_request(key, (), |_| async move {
                    execution_count_clone2.fetch_add(1, Ordering::SeqCst);
                    test_result_ok()
                })
                .await
        });

        // Wait for the second request to complete
        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            second_request.await.unwrap();
        assert!(result2.is_ok());

        // Complete the first request
        tx.send(()).unwrap();
        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            first_request.await.unwrap();
        assert!(result1.is_ok());

        // Operation should have been executed twice (timeout triggered new request)
        assert_eq!(execution_count.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn test_alternative_peers_registered_on_deduplication() {
        use linera_base::identifiers::BlobType;

        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};

        // Create a test environment with three validators
        let mut builder = TestBuilder::new(
            MemoryStorageBuilder::default(),
            3,
            0,
            InMemorySigner::new(None),
        )
        .await
        .unwrap();

        // Get validator nodes
        let nodes: Vec<_> = (0..3)
            .map(|i| {
                let node = builder.node(i);
                let public_key = node.name();
                RemoteNode { public_key, node }
            })
            .collect();

        // Create a RequestsScheduler
        let manager: Arc<RequestsScheduler<TestEnvironment>> =
            Arc::new(RequestsScheduler::with_config(
                nodes.clone(),
                ScoringWeights::default(),
                0.1,
                1000.0,
                Duration::from_secs(60),
                100,
                Duration::from_millis(MAX_REQUEST_TTL_MS),
            ));

        let key = RequestKey::Blob(BlobId::new(
            CryptoHash::test_hash("test_blob"),
            BlobType::Data,
        ));

        // Create a channel to control when the first request completes
        let (tx, rx) = oneshot::channel();

        // Start first request with node 0 (will block until signaled)
        let manager_clone = Arc::clone(&manager);
        let node_clone = nodes[0].clone();
        let key_clone = key.clone();
        let first_request = tokio::spawn(async move {
            manager_clone
                .with_peer(key_clone, node_clone, |_peer| async move {
                    // Wait for signal
                    rx.await.unwrap();
                    Ok(None) // Return Option<Blob>
                })
                .await
        });

        // Give the first request time to start and become in-flight
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Start second and third requests with different nodes.
        // These should register as alternatives and wait for the first request.
        let handles: Vec<_> = vec![nodes[1].clone(), nodes[2].clone()]
            .into_iter()
            .map(|node| {
                let manager_clone = Arc::clone(&manager);
                let key_clone = key.clone();
                tokio::spawn(async move {
                    manager_clone
                        .with_peer(key_clone, node, |_peer| async move {
                            Ok(None) // Return Option<Blob>
                        })
                        .await
                })
            })
            .collect();

        // Give time for alternative peers to register
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Check that nodes 1 and 2 are registered as alternatives
        let alt_peers = manager
            .get_alternative_peers(&key)
            .await
            .expect("in-flight entry")
            .into_iter()
            .map(|p| p.public_key)
            .collect::<Vec<_>>();
        assert_eq!(
            alt_peers,
            vec![nodes[1].public_key, nodes[2].public_key],
            "expected nodes[1] and nodes[2] to be registered as alternative peers"
        );

        // Signal first request to complete
        tx.send(()).unwrap();

        // Wait for all requests to complete
        let _result1 = first_request.await.unwrap();
        for handle in handles {
            let _ = handle.await.unwrap();
        }

        // After completion, the in-flight entry should be removed
        tokio::time::sleep(Duration::from_millis(50)).await;
        let alt_peers = manager.get_alternative_peers(&key).await;
        assert!(
            alt_peers.is_none(),
            "Expected in-flight entry to be removed after completion"
        );
    }
}