1use std::{
5 collections::BTreeMap,
6 future::Future,
7 sync::{
8 atomic::{AtomicU32, Ordering},
9 Arc,
10 },
11};
12
13use custom_debug_derive::Debug;
14use futures::stream::{FuturesUnordered, StreamExt};
15use linera_base::{
16 crypto::ValidatorPublicKey,
17 data_types::{Blob, BlobContent, BlockHeight},
18 identifiers::{BlobId, ChainId},
19 time::{Duration, Instant},
20};
21use linera_chain::types::ConfirmedBlockCertificate;
22use rand::distributions::{Distribution, WeightedIndex};
23use tracing::instrument;
24
25use super::{
26 cache::{RequestsCache, SubsumingKey},
27 in_flight_tracker::{InFlightMatch, InFlightTracker},
28 node_info::NodeInfo,
29 request::{RequestKey, RequestResult},
30 scoring::ScoringWeights,
31};
32use crate::{
33 client::{
34 communicate_concurrently,
35 requests_scheduler::{in_flight_tracker::Subscribed, request::Cacheable},
36 RequestsSchedulerConfig,
37 },
38 environment::Environment,
39 node::{NodeError, ValidatorNode},
40 remote_node::RemoteNode,
41};
42
#[cfg(with_metrics)]
pub(super) mod metrics {
    //! Prometheus metrics for the requests scheduler.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_latencies, register_histogram_vec, register_int_counter,
        register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounter, IntCounterVec};

    /// Per-validator request latency histogram, in milliseconds.
    pub(super) static VALIDATOR_RESPONSE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "requests_scheduler_response_time_ms",
            "Response time for requests to validators in milliseconds",
            &["validator"],
            exponential_bucket_latencies(10000.0),
        )
    });

    /// Per-validator count of every request issued, successful or not.
    pub(super) static VALIDATOR_REQUEST_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "requests_scheduler_request_total",
            "Total number of requests made to each validator",
            &["validator"],
        )
    });

    /// Per-validator count of requests that completed successfully.
    pub(super) static VALIDATOR_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "requests_scheduler_request_success",
            "Number of successful requests to each validator",
            &["validator"],
        )
    });

    /// Requests that were answered by joining a matching in-flight request.
    pub(super) static REQUEST_CACHE_DEDUPLICATION: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "requests_scheduler_request_deduplication_total",
            "Number of requests that were deduplicated by finding the result in the cache.",
        )
    });

    /// Requests that were answered directly from the results cache.
    // NOTE(review): this one is `pub` while its siblings are `pub(super)` —
    // presumably it is incremented from outside this module; confirm before
    // narrowing the visibility.
    pub static REQUEST_CACHE_HIT: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "requests_scheduler_request_cache_hit_total",
            "Number of requests that were served from cache",
        )
    });
}
97
/// Schedules requests to validator nodes: scores peers from observed
/// success/latency, deduplicates identical in-flight requests, and caches
/// recent results.
#[derive(Debug, Clone)]
pub struct RequestsScheduler<Env: Environment> {
    /// Per-validator scoring state, keyed by the validator's public key.
    nodes: Arc<tokio::sync::RwLock<BTreeMap<ValidatorPublicKey, NodeInfo<Env>>>>,
    /// Weights used by `NodeInfo` when combining metrics into a score.
    weights: ScoringWeights,
    /// Smoothing factor passed to `NodeInfo`; asserted to lie in (0, 1).
    alpha: f64,
    /// Latency bound (in ms) passed to `NodeInfo` for score normalization.
    max_expected_latency: f64,
    /// Delay between staggered parallel attempts of the same request.
    retry_delay: Duration,
    /// Tracks in-flight requests so duplicates can subscribe to one result.
    in_flight_tracker: InFlightTracker<RemoteNode<Env::ValidatorNode>>,
    /// Cache of completed request results, with TTL and size limits.
    cache: RequestsCache<RequestKey, RequestResult>,
}
146
147impl<Env: Environment> RequestsScheduler<Env> {
148 pub fn new(
150 nodes: impl IntoIterator<Item = RemoteNode<Env::ValidatorNode>>,
151 config: &RequestsSchedulerConfig,
152 ) -> Self {
153 Self::with_config(
154 nodes,
155 ScoringWeights::default(),
156 config.alpha,
157 config.max_accepted_latency_ms,
158 Duration::from_millis(config.cache_ttl_ms),
159 config.cache_max_size,
160 Duration::from_millis(config.max_request_ttl_ms),
161 Duration::from_millis(config.retry_delay_ms),
162 )
163 }
164
165 #[expect(clippy::too_many_arguments)]
178 pub fn with_config(
179 nodes: impl IntoIterator<Item = RemoteNode<Env::ValidatorNode>>,
180 weights: ScoringWeights,
181 alpha: f64,
182 max_expected_latency_ms: f64,
183 cache_ttl: Duration,
184 max_cache_size: usize,
185 max_request_ttl: Duration,
186 retry_delay: Duration,
187 ) -> Self {
188 assert!(alpha > 0.0 && alpha < 1.0, "Alpha must be in (0, 1) range");
189 Self {
190 nodes: Arc::new(tokio::sync::RwLock::new(
191 nodes
192 .into_iter()
193 .map(|node| {
194 (
195 node.public_key,
196 NodeInfo::with_config(node, weights, alpha, max_expected_latency_ms),
197 )
198 })
199 .collect(),
200 )),
201 weights,
202 alpha,
203 max_expected_latency: max_expected_latency_ms,
204 retry_delay,
205 in_flight_tracker: InFlightTracker::new(max_request_ttl),
206 cache: RequestsCache::new(cache_ttl, max_cache_size),
207 }
208 }
209
210 #[allow(unused)]
241 async fn with_best<R, F, Fut>(&self, key: RequestKey, operation: F) -> Result<R, NodeError>
242 where
243 R: Cacheable + Clone + Send + 'static,
244 F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
245 Fut: Future<Output = Result<R, NodeError>> + 'static,
246 {
247 let peer = self
249 .select_best_peer()
250 .await
251 .ok_or_else(|| NodeError::WorkerError {
252 error: "No validators available".to_string(),
253 })?;
254 self.with_peer(key, peer, operation).await
255 }
256
257 async fn with_peer<R, F, Fut>(
276 &self,
277 key: RequestKey,
278 peer: RemoteNode<Env::ValidatorNode>,
279 operation: F,
280 ) -> Result<R, NodeError>
281 where
282 R: Cacheable + Clone + Send + 'static,
283 F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
284 Fut: Future<Output = Result<R, NodeError>> + 'static,
285 {
286 self.add_peer(peer.clone()).await;
287 self.in_flight_tracker
288 .add_alternative_peer(&key, peer.clone())
289 .await;
290
291 let nodes = self.nodes.clone();
293 self.deduplicated_request(key, peer, move |peer| {
294 let fut = operation(peer.clone());
295 let nodes = nodes.clone();
296 async move { Self::track_request(nodes, peer, fut).await }
297 })
298 .await
299 }
300
301 #[instrument(level = "trace", skip_all)]
302 async fn download_blob(
303 &self,
304 peers: &[RemoteNode<Env::ValidatorNode>],
305 blob_id: BlobId,
306 timeout: Duration,
307 ) -> Result<Option<Blob>, NodeError> {
308 let key = RequestKey::Blob(blob_id);
309 communicate_concurrently(
310 peers,
311 async move |peer| {
312 self.with_peer(key, peer, move |peer| async move {
313 peer.download_blob(blob_id).await
314 })
315 .await
316 },
317 |errors| errors.last().cloned().unwrap(),
318 timeout,
319 )
320 .await
321 .map_err(|(_validator, error)| error)
322 }
323
324 #[instrument(level = "trace", skip_all)]
328 pub async fn download_blobs(
329 &self,
330 peers: &[RemoteNode<Env::ValidatorNode>],
331 blob_ids: &[BlobId],
332 timeout: Duration,
333 ) -> Result<Option<Vec<Blob>>, NodeError> {
334 let mut stream = blob_ids
335 .iter()
336 .map(|blob_id| self.download_blob(peers, *blob_id, timeout))
337 .collect::<FuturesUnordered<_>>();
338
339 let mut blobs = Vec::new();
340 while let Some(maybe_blob) = stream.next().await {
341 blobs.push(maybe_blob?);
342 }
343 Ok(blobs.into_iter().collect::<Option<Vec<_>>>())
344 }
345
346 pub async fn download_certificates(
347 &self,
348 peer: &RemoteNode<Env::ValidatorNode>,
349 chain_id: ChainId,
350 start: BlockHeight,
351 limit: u64,
352 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
353 let heights = (start.0..start.0 + limit)
354 .map(BlockHeight)
355 .collect::<Vec<_>>();
356 self.with_peer(
357 RequestKey::Certificates {
358 chain_id,
359 heights: heights.clone(),
360 },
361 peer.clone(),
362 move |peer| {
363 let heights = heights.clone();
364 async move {
365 Box::pin(peer.download_certificates_by_heights(chain_id, heights)).await
366 }
367 },
368 )
369 .await
370 }
371
372 pub async fn download_certificates_from_validators(
375 &self,
376 peers: &[RemoteNode<Env::ValidatorNode>],
377 chain_id: ChainId,
378 start: BlockHeight,
379 limit: u64,
380 timeout: Duration,
381 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
382 if peers.is_empty() {
383 return Err(NodeError::NoValidators);
384 }
385 let heights = (start.0..start.0 + limit)
386 .map(BlockHeight)
387 .collect::<Vec<_>>();
388 let key = RequestKey::Certificates {
389 chain_id,
390 heights: heights.clone(),
391 };
392 communicate_concurrently(
393 peers,
394 async move |peer| {
395 self.with_peer(key, peer, move |peer| {
396 let heights = heights.clone();
397 async move {
398 Box::pin(peer.download_certificates_by_heights(chain_id, heights)).await
399 }
400 })
401 .await
402 },
403 |errors| errors.last().cloned().unwrap(),
404 timeout,
405 )
406 .await
407 .map_err(|(_validator, error)| error)
408 }
409
410 pub async fn download_certificates_by_heights(
411 &self,
412 peer: &RemoteNode<Env::ValidatorNode>,
413 chain_id: ChainId,
414 heights: Vec<BlockHeight>,
415 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
416 self.with_peer(
417 RequestKey::Certificates {
418 chain_id,
419 heights: heights.clone(),
420 },
421 peer.clone(),
422 move |peer| {
423 let heights = heights.clone();
424 async move {
425 peer.download_certificates_by_heights(chain_id, heights)
426 .await
427 }
428 },
429 )
430 .await
431 }
432
433 pub async fn download_certificate_for_blob(
434 &self,
435 peer: &RemoteNode<Env::ValidatorNode>,
436 blob_id: BlobId,
437 ) -> Result<ConfirmedBlockCertificate, NodeError> {
438 self.with_peer(
439 RequestKey::CertificateForBlob(blob_id),
440 peer.clone(),
441 move |peer| async move { peer.download_certificate_for_blob(blob_id).await },
442 )
443 .await
444 }
445
446 pub async fn download_pending_blob(
447 &self,
448 peer: &RemoteNode<Env::ValidatorNode>,
449 chain_id: ChainId,
450 blob_id: BlobId,
451 ) -> Result<BlobContent, NodeError> {
452 self.with_peer(
453 RequestKey::PendingBlob { chain_id, blob_id },
454 peer.clone(),
455 move |peer| async move { peer.node.download_pending_blob(chain_id, blob_id).await },
456 )
457 .await
458 }
459
    /// Returns the peers registered as alternatives for `key`'s in-flight
    /// request, or `None` when there is no in-flight entry for `key`.
    pub async fn get_alternative_peers(
        &self,
        key: &RequestKey,
    ) -> Option<Vec<RemoteNode<Env::ValidatorNode>>> {
        self.in_flight_tracker.get_alternative_peers(key).await
    }
470
    /// Awaits `operation` against `peer` while measuring wall-clock latency,
    /// then records the outcome in the peer's scoring state (and in the
    /// Prometheus counters when metrics are enabled). Returns the operation's
    /// result unchanged.
    ///
    /// Takes the node map as an owned `Arc` rather than borrowing `self` so
    /// callers (see `with_peer`) can move it into long-lived closures.
    async fn track_request<T, Fut>(
        nodes: Arc<tokio::sync::RwLock<BTreeMap<ValidatorPublicKey, NodeInfo<Env>>>>,
        peer: RemoteNode<Env::ValidatorNode>,
        operation: Fut,
    ) -> Result<T, NodeError>
    where
        Fut: Future<Output = Result<T, NodeError>> + 'static,
    {
        let start_time = Instant::now();
        let public_key = peer.public_key;

        let result = operation.await;

        let response_time_ms = start_time.elapsed().as_millis() as u64;
        let is_success = result.is_ok();
        {
            // Scope the write lock tightly: only the metrics update holds it.
            let mut nodes_guard = nodes.write().await;
            if let Some(info) = nodes_guard.get_mut(&public_key) {
                info.update_metrics(is_success, response_time_ms);
                let score = info.calculate_score().await;
                tracing::trace!(
                    node = %public_key,
                    address = %info.node.node.address(),
                    success = %is_success,
                    response_time_ms = %response_time_ms,
                    score = %score,
                    total_requests = %info.total_requests(),
                    "Request completed"
                );
            }
        }

        // Prometheus counters are updated after the lock is released.
        #[cfg(with_metrics)]
        {
            let validator_name = public_key.to_string();
            metrics::VALIDATOR_RESPONSE_TIME
                .with_label_values(&[&validator_name])
                .observe(response_time_ms as f64);
            metrics::VALIDATOR_REQUEST_TOTAL
                .with_label_values(&[&validator_name])
                .inc();
            if is_success {
                metrics::VALIDATOR_REQUEST_SUCCESS
                    .with_label_values(&[&validator_name])
                    .inc();
            }
        }

        result
    }
537
    /// Executes `operation` for `key`, deduplicating against the result cache
    /// and against any identical (or subsuming) request already in flight.
    ///
    /// Resolution order:
    /// 1. Serve from the cache if a fresh result is present.
    /// 2. Subscribe to a matching in-flight request and reuse its broadcast
    ///    result; for a subsuming match, extract the subset this key asked
    ///    for. If conversion/extraction fails, fall through and run anyway.
    /// 3. Register a new in-flight entry, execute with staggered parallel
    ///    retries, broadcast the outcome to subscribers, and cache successes.
    async fn deduplicated_request<T, F, Fut>(
        &self,
        key: RequestKey,
        peer: RemoteNode<Env::ValidatorNode>,
        operation: F,
    ) -> Result<T, NodeError>
    where
        T: Cacheable + Clone + Send + 'static,
        F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
        Fut: Future<Output = Result<T, NodeError>> + 'static,
    {
        // Fast path: a cached result for this exact key.
        if let Some(result) = self.cache.get(&key).await {
            return Ok(result);
        }

        // An equivalent request may already be running; try to join it
        // instead of issuing a duplicate.
        if let Some(in_flight_match) = self.in_flight_tracker.try_subscribe(&key).await {
            match in_flight_match {
                InFlightMatch::Exact(Subscribed(mut receiver)) => {
                    tracing::trace!(
                        ?key,
                        "deduplicating request (exact match) - joining existing in-flight request"
                    );
                    #[cfg(with_metrics)]
                    metrics::REQUEST_CACHE_DEDUPLICATION.inc();
                    match receiver.recv().await {
                        Ok(result) => match result.as_ref().clone() {
                            Ok(res) => match T::try_from(res) {
                                Ok(converted) => {
                                    tracing::trace!(
                                        ?key,
                                        "received result from deduplicated in-flight request"
                                    );
                                    return Ok(converted);
                                }
                                // Type mismatch: fall through and execute
                                // this request independently.
                                Err(_) => {
                                    tracing::warn!(
                                        ?key,
                                        "failed to convert result from deduplicated in-flight request, will execute independently"
                                    );
                                }
                            },
                            // The in-flight request errored; retry below.
                            Err(error) => {
                                tracing::trace!(
                                    ?key,
                                    %error,
                                    "in-flight request failed",
                                );
                            }
                        },
                        // Broadcast sender dropped without a result.
                        Err(_) => {
                            tracing::trace!(?key, "in-flight request sender dropped");
                        }
                    }
                }
                InFlightMatch::Subsuming {
                    key: subsuming_key,
                    outcome: Subscribed(mut receiver),
                } => {
                    tracing::trace!(
                        ?key,
                        subsumed_by = ?subsuming_key,
                        "deduplicating request (subsumption) - joining larger in-flight request"
                    );
                    #[cfg(with_metrics)]
                    metrics::REQUEST_CACHE_DEDUPLICATION.inc();
                    match receiver.recv().await {
                        Ok(result) => {
                            match result.as_ref() {
                                Ok(res) => {
                                    // Carve our subset out of the larger
                                    // request's result.
                                    if let Some(extracted) =
                                        key.try_extract_result(&subsuming_key, res)
                                    {
                                        tracing::trace!(
                                            ?key,
                                            "extracted subset result from larger in-flight request"
                                        );
                                        match T::try_from(extracted) {
                                            Ok(converted) => return Ok(converted),
                                            Err(_) => {
                                                tracing::trace!(
                                                    ?key,
                                                    "failed to convert extracted result, will execute independently"
                                                );
                                            }
                                        }
                                    } else {
                                        tracing::trace!(
                                            ?key,
                                            "failed to extract from subsuming request, will execute independently"
                                        );
                                    }
                                }
                                Err(error) => {
                                    tracing::trace!(
                                        ?key,
                                        ?error,
                                        "subsuming in-flight request failed",
                                    );
                                }
                            }
                        }
                        Err(_) => {
                            tracing::trace!(?key, "subsuming in-flight request sender dropped");
                        }
                    }
                }
            }
        };

        // No reusable result: become the executor for this key.
        self.in_flight_tracker.insert_new(key.clone()).await;

        // Don't retry against the peer we are about to try first.
        self.in_flight_tracker
            .remove_alternative_peer(&key, &peer)
            .await;

        tracing::trace!(?key, ?peer, "executing staggered parallel request");
        let result = self
            .try_staggered_parallel(&key, peer, &operation, self.retry_delay)
            .await;

        // Broadcast to any subscribers that joined while we were running.
        let result_for_broadcast: Result<RequestResult, NodeError> = result.clone().map(Into::into);
        let shared_result = Arc::new(result_for_broadcast);

        self.in_flight_tracker
            .complete_and_broadcast(&key, shared_result.clone())
            .await;

        // Only successes are cached; failures stay retryable.
        if let Ok(success) = shared_result.as_ref() {
            self.cache
                .store(key.clone(), Arc::new(success.clone()))
                .await;
        }
        result
    }
701
    /// Runs `operation` against `first_peer`, hedging with alternative peers:
    /// whenever an attempt fails or the staggered delay elapses, another
    /// alternative peer (if any) is started in parallel. Returns the first
    /// success, or the last error once all attempts are exhausted.
    ///
    /// The delay grows with the number of peers started so far
    /// (`staggered_delay * peer_index`).
    async fn try_staggered_parallel<T, F, Fut>(
        &self,
        key: &RequestKey,
        first_peer: RemoteNode<Env::ValidatorNode>,
        operation: &F,
        staggered_delay: Duration,
    ) -> Result<T, NodeError>
    where
        T: 'static,
        F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
        Fut: Future<Output = Result<T, NodeError>> + 'static,
    {
        use futures::{
            future::{select, Either},
            stream::{FuturesUnordered, StreamExt},
        };
        use linera_base::time::timer::sleep;

        let mut futures: FuturesUnordered<Fut> = FuturesUnordered::new();
        // Number of attempts started so far; scales the next hedge delay.
        let peer_index = AtomicU32::new(0);

        let push_future = |futures: &mut FuturesUnordered<Fut>, fut: Fut| {
            futures.push(fut);
            peer_index.fetch_add(1, Ordering::SeqCst)
        };

        push_future(&mut futures, operation(first_peer));

        let mut last_error = NodeError::UnexpectedMessage;
        let mut next_delay = Box::pin(sleep(staggered_delay * peer_index.load(Ordering::SeqCst)));

        loop {
            // All attempts consumed: start another alternative or give up.
            if futures.is_empty() {
                if let Some(peer) = self.in_flight_tracker.pop_alternative_peer(key).await {
                    push_future(&mut futures, operation(peer));
                    next_delay =
                        Box::pin(sleep(staggered_delay * peer_index.load(Ordering::SeqCst)));
                } else {
                    break;
                }
            }

            let next_result = Box::pin(futures.next());

            // Race the in-flight attempts against the hedge timer.
            match select(next_result, next_delay).await {
                Either::Left((Some(result), delay_fut)) => {
                    // Keep the (unfinished) delay for the next iteration.
                    next_delay = delay_fut;

                    match result {
                        Ok(value) => {
                            tracing::trace!(?key, "staggered parallel request succeeded");
                            return Ok(value);
                        }
                        Err(error) => {
                            tracing::debug!(
                                ?key,
                                %error,
                                "staggered parallel request attempt failed"
                            );
                            last_error = error;

                            // Immediately hedge with another peer on failure.
                            if let Some(peer) =
                                self.in_flight_tracker.pop_alternative_peer(key).await
                            {
                                push_future(&mut futures, operation(peer));
                                next_delay = Box::pin(sleep(
                                    staggered_delay * peer_index.load(Ordering::SeqCst),
                                ));
                            }
                        }
                    }
                }
                // Stream drained; the `is_empty` check at the loop top decides
                // whether to refill or stop.
                Either::Left((None, delay_fut)) => {
                    next_delay = delay_fut;
                    continue;
                }
                // Hedge timer fired before any attempt finished: add a peer.
                Either::Right((_, _)) => {
                    if let Some(peer) = self.in_flight_tracker.pop_alternative_peer(key).await {
                        push_future(&mut futures, operation(peer));
                        next_delay =
                            Box::pin(sleep(staggered_delay * peer_index.load(Ordering::SeqCst)));
                    } else {
                        break;
                    }
                }
            }
        }

        // No more hedges to add: drain whatever is still running.
        while let Some(result) = futures.next().await {
            match result {
                Ok(value) => {
                    tracing::trace!(?key, "staggered parallel request succeeded");
                    return Ok(value);
                }
                Err(error) => {
                    tracing::debug!(
                        ?key,
                        %error,
                        "staggered parallel request attempt failed"
                    );
                    last_error = error;
                }
            }
        }

        tracing::debug!(?key, "all staggered parallel retry attempts failed");
        Err(last_error)
    }
838
839 async fn peers_by_score(&self) -> Vec<(f64, RemoteNode<Env::ValidatorNode>)> {
848 let nodes = self.nodes.read().await;
849
850 let mut scored_nodes = Vec::new();
852 for info in nodes.values() {
853 let score = info.calculate_score().await;
854 scored_nodes.push((score, info.node.clone()));
855 }
856
857 scored_nodes.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
859
860 scored_nodes
861 }
862
863 async fn select_best_peer(&self) -> Option<RemoteNode<Env::ValidatorNode>> {
874 let scored_nodes = self.peers_by_score().await;
875
876 if scored_nodes.is_empty() {
877 return None;
878 }
879
880 let top_count = scored_nodes.len().min(3);
882 let top_nodes = &scored_nodes[..top_count];
883
884 let weights: Vec<f64> = top_nodes.iter().map(|(score, _)| score.max(0.01)).collect();
887
888 if let Ok(dist) = WeightedIndex::new(&weights) {
889 let mut rng = rand::thread_rng();
890 let index = dist.sample(&mut rng);
891 Some(top_nodes[index].1.clone())
892 } else {
893 tracing::warn!("failed to create weighted distribution, defaulting to best node");
895 Some(scored_nodes[0].1.clone())
896 }
897 }
898
899 async fn add_peer(&self, node: RemoteNode<Env::ValidatorNode>) {
901 let mut nodes = self.nodes.write().await;
902 let public_key = node.public_key;
903 nodes.entry(public_key).or_insert_with(|| {
904 NodeInfo::with_config(node, self.weights, self.alpha, self.max_expected_latency)
905 });
906 }
907}
908
909#[cfg(test)]
910mod tests {
911 use std::sync::{
912 atomic::{AtomicUsize, Ordering},
913 Arc,
914 };
915
916 use linera_base::{
917 crypto::{CryptoHash, InMemorySigner},
918 data_types::BlockHeight,
919 identifiers::ChainId,
920 time::Duration,
921 };
922 use linera_chain::types::ConfirmedBlockCertificate;
923 use tokio::sync::oneshot;
924
925 use super::{super::request::RequestKey, *};
926 use crate::{
927 client::requests_scheduler::{MAX_REQUEST_TTL_MS, STAGGERED_DELAY_MS},
928 node::NodeError,
929 };
930
931 type TestEnvironment = crate::environment::Test;
932
933 fn create_test_manager(
935 in_flight_timeout: Duration,
936 cache_ttl: Duration,
937 ) -> Arc<RequestsScheduler<TestEnvironment>> {
938 let mut manager = RequestsScheduler::with_config(
939 vec![], ScoringWeights::default(),
941 0.1,
942 1000.0,
943 cache_ttl,
944 100,
945 in_flight_timeout,
946 Duration::from_millis(STAGGERED_DELAY_MS),
947 );
948 manager.in_flight_tracker = InFlightTracker::new(in_flight_timeout);
950 Arc::new(manager)
951 }
952
953 fn test_key() -> RequestKey {
955 RequestKey::Certificates {
956 chain_id: ChainId(CryptoHash::test_hash("test")),
957 heights: vec![BlockHeight(0), BlockHeight(1)],
958 }
959 }
960
961 fn dummy_peer() -> RemoteNode<<TestEnvironment as Environment>::ValidatorNode> {
963 use crate::test_utils::{MemoryStorageBuilder, TestBuilder};
964
965 let mut builder = futures::executor::block_on(async {
967 TestBuilder::new(
968 MemoryStorageBuilder::default(),
969 1,
970 0,
971 linera_base::crypto::InMemorySigner::new(None),
972 )
973 .await
974 .unwrap()
975 });
976
977 let node = builder.node(0);
978 let public_key = node.name();
979 RemoteNode { public_key, node }
980 }
981
    /// A second request with the same key must be served from the cache
    /// without executing the operation again.
    #[tokio::test]
    async fn test_cache_hit_returns_cached_result() {
        // Long TTLs so neither the in-flight entry nor the cache entry can
        // expire mid-test.
        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
        let key = test_key();
        let peer = dummy_peer();

        // Counts how many times the underlying operation actually runs.
        let execution_count = Arc::new(AtomicUsize::new(0));
        let execution_count_clone = execution_count.clone();

        // First request: cache miss, so the operation executes once.
        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
            .deduplicated_request(key.clone(), peer.clone(), |_| {
                let count = execution_count_clone.clone();
                async move {
                    count.fetch_add(1, Ordering::SeqCst);
                    Ok(vec![])
                }
            })
            .await;

        assert!(result1.is_ok());
        assert_eq!(execution_count.load(Ordering::SeqCst), 1);

        // Second request with the same key: must hit the cache.
        let execution_count_clone2 = execution_count.clone();
        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
            .deduplicated_request(key.clone(), peer.clone(), |_| {
                let count = execution_count_clone2.clone();
                async move {
                    count.fetch_add(1, Ordering::SeqCst);
                    Ok(vec![])
                }
            })
            .await;

        assert_eq!(result1, result2);
        // Still one execution: the second call never ran its operation.
        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
    }
1023
    /// Two concurrent requests for the same key must share one execution: the
    /// second joins the first's in-flight entry instead of running its own
    /// operation.
    #[tokio::test]
    async fn test_in_flight_request_deduplication() {
        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
        let key = test_key();
        let peer = dummy_peer();

        let execution_count = Arc::new(AtomicUsize::new(0));

        // The first request's operation blocks on this channel, keeping it
        // in flight while the second request arrives.
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        let manager_clone = Arc::clone(&manager);
        let key_clone = key.clone();
        let execution_count_clone = execution_count.clone();
        let rx_clone = Arc::clone(&rx);
        let peer_clone = peer.clone();
        let first_request = tokio::spawn(async move {
            manager_clone
                .deduplicated_request(key_clone, peer_clone, |_| {
                    let count = execution_count_clone.clone();
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        Ok(vec![])
                    }
                })
                .await
        });

        // Second request for the same key; its operation should never run.
        // NOTE(review): nothing forces the first task to register its
        // in-flight entry before this one starts — verify the scheduler makes
        // this deterministic, otherwise the test could be flaky in principle.
        let execution_count_clone2 = execution_count.clone();
        let second_request = tokio::spawn(async move {
            manager
                .deduplicated_request(key, peer, |_| {
                    let count = execution_count_clone2.clone();
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        Ok(vec![])
                    }
                })
                .await
        });

        // Unblock the first request so both tasks can finish.
        tx.send(()).unwrap();

        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            first_request.await.unwrap();
        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            second_request.await.unwrap();

        assert!(result1.is_ok());
        assert_eq!(result1, result2);

        // Only the first request's operation executed.
        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
    }
1089
    /// Many concurrent duplicate requests must all receive the broadcast
    /// result of the single execution.
    #[tokio::test]
    async fn test_multiple_subscribers_all_notified() {
        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
        let key = test_key();
        let peer = dummy_peer();

        let execution_count = Arc::new(AtomicUsize::new(0));

        // The first request's operation blocks on this channel so all
        // subscribers can pile up behind it.
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        let manager_clone1 = Arc::clone(&manager);
        let key_clone1 = key.clone();
        let execution_count_clone = execution_count.clone();
        let rx_clone = Arc::clone(&rx);
        let peer_clone = peer.clone();
        let first_request = tokio::spawn(async move {
            manager_clone1
                .deduplicated_request(key_clone1, peer_clone, |_| {
                    let count = execution_count_clone.clone();
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        Ok(vec![])
                    }
                })
                .await
        });

        // Five duplicates; each should subscribe rather than execute.
        let mut handles = vec![];
        for _ in 0..5 {
            let manager_clone = Arc::clone(&manager);
            let key_clone = key.clone();
            let execution_count_clone = execution_count.clone();
            let peer_clone = peer.clone();
            let handle = tokio::spawn(async move {
                manager_clone
                    .deduplicated_request(key_clone, peer_clone, |_| {
                        let count = execution_count_clone.clone();
                        async move {
                            count.fetch_add(1, Ordering::SeqCst);
                            Ok(vec![])
                        }
                    })
                    .await
            });
            handles.push(handle);
        }

        // Release the blocked execution.
        tx.send(()).unwrap();

        let result: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            first_request.await.unwrap();
        assert!(result.is_ok());

        // Every subscriber observes the same broadcast result.
        for handle in handles {
            assert_eq!(handle.await.unwrap(), result);
        }

        // Exactly one execution for all six requests.
        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
    }
1162
    /// Once the in-flight entry times out, a new request for the same key
    /// must execute independently instead of subscribing to the stale one.
    #[tokio::test]
    async fn test_timeout_triggers_new_request() {
        // 50ms in-flight timeout so the first entry expires quickly.
        // NOTE(review): the sleep below waits `MAX_REQUEST_TTL_MS + 1` while
        // the manager is configured with a hard-coded 50ms — confirm the two
        // are meant to differ, or derive both from the same constant.
        let manager = create_test_manager(Duration::from_millis(50), Duration::from_secs(60));

        let key = test_key();
        let peer = dummy_peer();

        let execution_count = Arc::new(AtomicUsize::new(0));

        // The first request's operation blocks until released, outliving the
        // in-flight timeout.
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        let manager_clone = Arc::clone(&manager);
        let key_clone = key.clone();
        let execution_count_clone = execution_count.clone();
        let rx_clone = Arc::clone(&rx);
        let peer_clone = peer.clone();
        let first_request = tokio::spawn(async move {
            manager_clone
                .deduplicated_request(key_clone, peer_clone, |_| {
                    let count = execution_count_clone.clone();
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        Ok(vec![])
                    }
                })
                .await
        });

        // Wait past the in-flight TTL so the first entry is considered stale.
        tokio::time::sleep(Duration::from_millis(MAX_REQUEST_TTL_MS + 1)).await;

        // This request must execute on its own rather than subscribe.
        let execution_count_clone2 = execution_count.clone();
        let second_request = tokio::spawn(async move {
            manager
                .deduplicated_request(key, peer, |_| {
                    let count = execution_count_clone2.clone();
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        Ok(vec![])
                    }
                })
                .await
        });

        // The second request completes while the first is still blocked.
        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            second_request.await.unwrap();
        assert!(result2.is_ok());

        // Now release the first request.
        tx.send(()).unwrap();
        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            first_request.await.unwrap();
        assert!(result1.is_ok());

        // Both operations ran: timeout prevented deduplication.
        assert_eq!(execution_count.load(Ordering::SeqCst), 2);
    }
1231
    /// Deduplicated requests register their peers as alternatives for the
    /// in-flight entry, and the entry disappears after completion.
    #[tokio::test]
    async fn test_alternative_peers_registered_on_deduplication() {
        use linera_base::identifiers::BlobType;

        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};

        // Three distinct validators so each request uses a different peer.
        let mut builder = TestBuilder::new(
            MemoryStorageBuilder::default(),
            3,
            0,
            InMemorySigner::new(None),
        )
        .await
        .unwrap();

        let nodes: Vec<_> = (0..3)
            .map(|i| {
                let node = builder.node(i);
                let public_key = node.name();
                RemoteNode { public_key, node }
            })
            .collect();

        let manager: Arc<RequestsScheduler<TestEnvironment>> =
            Arc::new(RequestsScheduler::with_config(
                nodes.clone(),
                ScoringWeights::default(),
                0.1,
                1000.0,
                Duration::from_secs(60),
                100,
                Duration::from_millis(MAX_REQUEST_TTL_MS),
                Duration::from_millis(STAGGERED_DELAY_MS),
            ));

        let key = RequestKey::Blob(BlobId::new(
            CryptoHash::test_hash("test_blob"),
            BlobType::Data,
        ));

        // The first request blocks on this channel so the duplicates find it
        // in flight.
        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        let manager_clone = Arc::clone(&manager);
        let node_clone = nodes[0].clone();
        let key_clone = key.clone();
        let rx_clone = Arc::clone(&rx);
        let first_request = tokio::spawn(async move {
            manager_clone
                .with_peer(key_clone, node_clone, move |_peer| {
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        Ok(None) }
                })
                .await
        });

        // Give the first request time to register its in-flight entry.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Duplicate requests from the other two peers; they should subscribe
        // and register themselves as alternatives.
        let handles: Vec<_> = vec![nodes[1].clone(), nodes[2].clone()]
            .into_iter()
            .map(|node| {
                let manager_clone = Arc::clone(&manager);
                let key_clone = key.clone();
                tokio::spawn(async move {
                    manager_clone
                        .with_peer(key_clone, node, |_peer| async move {
                            Ok(None) })
                        .await
                })
            })
            .collect();

        // Let the duplicates reach the subscription point.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Release the original request.
        tx.send(()).unwrap();

        let _result1 = first_request.await.unwrap();
        for handle in handles {
            handle.await.unwrap().ok();
        }

        // After completion the in-flight entry (and its alternative-peer
        // list) must be gone.
        tokio::time::sleep(Duration::from_millis(50)).await;
        let alt_peers = manager.get_alternative_peers(&key).await;
        assert!(
            alt_peers.is_none(),
            "Expected in-flight entry to be removed after completion"
        );
    }
1345
    /// When the first peer fails, alternatives must be tried in parallel
    /// (hedged) rather than strictly sequentially, so the overall request
    /// still succeeds quickly.
    #[tokio::test]
    async fn test_staggered_parallel_retry_on_failure() {
        use std::sync::atomic::{AtomicU64, Ordering};

        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};

        let mut builder = TestBuilder::new(
            MemoryStorageBuilder::default(),
            4,
            0,
            InMemorySigner::new(None),
        )
        .await
        .unwrap();

        let nodes: Vec<_> = (0..4)
            .map(|i| {
                let node = builder.node(i);
                let public_key = node.name();
                RemoteNode { public_key, node }
            })
            .collect();

        let staggered_delay = Duration::from_millis(100);

        // Scripted behavior: node 0 fails fast, node 2 succeeds after a short
        // delay, everything else fails slowly.
        let node0_key = nodes[0].public_key;
        let node2_key = nodes[2].public_key;

        let manager: Arc<RequestsScheduler<TestEnvironment>> =
            Arc::new(RequestsScheduler::with_config(
                nodes.clone(),
                ScoringWeights::default(),
                0.1,
                1000.0,
                Duration::from_secs(60),
                100,
                Duration::from_millis(MAX_REQUEST_TTL_MS),
                staggered_delay,
            ));

        let key = test_key();

        // Records (peer, elapsed-at-call) so the hedging schedule can be
        // asserted below.
        let call_times = Arc::new(tokio::sync::Mutex::new(Vec::new()));
        let start_time = Instant::now();

        // NOTE(review): `call_count` is incremented but never asserted —
        // either assert on it or drop it.
        let call_count = Arc::new(AtomicU64::new(0));

        let call_times_clone = Arc::clone(&call_times);
        let call_count_clone = Arc::clone(&call_count);

        let operation = |peer: RemoteNode<<TestEnvironment as Environment>::ValidatorNode>| {
            let times = Arc::clone(&call_times_clone);
            let count = Arc::clone(&call_count_clone);
            let start = start_time;
            async move {
                let elapsed = Instant::now().duration_since(start);
                times.lock().await.push((peer.public_key, elapsed));
                count.fetch_add(1, Ordering::SeqCst);

                if peer.public_key == node0_key {
                    Err(NodeError::UnexpectedMessage)
                } else if peer.public_key == node2_key {
                    tokio::time::sleep(staggered_delay / 2).await;
                    Ok(vec![])
                } else {
                    tokio::time::sleep(staggered_delay * 2).await;
                    Err(NodeError::UnexpectedMessage)
                }
            }
        };

        // Seed the in-flight entry and its alternative peers by hand, since
        // `try_staggered_parallel` is called directly here.
        manager.in_flight_tracker.insert_new(key.clone()).await;
        for node in nodes.iter().skip(1).rev() {
            manager
                .in_flight_tracker
                .add_alternative_peer(&key, node.clone())
                .await;
        }

        let result: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
            .try_staggered_parallel(&key, nodes[0].clone(), &operation, staggered_delay)
            .await;

        assert!(
            result.is_ok(),
            "Expected request to succeed with alternative peer"
        );

        let times = call_times.lock().await;
        assert!(
            times.len() >= 2,
            "Should have tried at least 2 peers, got {}",
            times.len()
        );

        assert!(
            times[0].1.as_millis() < 50,
            "First peer should be called immediately, was called at {}ms",
            times[0].1.as_millis()
        );

        // A failure should trigger an immediate hedge, not wait for the
        // staggered delay.
        if times.len() > 1 {
            let delay = times[1].1.as_millis();
            assert!(
                delay < 50,
                "Second peer should be called immediately on first failure, got {delay}ms"
            );
        }

        let total_time = Instant::now().duration_since(start_time).as_millis();
        assert!(
            total_time < 500,
            "Total time should be less than 500ms (sequential would be ~650ms), got {total_time}ms"
        );
    }
1484}