1use std::{
5 collections::BTreeMap,
6 future::Future,
7 sync::{
8 atomic::{AtomicU32, Ordering},
9 Arc,
10 },
11};
12
13use custom_debug_derive::Debug;
14use futures::stream::{FuturesUnordered, StreamExt};
15use linera_base::{
16 crypto::ValidatorPublicKey,
17 data_types::{Blob, BlobContent, BlockHeight},
18 identifiers::{BlobId, ChainId},
19 time::{Duration, Instant},
20};
21use linera_chain::types::ConfirmedBlockCertificate;
22use rand::distributions::{Distribution, WeightedIndex};
23use tracing::{instrument, warn};
24
25use super::{
26 cache::{RequestsCache, SubsumingKey},
27 in_flight_tracker::{InFlightMatch, InFlightTracker},
28 node_info::NodeInfo,
29 request::{RequestKey, RequestResult},
30 scoring::ScoringWeights,
31};
32use crate::{
33 client::{
34 communicate_concurrently,
35 requests_scheduler::{in_flight_tracker::Subscribed, request::Cacheable},
36 RequestsSchedulerConfig,
37 },
38 environment::Environment,
39 node::{NodeError, ValidatorNode},
40 remote_node::RemoteNode,
41};
42
#[cfg(with_metrics)]
pub(super) mod metrics {
    //! Prometheus metrics recorded by the requests scheduler.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_latencies, register_histogram_vec, register_int_counter,
        register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounter, IntCounterVec};

    /// Label names shared by every per-validator metric below.
    const VALIDATOR_LABEL: &[&str] = &["validator"];

    /// Per-validator histogram of request response times, in milliseconds.
    ///
    /// Buckets come from `exponential_bucket_latencies(10000.0)`.
    pub(super) static VALIDATOR_RESPONSE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = exponential_bucket_latencies(10000.0);
        register_histogram_vec(
            "requests_scheduler_response_time_ms",
            "Response time for requests to validators in milliseconds",
            VALIDATOR_LABEL,
            buckets,
        )
    });

    /// Per-validator count of requests issued, whatever their outcome.
    pub(super) static VALIDATOR_REQUEST_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "requests_scheduler_request_total",
            "Total number of requests made to each validator",
            VALIDATOR_LABEL,
        )
    });

    /// Per-validator count of requests that returned `Ok`.
    pub(super) static VALIDATOR_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "requests_scheduler_request_success",
            "Number of successful requests to each validator",
            VALIDATOR_LABEL,
        )
    });

    /// Number of requests that joined an already in-flight request instead of executing.
    pub(super) static REQUEST_CACHE_DEDUPLICATION: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "requests_scheduler_request_deduplication_total",
            "Number of requests that were deduplicated by finding the result in the cache.",
        )
    });

    /// Number of requests answered from the result cache.
    pub static REQUEST_CACHE_HIT: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "requests_scheduler_request_cache_hit_total",
            "Number of requests that were served from cache",
        )
    });
}
97
/// Schedules requests to validator nodes.
///
/// Peers are ranked by a per-node score kept in [`NodeInfo`]. Identical concurrent requests
/// are deduplicated through an [`InFlightTracker`], and successful results are kept in a
/// [`RequestsCache`]. When an attempt fails, or a staggered delay elapses, another attempt
/// is started on an alternative peer.
#[derive(Debug, Clone)]
pub struct RequestsScheduler<Env: Environment> {
    /// Scoring state of every known validator, keyed by public key.
    ///
    /// It is shared behind an `Arc` so request futures can update it once they complete.
    nodes: Arc<tokio::sync::RwLock<BTreeMap<ValidatorPublicKey, NodeInfo<Env>>>>,
    /// Scoring weights given to the [`NodeInfo`] of each newly added peer.
    weights: ScoringWeights,
    /// `alpha` parameter given to each new [`NodeInfo`]; `with_config` asserts it lies in `(0, 1)`.
    alpha: f64,
    /// Expected-latency bound, in milliseconds, given to each new [`NodeInfo`].
    max_expected_latency: f64,
    /// Base delay between staggered attempts on alternative peers.
    retry_delay: Duration,
    /// Tracks in-flight requests, so duplicates can subscribe to them, together with the
    /// alternative peers that may serve each request.
    in_flight_tracker: InFlightTracker<RemoteNode<Env::ValidatorNode>>,
    /// Successful request results, keyed by request.
    cache: RequestsCache<RequestKey, RequestResult>,
}
146
147impl<Env: Environment> RequestsScheduler<Env> {
148 pub fn new(
150 nodes: impl IntoIterator<Item = RemoteNode<Env::ValidatorNode>>,
151 config: &RequestsSchedulerConfig,
152 ) -> Self {
153 Self::with_config(
154 nodes,
155 ScoringWeights::default(),
156 config.alpha,
157 config.max_accepted_latency_ms,
158 Duration::from_millis(config.cache_ttl_ms),
159 config.cache_max_size,
160 Duration::from_millis(config.max_request_ttl_ms),
161 Duration::from_millis(config.retry_delay_ms),
162 )
163 }
164
165 #[expect(clippy::too_many_arguments)]
178 pub fn with_config(
179 nodes: impl IntoIterator<Item = RemoteNode<Env::ValidatorNode>>,
180 weights: ScoringWeights,
181 alpha: f64,
182 max_expected_latency_ms: f64,
183 cache_ttl: Duration,
184 max_cache_size: usize,
185 max_request_ttl: Duration,
186 retry_delay: Duration,
187 ) -> Self {
188 assert!(alpha > 0.0 && alpha < 1.0, "Alpha must be in (0, 1) range");
189 Self {
190 nodes: Arc::new(tokio::sync::RwLock::new(
191 nodes
192 .into_iter()
193 .map(|node| {
194 (
195 node.public_key,
196 NodeInfo::with_config(node, weights, alpha, max_expected_latency_ms),
197 )
198 })
199 .collect(),
200 )),
201 weights,
202 alpha,
203 max_expected_latency: max_expected_latency_ms,
204 retry_delay,
205 in_flight_tracker: InFlightTracker::new(max_request_ttl),
206 cache: RequestsCache::new(cache_ttl, max_cache_size),
207 }
208 }
209
210 #[allow(unused)]
241 async fn with_best<R, F, Fut>(&self, key: RequestKey, operation: F) -> Result<R, NodeError>
242 where
243 R: Cacheable + Clone + Send + 'static,
244 F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
245 Fut: Future<Output = Result<R, NodeError>> + 'static,
246 {
247 let peer = self
249 .select_best_peer()
250 .await
251 .ok_or_else(|| NodeError::WorkerError {
252 error: "No validators available".to_string(),
253 })?;
254 self.with_peer(key, peer, operation).await
255 }
256
257 async fn with_peer<R, F, Fut>(
276 &self,
277 key: RequestKey,
278 peer: RemoteNode<Env::ValidatorNode>,
279 operation: F,
280 ) -> Result<R, NodeError>
281 where
282 R: Cacheable + Clone + Send + 'static,
283 F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
284 Fut: Future<Output = Result<R, NodeError>> + 'static,
285 {
286 self.add_peer(peer.clone()).await;
287 self.in_flight_tracker
288 .add_alternative_peer(&key, peer.clone())
289 .await;
290
291 let nodes = self.nodes.clone();
293 self.deduplicated_request(key, peer, move |peer| {
294 let fut = operation(peer.clone());
295 let nodes = nodes.clone();
296 async move { Self::track_request(nodes, peer, fut).await }
297 })
298 .await
299 }
300
301 #[instrument(level = "trace", skip_all)]
302 async fn download_blob(
303 &self,
304 peers: &[RemoteNode<Env::ValidatorNode>],
305 blob_id: BlobId,
306 timeout: Duration,
307 ) -> Result<Option<Blob>, NodeError> {
308 let key = RequestKey::Blob(blob_id);
309 communicate_concurrently(
310 peers,
311 async move |peer| {
312 self.with_peer(key, peer, move |peer| async move {
313 peer.download_blob(blob_id).await
314 })
315 .await
316 },
317 timeout,
318 )
319 .await
320 .map_err(|errors| {
321 for (validator, error) in &errors {
322 warn!(
323 %validator,
324 %blob_id,
325 %error,
326 "failed to download blob from validator",
327 );
328 }
329 errors
330 .into_iter()
331 .last()
332 .map_or(NodeError::NoValidators, |(_, error)| error)
333 })
334 }
335
336 #[instrument(level = "trace", skip_all)]
340 pub async fn download_blobs(
341 &self,
342 peers: &[RemoteNode<Env::ValidatorNode>],
343 blob_ids: &[BlobId],
344 timeout: Duration,
345 ) -> Result<Option<Vec<Blob>>, NodeError> {
346 let mut stream = blob_ids
347 .iter()
348 .map(|blob_id| self.download_blob(peers, *blob_id, timeout))
349 .collect::<FuturesUnordered<_>>();
350
351 let mut blobs = Vec::new();
352 while let Some(maybe_blob) = stream.next().await {
353 blobs.push(maybe_blob?);
354 }
355 Ok(blobs.into_iter().collect::<Option<Vec<_>>>())
356 }
357
358 pub async fn download_certificates(
359 &self,
360 peer: &RemoteNode<Env::ValidatorNode>,
361 chain_id: ChainId,
362 start: BlockHeight,
363 limit: u64,
364 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
365 let heights = (start.0..start.0 + limit)
366 .map(BlockHeight)
367 .collect::<Vec<_>>();
368 self.with_peer(
369 RequestKey::Certificates {
370 chain_id,
371 heights: heights.clone(),
372 },
373 peer.clone(),
374 move |peer| {
375 let heights = heights.clone();
376 async move {
377 Box::pin(peer.download_certificates_by_heights(chain_id, heights)).await
378 }
379 },
380 )
381 .await
382 }
383
384 pub async fn download_certificates_from_validators(
387 &self,
388 peers: &[RemoteNode<Env::ValidatorNode>],
389 chain_id: ChainId,
390 start: BlockHeight,
391 limit: u64,
392 timeout: Duration,
393 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
394 if peers.is_empty() {
395 return Err(NodeError::NoValidators);
396 }
397 let heights = (start.0..start.0 + limit)
398 .map(BlockHeight)
399 .collect::<Vec<_>>();
400 let key = RequestKey::Certificates {
401 chain_id,
402 heights: heights.clone(),
403 };
404 communicate_concurrently(
405 peers,
406 async move |peer| {
407 self.with_peer(key, peer, move |peer| {
408 let heights = heights.clone();
409 async move {
410 Box::pin(peer.download_certificates_by_heights(chain_id, heights)).await
411 }
412 })
413 .await
414 },
415 timeout,
416 )
417 .await
418 .map_err(|errors| {
419 for (validator, error) in &errors {
420 warn!(
421 %validator,
422 %chain_id,
423 %error,
424 "failed to download certificates from validator",
425 );
426 }
427 errors
428 .into_iter()
429 .last()
430 .map_or(NodeError::NoValidators, |(_, error)| error)
431 })
432 }
433
434 pub async fn download_certificates_by_heights(
435 &self,
436 peer: &RemoteNode<Env::ValidatorNode>,
437 chain_id: ChainId,
438 heights: Vec<BlockHeight>,
439 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
440 self.with_peer(
441 RequestKey::Certificates {
442 chain_id,
443 heights: heights.clone(),
444 },
445 peer.clone(),
446 move |peer| {
447 let heights = heights.clone();
448 async move {
449 peer.download_certificates_by_heights(chain_id, heights)
450 .await
451 }
452 },
453 )
454 .await
455 }
456
457 pub async fn download_certificate_for_blob(
458 &self,
459 peer: &RemoteNode<Env::ValidatorNode>,
460 blob_id: BlobId,
461 ) -> Result<ConfirmedBlockCertificate, NodeError> {
462 self.with_peer(
463 RequestKey::CertificateForBlob(blob_id),
464 peer.clone(),
465 move |peer| async move { peer.download_certificate_for_blob(blob_id).await },
466 )
467 .await
468 }
469
470 pub async fn download_pending_blob(
471 &self,
472 peer: &RemoteNode<Env::ValidatorNode>,
473 chain_id: ChainId,
474 blob_id: BlobId,
475 ) -> Result<BlobContent, NodeError> {
476 self.with_peer(
477 RequestKey::PendingBlob { chain_id, blob_id },
478 peer.clone(),
479 move |peer| async move { peer.node.download_pending_blob(chain_id, blob_id).await },
480 )
481 .await
482 }
483
484 pub async fn get_alternative_peers(
489 &self,
490 key: &RequestKey,
491 ) -> Option<Vec<RemoteNode<Env::ValidatorNode>>> {
492 self.in_flight_tracker.get_alternative_peers(key).await
493 }
494
495 async fn track_request<T, Fut>(
509 nodes: Arc<tokio::sync::RwLock<BTreeMap<ValidatorPublicKey, NodeInfo<Env>>>>,
510 peer: RemoteNode<Env::ValidatorNode>,
511 operation: Fut,
512 ) -> Result<T, NodeError>
513 where
514 Fut: Future<Output = Result<T, NodeError>> + 'static,
515 {
516 let start_time = Instant::now();
517 let public_key = peer.public_key;
518
519 let result = operation.await;
521
522 #[expect(
524 clippy::cast_possible_truncation,
525 reason = "elapsed millis fits in u64 for any realistic measurement window"
526 )]
527 let response_time_ms = start_time.elapsed().as_millis() as u64;
528 let is_success = result.is_ok();
529 {
530 let mut nodes_guard = nodes.write().await;
531 if let Some(info) = nodes_guard.get_mut(&public_key) {
532 info.update_metrics(is_success, response_time_ms);
533 let score = info.calculate_score().await;
534 tracing::trace!(
535 node = %public_key,
536 address = %info.node.node.address(),
537 success = %is_success,
538 response_time_ms = %response_time_ms,
539 score = %score,
540 total_requests = %info.total_requests(),
541 "Request completed"
542 );
543 }
544 }
545
546 #[cfg(with_metrics)]
548 {
549 let validator_name = public_key.to_string();
550 metrics::VALIDATOR_RESPONSE_TIME
551 .with_label_values(&[&validator_name])
552 .observe(response_time_ms as f64);
553 metrics::VALIDATOR_REQUEST_TOTAL
554 .with_label_values(&[&validator_name])
555 .inc();
556 if is_success {
557 metrics::VALIDATOR_REQUEST_SUCCESS
558 .with_label_values(&[&validator_name])
559 .inc();
560 }
561 }
562
563 result
564 }
565
566 async fn deduplicated_request<T, F, Fut>(
583 &self,
584 key: RequestKey,
585 peer: RemoteNode<Env::ValidatorNode>,
586 operation: F,
587 ) -> Result<T, NodeError>
588 where
589 T: Cacheable + Clone + Send + 'static,
590 F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
591 Fut: Future<Output = Result<T, NodeError>> + 'static,
592 {
593 if let Some(result) = self.cache.get(&key).await {
595 return Ok(result);
596 }
597
598 if let Some(in_flight_match) = self.in_flight_tracker.try_subscribe(&key).await {
600 match in_flight_match {
601 InFlightMatch::Exact(Subscribed(mut receiver)) => {
602 tracing::trace!(
603 ?key,
604 "deduplicating request (exact match) - joining existing in-flight request"
605 );
606 #[cfg(with_metrics)]
607 metrics::REQUEST_CACHE_DEDUPLICATION.inc();
608 match receiver.recv().await {
610 Ok(result) => match result.as_ref().clone() {
611 Ok(res) => match T::try_from(res) {
612 Ok(converted) => {
613 tracing::trace!(
614 ?key,
615 "received result from deduplicated in-flight request"
616 );
617 return Ok(converted);
618 }
619 Err(_) => {
620 tracing::warn!(
621 ?key,
622 "failed to convert result from deduplicated in-flight request, will execute independently"
623 );
624 }
625 },
626 Err(error) => {
627 tracing::trace!(
628 ?key,
629 %error,
630 "in-flight request failed",
631 );
632 }
634 },
635 Err(_) => {
636 tracing::trace!(?key, "in-flight request sender dropped");
637 }
639 }
640 }
641 InFlightMatch::Subsuming {
642 key: subsuming_key,
643 outcome: Subscribed(mut receiver),
644 } => {
645 tracing::trace!(
646 ?key,
647 subsumed_by = ?subsuming_key,
648 "deduplicating request (subsumption) - joining larger in-flight request"
649 );
650 #[cfg(with_metrics)]
651 metrics::REQUEST_CACHE_DEDUPLICATION.inc();
652 match receiver.recv().await {
654 Ok(result) => {
655 match result.as_ref() {
656 Ok(res) => {
657 if let Some(extracted) =
658 key.try_extract_result(&subsuming_key, res)
659 {
660 tracing::trace!(
661 ?key,
662 "extracted subset result from larger in-flight request"
663 );
664 match T::try_from(extracted) {
665 Ok(converted) => return Ok(converted),
666 Err(_) => {
667 tracing::trace!(
668 ?key,
669 "failed to convert extracted result, will execute independently"
670 );
671 }
672 }
673 } else {
674 tracing::trace!(
676 ?key,
677 "failed to extract from subsuming request, will execute independently"
678 );
679 }
680 }
681 Err(error) => {
682 tracing::trace!(
683 ?key,
684 ?error,
685 "subsuming in-flight request failed",
686 );
687 }
689 }
690 }
691 Err(_) => {
692 tracing::trace!(?key, "subsuming in-flight request sender dropped");
693 }
694 }
695 }
696 }
697 };
698
699 self.in_flight_tracker.insert_new(key.clone()).await;
701
702 self.in_flight_tracker
704 .remove_alternative_peer(&key, &peer)
705 .await;
706
707 tracing::trace!(?key, ?peer, "executing staggered parallel request");
710 let result = self
711 .try_staggered_parallel(&key, peer, &operation, self.retry_delay)
712 .await;
713
714 let result_for_broadcast: Result<RequestResult, NodeError> = result.clone().map(Into::into);
715 let shared_result = Arc::new(result_for_broadcast);
716
717 self.in_flight_tracker
719 .complete_and_broadcast(&key, shared_result.clone())
720 .await;
721
722 if let Ok(success) = shared_result.as_ref() {
723 self.cache
724 .store(key.clone(), Arc::new(success.clone()))
725 .await;
726 }
727 result
728 }
729
730 async fn try_staggered_parallel<T, F, Fut>(
745 &self,
746 key: &RequestKey,
747 first_peer: RemoteNode<Env::ValidatorNode>,
748 operation: &F,
749 staggered_delay: Duration,
750 ) -> Result<T, NodeError>
751 where
752 T: 'static,
753 F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
754 Fut: Future<Output = Result<T, NodeError>> + 'static,
755 {
756 use futures::{
757 future::{select, Either},
758 stream::{FuturesUnordered, StreamExt},
759 };
760 use linera_base::time::timer::sleep;
761
762 let mut futures: FuturesUnordered<Fut> = FuturesUnordered::new();
763 let peer_index = AtomicU32::new(0);
764
765 let push_future = |futures: &mut FuturesUnordered<Fut>, fut: Fut| {
766 futures.push(fut);
767 peer_index.fetch_add(1, Ordering::SeqCst)
768 };
769
770 push_future(&mut futures, operation(first_peer));
772
773 let mut last_error = NodeError::UnexpectedMessage;
774 let mut next_delay = Box::pin(sleep(staggered_delay * peer_index.load(Ordering::SeqCst)));
775
776 loop {
778 if futures.is_empty() {
780 if let Some(peer) = self.in_flight_tracker.pop_alternative_peer(key).await {
781 push_future(&mut futures, operation(peer));
782 next_delay =
783 Box::pin(sleep(staggered_delay * peer_index.load(Ordering::SeqCst)));
784 } else {
785 break;
787 }
788 }
789
790 let next_result = Box::pin(futures.next());
791
792 match select(next_result, next_delay).await {
793 Either::Left((Some(result), delay_fut)) => {
795 next_delay = delay_fut;
797
798 match result {
799 Ok(value) => {
800 tracing::trace!(?key, "staggered parallel request succeeded");
801 return Ok(value);
802 }
803 Err(error) => {
804 tracing::debug!(
805 ?key,
806 %error,
807 "staggered parallel request attempt failed"
808 );
809 last_error = error;
810
811 if let Some(peer) =
813 self.in_flight_tracker.pop_alternative_peer(key).await
814 {
815 push_future(&mut futures, operation(peer));
816 next_delay = Box::pin(sleep(
817 staggered_delay * peer_index.load(Ordering::SeqCst),
818 ));
819 }
820 }
821 }
822 }
823 Either::Left((None, delay_fut)) => {
825 next_delay = delay_fut;
827 continue;
829 }
830 Either::Right((_, _)) => {
832 if let Some(peer) = self.in_flight_tracker.pop_alternative_peer(key).await {
833 push_future(&mut futures, operation(peer));
834 next_delay =
835 Box::pin(sleep(staggered_delay * peer_index.load(Ordering::SeqCst)));
836 } else {
837 break;
839 }
840 }
841 }
842 }
843
844 while let Some(result) = futures.next().await {
846 match result {
847 Ok(value) => {
848 tracing::trace!(?key, "staggered parallel request succeeded");
849 return Ok(value);
850 }
851 Err(error) => {
852 tracing::debug!(
853 ?key,
854 %error,
855 "staggered parallel request attempt failed"
856 );
857 last_error = error;
858 }
859 }
860 }
861
862 tracing::debug!(?key, "all staggered parallel retry attempts failed");
864 Err(last_error)
865 }
866
867 async fn peers_by_score(&self) -> Vec<(f64, RemoteNode<Env::ValidatorNode>)> {
876 let nodes = self.nodes.read().await;
877
878 let mut scored_nodes = Vec::new();
880 for info in nodes.values() {
881 let score = info.calculate_score().await;
882 scored_nodes.push((score, info.node.clone()));
883 }
884
885 scored_nodes.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
887
888 scored_nodes
889 }
890
891 async fn select_best_peer(&self) -> Option<RemoteNode<Env::ValidatorNode>> {
902 let scored_nodes = self.peers_by_score().await;
903
904 if scored_nodes.is_empty() {
905 return None;
906 }
907
908 let top_count = scored_nodes.len().min(3);
910 let top_nodes = &scored_nodes[..top_count];
911
912 let weights: Vec<f64> = top_nodes.iter().map(|(score, _)| score.max(0.01)).collect();
915
916 if let Ok(dist) = WeightedIndex::new(&weights) {
917 let mut rng = rand::thread_rng();
918 let index = dist.sample(&mut rng);
919 Some(top_nodes[index].1.clone())
920 } else {
921 tracing::warn!("failed to create weighted distribution, defaulting to best node");
923 Some(scored_nodes[0].1.clone())
924 }
925 }
926
927 async fn add_peer(&self, node: RemoteNode<Env::ValidatorNode>) {
929 let mut nodes = self.nodes.write().await;
930 let public_key = node.public_key;
931 nodes.entry(public_key).or_insert_with(|| {
932 NodeInfo::with_config(node, self.weights, self.alpha, self.max_expected_latency)
933 });
934 }
935}
936
#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    use linera_base::{
        crypto::{CryptoHash, InMemorySigner},
        data_types::BlockHeight,
        identifiers::ChainId,
        time::Duration,
    };
    use linera_chain::types::ConfirmedBlockCertificate;
    use tokio::sync::oneshot;

    use super::{super::request::RequestKey, *};
    use crate::{
        client::requests_scheduler::{MAX_REQUEST_TTL_MS, STAGGERED_DELAY_MS},
        node::NodeError,
    };

    type TestEnvironment = crate::environment::Test;

    /// Builds a scheduler with no registered nodes and the given in-flight and cache TTLs.
    fn create_test_manager(
        in_flight_timeout: Duration,
        cache_ttl: Duration,
    ) -> Arc<RequestsScheduler<TestEnvironment>> {
        // `with_config` already builds the in-flight tracker from `in_flight_timeout`.
        Arc::new(RequestsScheduler::with_config(
            vec![],
            ScoringWeights::default(),
            0.1,
            1000.0,
            cache_ttl,
            100,
            in_flight_timeout,
            Duration::from_millis(STAGGERED_DELAY_MS),
        ))
    }

    /// A certificate request key used by most tests.
    fn test_key() -> RequestKey {
        RequestKey::Certificates {
            chain_id: ChainId(CryptoHash::test_hash("test")),
            heights: vec![BlockHeight(0), BlockHeight(1)],
        }
    }

    /// Builds a single validator node from a fresh test network.
    ///
    /// Runs on the test's own runtime rather than blocking the thread with a nested executor.
    async fn dummy_peer() -> RemoteNode<<TestEnvironment as Environment>::ValidatorNode> {
        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};

        let mut builder = TestBuilder::new(
            MemoryStorageBuilder::default(),
            1,
            0,
            InMemorySigner::new(None),
        )
        .await
        .unwrap();

        let node = builder.node(0);
        let public_key = node.name();
        RemoteNode { public_key, node }
    }

    #[tokio::test]
    async fn test_cache_hit_returns_cached_result() {
        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
        let key = test_key();
        let peer = dummy_peer().await;

        // Counts how many times the underlying operation actually runs.
        let execution_count = Arc::new(AtomicUsize::new(0));
        let execution_count_clone = execution_count.clone();

        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
            .deduplicated_request(key.clone(), peer.clone(), |_| {
                let count = execution_count_clone.clone();
                async move {
                    count.fetch_add(1, Ordering::SeqCst);
                    Ok(vec![])
                }
            })
            .await;

        assert!(result1.is_ok());
        assert_eq!(execution_count.load(Ordering::SeqCst), 1);

        // The second call must be served from the cache without running the operation.
        let execution_count_clone2 = execution_count.clone();
        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
            .deduplicated_request(key.clone(), peer.clone(), |_| {
                let count = execution_count_clone2.clone();
                async move {
                    count.fetch_add(1, Ordering::SeqCst);
                    Ok(vec![])
                }
            })
            .await;

        assert_eq!(result1, result2);
        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_in_flight_request_deduplication() {
        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
        let key = test_key();
        let peer = dummy_peer().await;

        let execution_count = Arc::new(AtomicUsize::new(0));

        // The first request blocks on this channel so the second one finds it in flight.
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        let manager_clone = Arc::clone(&manager);
        let key_clone = key.clone();
        let execution_count_clone = execution_count.clone();
        let rx_clone = Arc::clone(&rx);
        let peer_clone = peer.clone();
        let first_request = tokio::spawn(async move {
            manager_clone
                .deduplicated_request(key_clone, peer_clone, |_| {
                    let count = execution_count_clone.clone();
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        Ok(vec![])
                    }
                })
                .await
        });

        let execution_count_clone2 = execution_count.clone();
        let second_request = tokio::spawn(async move {
            manager
                .deduplicated_request(key, peer, |_| {
                    let count = execution_count_clone2.clone();
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        Ok(vec![])
                    }
                })
                .await
        });

        tx.send(()).unwrap();

        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            first_request.await.unwrap();
        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            second_request.await.unwrap();

        assert!(result1.is_ok());
        assert_eq!(result1, result2);

        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_multiple_subscribers_all_notified() {
        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
        let key = test_key();
        let peer = dummy_peer().await;

        let execution_count = Arc::new(AtomicUsize::new(0));

        // The first request blocks on this channel while the subscribers join it.
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        let manager_clone1 = Arc::clone(&manager);
        let key_clone1 = key.clone();
        let execution_count_clone = execution_count.clone();
        let rx_clone = Arc::clone(&rx);
        let peer_clone = peer.clone();
        let first_request = tokio::spawn(async move {
            manager_clone1
                .deduplicated_request(key_clone1, peer_clone, |_| {
                    let count = execution_count_clone.clone();
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        Ok(vec![])
                    }
                })
                .await
        });

        let mut handles = vec![];
        for _ in 0..5 {
            let manager_clone = Arc::clone(&manager);
            let key_clone = key.clone();
            let execution_count_clone = execution_count.clone();
            let peer_clone = peer.clone();
            let handle = tokio::spawn(async move {
                manager_clone
                    .deduplicated_request(key_clone, peer_clone, |_| {
                        let count = execution_count_clone.clone();
                        async move {
                            count.fetch_add(1, Ordering::SeqCst);
                            Ok(vec![])
                        }
                    })
                    .await
            });
            handles.push(handle);
        }

        tx.send(()).unwrap();

        let result: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            first_request.await.unwrap();
        assert!(result.is_ok());

        for handle in handles {
            assert_eq!(handle.await.unwrap(), result);
        }

        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_timeout_triggers_new_request() {
        let in_flight_timeout = Duration::from_millis(50);
        let manager = create_test_manager(in_flight_timeout, Duration::from_secs(60));

        let key = test_key();
        let peer = dummy_peer().await;

        let execution_count = Arc::new(AtomicUsize::new(0));

        // Keeps the first request in flight until we release it.
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        let manager_clone = Arc::clone(&manager);
        let key_clone = key.clone();
        let execution_count_clone = execution_count.clone();
        let rx_clone = Arc::clone(&rx);
        let peer_clone = peer.clone();
        let first_request = tokio::spawn(async move {
            manager_clone
                .deduplicated_request(key_clone, peer_clone, |_| {
                    let count = execution_count_clone.clone();
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        Ok(vec![])
                    }
                })
                .await
        });

        // Wait well past the TTL configured above (not the global default), so the
        // in-flight entry is stale and the second request executes on its own.
        tokio::time::sleep(in_flight_timeout * 2).await;

        let execution_count_clone2 = execution_count.clone();
        let second_request = tokio::spawn(async move {
            manager
                .deduplicated_request(key, peer, |_| {
                    let count = execution_count_clone2.clone();
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        Ok(vec![])
                    }
                })
                .await
        });

        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            second_request.await.unwrap();
        assert!(result2.is_ok());

        tx.send(()).unwrap();
        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            first_request.await.unwrap();
        assert!(result1.is_ok());

        assert_eq!(execution_count.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn test_alternative_peers_registered_on_deduplication() {
        use linera_base::identifiers::BlobType;

        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};

        let mut builder = TestBuilder::new(
            MemoryStorageBuilder::default(),
            3,
            0,
            InMemorySigner::new(None),
        )
        .await
        .unwrap();

        let nodes: Vec<_> = (0..3)
            .map(|i| {
                let node = builder.node(i);
                let public_key = node.name();
                RemoteNode { public_key, node }
            })
            .collect();

        let manager: Arc<RequestsScheduler<TestEnvironment>> =
            Arc::new(RequestsScheduler::with_config(
                nodes.clone(),
                ScoringWeights::default(),
                0.1,
                1000.0,
                Duration::from_secs(60),
                100,
                Duration::from_millis(MAX_REQUEST_TTL_MS),
                Duration::from_millis(STAGGERED_DELAY_MS),
            ));

        let key = RequestKey::Blob(BlobId::new(
            CryptoHash::test_hash("test_blob"),
            BlobType::Data,
        ));

        // Keeps the first request in flight while the other peers join it.
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        let manager_clone = Arc::clone(&manager);
        let node_clone = nodes[0].clone();
        let key_clone = key.clone();
        let rx_clone = Arc::clone(&rx);
        let first_request = tokio::spawn(async move {
            manager_clone
                .with_peer(key_clone, node_clone, move |_peer| {
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        Ok(None)
                    }
                })
                .await
        });

        tokio::time::sleep(Duration::from_millis(100)).await;

        let handles: Vec<_> = vec![nodes[1].clone(), nodes[2].clone()]
            .into_iter()
            .map(|node| {
                let manager_clone = Arc::clone(&manager);
                let key_clone = key.clone();
                tokio::spawn(async move {
                    manager_clone
                        .with_peer(key_clone, node, |_peer| async move { Ok(None) })
                        .await
                })
            })
            .collect();

        tokio::time::sleep(Duration::from_millis(100)).await;

        tx.send(()).unwrap();

        let _result1 = first_request.await.unwrap();
        for handle in handles {
            handle.await.unwrap().ok();
        }

        tokio::time::sleep(Duration::from_millis(50)).await;
        let alt_peers = manager.get_alternative_peers(&key).await;
        assert!(
            alt_peers.is_none(),
            "Expected in-flight entry to be removed after completion"
        );
    }

    #[tokio::test]
    async fn test_staggered_parallel_retry_on_failure() {
        use std::sync::atomic::{AtomicU64, Ordering};

        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};

        let mut builder = TestBuilder::new(
            MemoryStorageBuilder::default(),
            4,
            0,
            InMemorySigner::new(None),
        )
        .await
        .unwrap();

        let nodes: Vec<_> = (0..4)
            .map(|i| {
                let node = builder.node(i);
                let public_key = node.name();
                RemoteNode { public_key, node }
            })
            .collect();

        let staggered_delay = Duration::from_millis(100);

        // Node 0 fails immediately, node 2 succeeds quickly, the others fail slowly.
        let node0_key = nodes[0].public_key;
        let node2_key = nodes[2].public_key;

        let manager: Arc<RequestsScheduler<TestEnvironment>> =
            Arc::new(RequestsScheduler::with_config(
                nodes.clone(),
                ScoringWeights::default(),
                0.1,
                1000.0,
                Duration::from_secs(60),
                100,
                Duration::from_millis(MAX_REQUEST_TTL_MS),
                staggered_delay,
            ));

        let key = test_key();

        // Records (peer, time since start) for every attempt.
        let call_times = Arc::new(tokio::sync::Mutex::new(Vec::new()));
        let start_time = Instant::now();

        let call_count = Arc::new(AtomicU64::new(0));

        let call_times_clone = Arc::clone(&call_times);
        let call_count_clone = Arc::clone(&call_count);

        let operation = |peer: RemoteNode<<TestEnvironment as Environment>::ValidatorNode>| {
            let times = Arc::clone(&call_times_clone);
            let count = Arc::clone(&call_count_clone);
            let start = start_time;
            async move {
                let elapsed = Instant::now().duration_since(start);
                times.lock().await.push((peer.public_key, elapsed));
                count.fetch_add(1, Ordering::SeqCst);

                if peer.public_key == node0_key {
                    Err(NodeError::UnexpectedMessage)
                } else if peer.public_key == node2_key {
                    tokio::time::sleep(staggered_delay / 2).await;
                    Ok(vec![])
                } else {
                    tokio::time::sleep(staggered_delay * 2).await;
                    Err(NodeError::UnexpectedMessage)
                }
            }
        };

        // Register nodes 1..=3 as alternatives for the key before running.
        manager.in_flight_tracker.insert_new(key.clone()).await;
        for node in nodes.iter().skip(1).rev() {
            manager
                .in_flight_tracker
                .add_alternative_peer(&key, node.clone())
                .await;
        }

        let result: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
            .try_staggered_parallel(&key, nodes[0].clone(), &operation, staggered_delay)
            .await;

        assert!(
            result.is_ok(),
            "Expected request to succeed with alternative peer"
        );

        let times = call_times.lock().await;
        assert!(
            times.len() >= 2,
            "Should have tried at least 2 peers, got {}",
            times.len()
        );

        assert!(
            times[0].1.as_millis() < 50,
            "First peer should be called immediately, was called at {}ms",
            times[0].1.as_millis()
        );

        if times.len() > 1 {
            // Node 0 fails right away, so the next peer must start without waiting for the
            // stagger timer.
            let delay = times[1].1.as_millis();
            assert!(
                delay < 50,
                "Second peer should be called immediately on first failure, got {delay}ms"
            );
        }

        let total_time = Instant::now().duration_since(start_time).as_millis();
        assert!(
            total_time < 500,
            "Total time should be less than 500ms (sequential would be ~650ms), got {total_time}ms"
        );
    }
}