use std::{
    collections::BTreeMap,
    future::Future,
    sync::{
        atomic::{AtomicU32, Ordering},
        Arc,
    },
};

use custom_debug_derive::Debug;
use futures::stream::{FuturesUnordered, StreamExt};
use linera_base::{
    crypto::ValidatorPublicKey,
    data_types::{Blob, BlobContent, BlockHeight},
    identifiers::{BlobId, ChainId},
    time::{Duration, Instant},
};
use linera_chain::types::ConfirmedBlockCertificate;
use rand::{
    distributions::{Distribution, WeightedIndex},
    prelude::SliceRandom as _,
};
use tracing::instrument;

use super::{
    cache::{RequestsCache, SubsumingKey},
    in_flight_tracker::{InFlightMatch, InFlightTracker},
    node_info::NodeInfo,
    request::{RequestKey, RequestResult},
    scoring::ScoringWeights,
};
use crate::{
    client::{
        communicate_concurrently,
        requests_scheduler::{in_flight_tracker::Subscribed, request::Cacheable},
        RequestsSchedulerConfig,
    },
    environment::Environment,
    node::{NodeError, ValidatorNode},
    remote_node::RemoteNode,
};

#[cfg(with_metrics)]
pub(super) mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_latencies, register_histogram_vec, register_int_counter,
        register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounter, IntCounterVec};

    pub(super) static VALIDATOR_RESPONSE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "requests_scheduler_response_time_ms",
            "Response time for requests to validators in milliseconds",
            &["validator"],
            exponential_bucket_latencies(10000.0),
        )
    });

    pub(super) static VALIDATOR_REQUEST_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "requests_scheduler_request_total",
            "Total number of requests made to each validator",
            &["validator"],
        )
    });

    pub(super) static VALIDATOR_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "requests_scheduler_request_success",
            "Number of successful requests to each validator",
            &["validator"],
        )
    });

    pub(super) static REQUEST_CACHE_DEDUPLICATION: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "requests_scheduler_request_deduplication_total",
            "Number of requests that were deduplicated by finding the result in the cache.",
        )
    });

    pub static REQUEST_CACHE_HIT: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "requests_scheduler_request_cache_hit_total",
            "Number of requests that were served from cache",
        )
    });
}

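/// Schedules client requests across the known validator nodes.
///
/// The scheduler keeps per-validator performance statistics to pick well-performing
/// peers, deduplicates identical (or subsumed) in-flight requests, caches recent
/// results, and retries pending requests against alternative peers after a staggered
/// delay.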
#[derive(Debug, Clone)]
pub struct RequestsScheduler<Env: Environment> {
    /// Known validator nodes, keyed by their public key, together with their scoring state.
    nodes: Arc<tokio::sync::RwLock<BTreeMap<ValidatorPublicKey, NodeInfo<Env>>>>,
    /// Weights applied to the individual factors of a peer's score.
    weights: ScoringWeights,
    /// Smoothing factor for the exponentially weighted statistics, in the (0, 1) range.
    alpha: f64,
    /// Upper bound on expected response latency, in milliseconds, used when scoring peers.
    max_expected_latency: f64,
    /// Delay before an additional peer is tried while a request is still pending.
    retry_delay: Duration,
    /// Tracks requests that are currently in flight so that duplicates can subscribe to them.
    in_flight_tracker: InFlightTracker<RemoteNode<Env::ValidatorNode>>,
    /// Cache of recently completed request results.
    cache: RequestsCache<RequestKey, RequestResult>,
}

impl<Env: Environment> RequestsScheduler<Env> {
    pub fn new(
        nodes: impl IntoIterator<Item = RemoteNode<Env::ValidatorNode>>,
        config: RequestsSchedulerConfig,
    ) -> Self {
        Self::with_config(
            nodes,
            ScoringWeights::default(),
            config.alpha,
            config.max_accepted_latency_ms,
            Duration::from_millis(config.cache_ttl_ms),
            config.cache_max_size,
            Duration::from_millis(config.max_request_ttl_ms),
            Duration::from_millis(config.retry_delay_ms),
        )
    }

    #[allow(clippy::too_many_arguments)]
    pub fn with_config(
        nodes: impl IntoIterator<Item = RemoteNode<Env::ValidatorNode>>,
        weights: ScoringWeights,
        alpha: f64,
        max_expected_latency_ms: f64,
        cache_ttl: Duration,
        max_cache_size: usize,
        max_request_ttl: Duration,
        retry_delay: Duration,
    ) -> Self {
        assert!(alpha > 0.0 && alpha < 1.0, "Alpha must be in (0, 1) range");
        Self {
            nodes: Arc::new(tokio::sync::RwLock::new(
                nodes
                    .into_iter()
                    .map(|node| {
                        (
                            node.public_key,
                            NodeInfo::with_config(node, weights, alpha, max_expected_latency_ms),
                        )
                    })
                    .collect(),
            )),
            weights,
            alpha,
            max_expected_latency: max_expected_latency_ms,
            retry_delay,
            in_flight_tracker: InFlightTracker::new(max_request_ttl),
            cache: RequestsCache::new(cache_ttl, max_cache_size),
        }
    }

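    /// Runs `operation` against the best-scoring peer currently known to the scheduler.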
    #[allow(unused)]
    async fn with_best<R, F, Fut>(&self, key: RequestKey, operation: F) -> Result<R, NodeError>
    where
        R: Cacheable + Clone + Send + 'static,
        F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
        Fut: Future<Output = Result<R, NodeError>> + 'static,
    {
        let peer = self
            .select_best_peer()
            .await
            .ok_or_else(|| NodeError::WorkerError {
                error: "No validators available".to_string(),
            })?;
        self.with_peer(key, peer, operation).await
    }

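    /// Runs `operation` against `peer`, first registering the peer with the scheduler so
    /// that the request is tracked, deduplicated, and its outcome recorded in the peer's
    /// statistics.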
    async fn with_peer<R, F, Fut>(
        &self,
        key: RequestKey,
        peer: RemoteNode<Env::ValidatorNode>,
        operation: F,
    ) -> Result<R, NodeError>
    where
        R: Cacheable + Clone + Send + 'static,
        F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
        Fut: Future<Output = Result<R, NodeError>> + 'static,
    {
        self.add_peer(peer.clone()).await;
        self.in_flight_tracker
            .add_alternative_peer(&key, peer.clone())
            .await;

        let nodes = self.nodes.clone();
        self.deduplicated_request(key, peer, move |peer| {
            let fut = operation(peer.clone());
            let nodes = nodes.clone();
            async move { Self::track_request(nodes, peer, fut).await }
        })
        .await
    }

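    /// Downloads a blob from one of the given peers, contacting them concurrently within
    /// the given timeout.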
    #[instrument(level = "trace", skip_all)]
    async fn download_blob(
        &self,
        peers: &[RemoteNode<Env::ValidatorNode>],
        blob_id: BlobId,
        timeout: Duration,
    ) -> Result<Option<Blob>, NodeError> {
        let key = RequestKey::Blob(blob_id);
        let mut peers = peers.to_vec();
        peers.shuffle(&mut rand::thread_rng());
        communicate_concurrently(
            &peers,
            async move |peer| {
                self.with_peer(key, peer, move |peer| async move {
                    peer.download_blob(blob_id).await
                })
                .await
            },
            |errors| errors.last().cloned().unwrap(),
            timeout,
        )
        .await
        .map_err(|(_validator, error)| error)
    }

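    /// Downloads all the given blobs, returning `None` if any of them could not be found.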
    #[instrument(level = "trace", skip_all)]
    pub async fn download_blobs(
        &self,
        peers: &[RemoteNode<Env::ValidatorNode>],
        blob_ids: &[BlobId],
        timeout: Duration,
    ) -> Result<Option<Vec<Blob>>, NodeError> {
        let mut stream = blob_ids
            .iter()
            .map(|blob_id| self.download_blob(peers, *blob_id, timeout))
            .collect::<FuturesUnordered<_>>();

        let mut blobs = Vec::new();
        while let Some(maybe_blob) = stream.next().await {
            blobs.push(maybe_blob?);
        }
        Ok(blobs.into_iter().collect::<Option<Vec<_>>>())
    }

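    /// Downloads the confirmed block certificates for `chain_id` for the `limit` heights
    /// starting at `start`.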
    pub async fn download_certificates(
        &self,
        peer: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
        start: BlockHeight,
        limit: u64,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
        let heights = (start.0..start.0 + limit)
            .map(BlockHeight)
            .collect::<Vec<_>>();
        self.with_peer(
            RequestKey::Certificates {
                chain_id,
                heights: heights.clone(),
            },
            peer.clone(),
            move |peer| {
                let heights = heights.clone();
                async move {
                    Box::pin(peer.download_certificates_by_heights(chain_id, heights)).await
                }
            },
        )
        .await
    }

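    /// Downloads the confirmed block certificates for `chain_id` at the given heights.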
    pub async fn download_certificates_by_heights(
        &self,
        peer: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
        heights: Vec<BlockHeight>,
    ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
        self.with_peer(
            RequestKey::Certificates {
                chain_id,
                heights: heights.clone(),
            },
            peer.clone(),
            move |peer| {
                let heights = heights.clone();
                async move {
                    peer.download_certificates_by_heights(chain_id, heights)
                        .await
                }
            },
        )
        .await
    }

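    /// Downloads a confirmed block certificate associated with the given blob.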
    pub async fn download_certificate_for_blob(
        &self,
        peer: &RemoteNode<Env::ValidatorNode>,
        blob_id: BlobId,
    ) -> Result<ConfirmedBlockCertificate, NodeError> {
        self.with_peer(
            RequestKey::CertificateForBlob(blob_id),
            peer.clone(),
            move |peer| async move { peer.download_certificate_for_blob(blob_id).await },
        )
        .await
    }

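    /// Downloads the content of a pending blob from the given chain.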
    pub async fn download_pending_blob(
        &self,
        peer: &RemoteNode<Env::ValidatorNode>,
        chain_id: ChainId,
        blob_id: BlobId,
    ) -> Result<BlobContent, NodeError> {
        self.with_peer(
            RequestKey::PendingBlob { chain_id, blob_id },
            peer.clone(),
            move |peer| async move { peer.node.download_pending_blob(chain_id, blob_id).await },
        )
        .await
    }

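    /// Returns the alternative peers currently registered for an in-flight request with
    /// this key, if any.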
    pub async fn get_alternative_peers(
        &self,
        key: &RequestKey,
    ) -> Option<Vec<RemoteNode<Env::ValidatorNode>>> {
        self.in_flight_tracker.get_alternative_peers(key).await
    }

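    /// Returns, for each known validator, its current score, EMA success rate, and total
    /// number of requests.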
    pub async fn get_node_scores(&self) -> BTreeMap<ValidatorPublicKey, (f64, f64, u64)> {
        let nodes = self.nodes.read().await;
        let mut result = BTreeMap::new();

        for (key, info) in nodes.iter() {
            let score = info.calculate_score().await;
            result.insert(
                *key,
                (score, info.ema_success_rate(), info.total_requests()),
            );
        }

        result
    }

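    /// Awaits `operation` and records its latency and outcome in the statistics of the
    /// peer that served it.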
    async fn track_request<T, Fut>(
        nodes: Arc<tokio::sync::RwLock<BTreeMap<ValidatorPublicKey, NodeInfo<Env>>>>,
        peer: RemoteNode<Env::ValidatorNode>,
        operation: Fut,
    ) -> Result<T, NodeError>
    where
        Fut: Future<Output = Result<T, NodeError>> + 'static,
    {
        let start_time = Instant::now();
        let public_key = peer.public_key;

        let result = operation.await;

        let response_time_ms = start_time.elapsed().as_millis() as u64;
        let is_success = result.is_ok();
        {
            let mut nodes_guard = nodes.write().await;
            if let Some(info) = nodes_guard.get_mut(&public_key) {
                info.update_metrics(is_success, response_time_ms);
                let score = info.calculate_score().await;
                tracing::trace!(
                    node = %public_key,
                    address = %info.node.node.address(),
                    success = %is_success,
                    response_time_ms = %response_time_ms,
                    score = %score,
                    total_requests = %info.total_requests(),
                    "Request completed"
                );
            }
        }

        #[cfg(with_metrics)]
        {
            let validator_name = public_key.to_string();
            metrics::VALIDATOR_RESPONSE_TIME
                .with_label_values(&[&validator_name])
                .observe(response_time_ms as f64);
            metrics::VALIDATOR_REQUEST_TOTAL
                .with_label_values(&[&validator_name])
                .inc();
            if is_success {
                metrics::VALIDATOR_REQUEST_SUCCESS
                    .with_label_values(&[&validator_name])
                    .inc();
            }
        }

        result
    }

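    /// Executes a request, first checking the cache and any matching in-flight request.
    /// If neither yields a result, the request is run against `peer` (with staggered
    /// retries on alternative peers), broadcast to all subscribers, and cached on
    /// success.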
    async fn deduplicated_request<T, F, Fut>(
        &self,
        key: RequestKey,
        peer: RemoteNode<Env::ValidatorNode>,
        operation: F,
    ) -> Result<T, NodeError>
    where
        T: Cacheable + Clone + Send + 'static,
        F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
        Fut: Future<Output = Result<T, NodeError>> + 'static,
    {
        if let Some(result) = self.cache.get(&key).await {
            return Ok(result);
        }

        if let Some(in_flight_match) = self.in_flight_tracker.try_subscribe(&key).await {
            match in_flight_match {
                InFlightMatch::Exact(Subscribed(mut receiver)) => {
                    tracing::trace!(
                        ?key,
                        "deduplicating request (exact match) - joining existing in-flight request"
                    );
                    #[cfg(with_metrics)]
                    metrics::REQUEST_CACHE_DEDUPLICATION.inc();
                    match receiver.recv().await {
                        Ok(result) => match result.as_ref().clone() {
                            Ok(res) => match T::try_from(res) {
                                Ok(converted) => {
                                    tracing::trace!(
                                        ?key,
                                        "received result from deduplicated in-flight request"
                                    );
                                    return Ok(converted);
                                }
                                Err(_) => {
                                    tracing::warn!(
                                        ?key,
                                        "failed to convert result from deduplicated in-flight request, will execute independently"
                                    );
                                }
                            },
                            Err(error) => {
                                tracing::trace!(
                                    ?key,
                                    %error,
                                    "in-flight request failed",
                                );
                            }
                        },
                        Err(_) => {
                            tracing::trace!(?key, "in-flight request sender dropped");
                        }
                    }
                }
                InFlightMatch::Subsuming {
                    key: subsuming_key,
                    outcome: Subscribed(mut receiver),
                } => {
                    tracing::trace!(
                        ?key,
                        subsumed_by = ?subsuming_key,
                        "deduplicating request (subsumption) - joining larger in-flight request"
                    );
                    #[cfg(with_metrics)]
                    metrics::REQUEST_CACHE_DEDUPLICATION.inc();
                    match receiver.recv().await {
                        Ok(result) => {
                            match result.as_ref() {
                                Ok(res) => {
                                    if let Some(extracted) =
                                        key.try_extract_result(&subsuming_key, res)
                                    {
                                        tracing::trace!(
                                            ?key,
                                            "extracted subset result from larger in-flight request"
                                        );
                                        match T::try_from(extracted) {
                                            Ok(converted) => return Ok(converted),
                                            Err(_) => {
                                                tracing::trace!(
                                                    ?key,
                                                    "failed to convert extracted result, will execute independently"
                                                );
                                            }
                                        }
                                    } else {
                                        tracing::trace!(
                                            ?key,
                                            "failed to extract from subsuming request, will execute independently"
                                        );
                                    }
                                }
                                Err(error) => {
                                    tracing::trace!(
                                        ?key,
                                        ?error,
                                        "subsuming in-flight request failed",
                                    );
                                }
                            }
                        }
                        Err(_) => {
                            tracing::trace!(?key, "subsuming in-flight request sender dropped");
                        }
                    }
                }
            }
        };

        self.in_flight_tracker.insert_new(key.clone()).await;

        self.in_flight_tracker
            .remove_alternative_peer(&key, &peer)
            .await;

        tracing::trace!(?key, ?peer, "executing staggered parallel request");
        let result = self
            .try_staggered_parallel(&key, peer, &operation, self.retry_delay)
            .await;

        let result_for_broadcast: Result<RequestResult, NodeError> = result.clone().map(Into::into);
        let shared_result = Arc::new(result_for_broadcast);

        self.in_flight_tracker
            .complete_and_broadcast(&key, shared_result.clone())
            .await;

        if let Ok(success) = shared_result.as_ref() {
            self.cache
                .store(key.clone(), Arc::new(success.clone()))
                .await;
        }
        result
    }

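    /// Runs the request against `first_peer` and, whenever `staggered_delay` elapses or
    /// an attempt fails, additionally starts it against the next alternative peer
    /// registered for `key`. Returns the first successful result, or the last error if
    /// every attempt fails.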
    async fn try_staggered_parallel<T, F, Fut>(
        &self,
        key: &RequestKey,
        first_peer: RemoteNode<Env::ValidatorNode>,
        operation: &F,
        staggered_delay: Duration,
    ) -> Result<T, NodeError>
    where
        T: 'static,
        F: Fn(RemoteNode<Env::ValidatorNode>) -> Fut,
        Fut: Future<Output = Result<T, NodeError>> + 'static,
    {
        use futures::{
            future::{select, Either},
            stream::{FuturesUnordered, StreamExt},
        };
        use linera_base::time::timer::sleep;

        let mut futures: FuturesUnordered<Fut> = FuturesUnordered::new();
        let peer_index = AtomicU32::new(0);

        let push_future = |futures: &mut FuturesUnordered<Fut>, fut: Fut| {
            futures.push(fut);
            peer_index.fetch_add(1, Ordering::SeqCst)
        };

        push_future(&mut futures, operation(first_peer));

        let mut last_error = NodeError::UnexpectedMessage;
        let mut next_delay = Box::pin(sleep(staggered_delay * peer_index.load(Ordering::SeqCst)));

        loop {
            if futures.is_empty() {
                if let Some(peer) = self.in_flight_tracker.pop_alternative_peer(key).await {
                    push_future(&mut futures, operation(peer));
                    next_delay =
                        Box::pin(sleep(staggered_delay * peer_index.load(Ordering::SeqCst)));
                } else {
                    break;
                }
            }

            let next_result = Box::pin(futures.next());

            match select(next_result, next_delay).await {
                Either::Left((Some(result), delay_fut)) => {
                    next_delay = delay_fut;

                    match result {
                        Ok(value) => {
                            tracing::trace!(?key, "staggered parallel request succeeded");
                            return Ok(value);
                        }
                        Err(error) => {
                            tracing::debug!(
                                ?key,
                                %error,
                                "staggered parallel request attempt failed"
                            );
                            last_error = error;

                            if let Some(peer) =
                                self.in_flight_tracker.pop_alternative_peer(key).await
                            {
                                push_future(&mut futures, operation(peer));
                                next_delay = Box::pin(sleep(
                                    staggered_delay * peer_index.load(Ordering::SeqCst),
                                ));
                            }
                        }
                    }
                }
                Either::Left((None, delay_fut)) => {
                    next_delay = delay_fut;
                    continue;
                }
                Either::Right((_, _)) => {
                    if let Some(peer) = self.in_flight_tracker.pop_alternative_peer(key).await {
                        push_future(&mut futures, operation(peer));
                        next_delay =
                            Box::pin(sleep(staggered_delay * peer_index.load(Ordering::SeqCst)));
                    } else {
                        break;
                    }
                }
            }
        }

        while let Some(result) = futures.next().await {
            match result {
                Ok(value) => {
                    tracing::trace!(?key, "staggered parallel request succeeded");
                    return Ok(value);
                }
                Err(error) => {
                    tracing::debug!(
                        ?key,
                        %error,
                        "staggered parallel request attempt failed"
                    );
                    last_error = error;
                }
            }
        }

        tracing::debug!(?key, "all staggered parallel retry attempts failed");
        Err(last_error)
    }

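    /// Returns all known peers together with their current score, sorted from best to
    /// worst.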
    async fn peers_by_score(&self) -> Vec<(f64, RemoteNode<Env::ValidatorNode>)> {
        let nodes = self.nodes.read().await;

        let mut scored_nodes = Vec::new();
        for info in nodes.values() {
            let score = info.calculate_score().await;
            scored_nodes.push((score, info.node.clone()));
        }

        scored_nodes.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));

        scored_nodes
    }

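    /// Picks a peer to contact, sampling among the top three scored peers with
    /// probability proportional to their scores.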
    async fn select_best_peer(&self) -> Option<RemoteNode<Env::ValidatorNode>> {
        let scored_nodes = self.peers_by_score().await;

        if scored_nodes.is_empty() {
            return None;
        }

        let top_count = scored_nodes.len().min(3);
        let top_nodes = &scored_nodes[..top_count];

        let weights: Vec<f64> = top_nodes.iter().map(|(score, _)| score.max(0.01)).collect();

        if let Ok(dist) = WeightedIndex::new(&weights) {
            let mut rng = rand::thread_rng();
            let index = dist.sample(&mut rng);
            Some(top_nodes[index].1.clone())
        } else {
            tracing::warn!("failed to create weighted distribution, defaulting to best node");
            Some(scored_nodes[0].1.clone())
        }
    }

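    /// Registers a peer with the scheduler if it is not already known.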
    async fn add_peer(&self, node: RemoteNode<Env::ValidatorNode>) {
        let mut nodes = self.nodes.write().await;
        let public_key = node.public_key;
        nodes.entry(public_key).or_insert_with(|| {
            NodeInfo::with_config(node, self.weights, self.alpha, self.max_expected_latency)
        });
    }
}

#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    use linera_base::{
        crypto::{CryptoHash, InMemorySigner},
        data_types::BlockHeight,
        identifiers::ChainId,
        time::Duration,
    };
    use linera_chain::types::ConfirmedBlockCertificate;
    use tokio::sync::oneshot;

    use super::{super::request::RequestKey, *};
    use crate::{
        client::requests_scheduler::{MAX_REQUEST_TTL_MS, STAGGERED_DELAY_MS},
        node::NodeError,
    };

    type TestEnvironment = crate::environment::Test;

    fn create_test_manager(
        in_flight_timeout: Duration,
        cache_ttl: Duration,
    ) -> Arc<RequestsScheduler<TestEnvironment>> {
        let mut manager = RequestsScheduler::with_config(
            vec![],
            ScoringWeights::default(),
            0.1,
            1000.0,
            cache_ttl,
            100,
            in_flight_timeout,
            Duration::from_millis(STAGGERED_DELAY_MS),
        );
        manager.in_flight_tracker = InFlightTracker::new(in_flight_timeout);
        Arc::new(manager)
    }

    fn test_result_ok() -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
        Ok(vec![])
    }

    fn test_key() -> RequestKey {
        RequestKey::Certificates {
            chain_id: ChainId(CryptoHash::test_hash("test")),
            heights: vec![BlockHeight(0), BlockHeight(1)],
        }
    }

    fn dummy_peer() -> RemoteNode<<TestEnvironment as Environment>::ValidatorNode> {
        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};

        let mut builder = futures::executor::block_on(async {
            TestBuilder::new(
                MemoryStorageBuilder::default(),
                1,
                0,
                linera_base::crypto::InMemorySigner::new(None),
            )
            .await
            .unwrap()
        });

        let node = builder.node(0);
        let public_key = node.name();
        RemoteNode { public_key, node }
    }

    #[tokio::test]
    async fn test_cache_hit_returns_cached_result() {
        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
        let key = test_key();
        let peer = dummy_peer();

        let execution_count = Arc::new(AtomicUsize::new(0));
        let execution_count_clone = execution_count.clone();

        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
            .deduplicated_request(key.clone(), peer.clone(), |_| {
                let count = execution_count_clone.clone();
                async move {
                    count.fetch_add(1, Ordering::SeqCst);
                    test_result_ok()
                }
            })
            .await;

        assert!(result1.is_ok());
        assert_eq!(execution_count.load(Ordering::SeqCst), 1);

        let execution_count_clone2 = execution_count.clone();
        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
            .deduplicated_request(key.clone(), peer.clone(), |_| {
                let count = execution_count_clone2.clone();
                async move {
                    count.fetch_add(1, Ordering::SeqCst);
                    test_result_ok()
                }
            })
            .await;

        assert_eq!(result1, result2);
        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_in_flight_request_deduplication() {
        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
        let key = test_key();
        let peer = dummy_peer();

        let execution_count = Arc::new(AtomicUsize::new(0));

        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        let manager_clone = Arc::clone(&manager);
        let key_clone = key.clone();
        let execution_count_clone = execution_count.clone();
        let rx_clone = Arc::clone(&rx);
        let peer_clone = peer.clone();
        let first_request = tokio::spawn(async move {
            manager_clone
                .deduplicated_request(key_clone, peer_clone, |_| {
                    let count = execution_count_clone.clone();
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        test_result_ok()
                    }
                })
                .await
        });

        let execution_count_clone2 = execution_count.clone();
        let second_request = tokio::spawn(async move {
            manager
                .deduplicated_request(key, peer, |_| {
                    let count = execution_count_clone2.clone();
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        test_result_ok()
                    }
                })
                .await
        });

        tx.send(()).unwrap();

        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            first_request.await.unwrap();
        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            second_request.await.unwrap();

        assert!(result1.is_ok());
        assert_eq!(result1, result2);

        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_multiple_subscribers_all_notified() {
        let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
        let key = test_key();
        let peer = dummy_peer();

        let execution_count = Arc::new(AtomicUsize::new(0));

        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        let manager_clone1 = Arc::clone(&manager);
        let key_clone1 = key.clone();
        let execution_count_clone = execution_count.clone();
        let rx_clone = Arc::clone(&rx);
        let peer_clone = peer.clone();
        let first_request = tokio::spawn(async move {
            manager_clone1
                .deduplicated_request(key_clone1, peer_clone, |_| {
                    let count = execution_count_clone.clone();
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        test_result_ok()
                    }
                })
                .await
        });

        let mut handles = vec![];
        for _ in 0..5 {
            let manager_clone = Arc::clone(&manager);
            let key_clone = key.clone();
            let execution_count_clone = execution_count.clone();
            let peer_clone = peer.clone();
            let handle = tokio::spawn(async move {
                manager_clone
                    .deduplicated_request(key_clone, peer_clone, |_| {
                        let count = execution_count_clone.clone();
                        async move {
                            count.fetch_add(1, Ordering::SeqCst);
                            test_result_ok()
                        }
                    })
                    .await
            });
            handles.push(handle);
        }

        tx.send(()).unwrap();

        let result: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            first_request.await.unwrap();
        assert!(result.is_ok());

        for handle in handles {
            assert_eq!(handle.await.unwrap(), result);
        }

        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_timeout_triggers_new_request() {
        let manager = create_test_manager(Duration::from_millis(50), Duration::from_secs(60));

        let key = test_key();
        let peer = dummy_peer();

        let execution_count = Arc::new(AtomicUsize::new(0));

        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        let manager_clone = Arc::clone(&manager);
        let key_clone = key.clone();
        let execution_count_clone = execution_count.clone();
        let rx_clone = Arc::clone(&rx);
        let peer_clone = peer.clone();
        let first_request = tokio::spawn(async move {
            manager_clone
                .deduplicated_request(key_clone, peer_clone, |_| {
                    let count = execution_count_clone.clone();
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        test_result_ok()
                    }
                })
                .await
        });

        tokio::time::sleep(Duration::from_millis(MAX_REQUEST_TTL_MS + 1)).await;

        let execution_count_clone2 = execution_count.clone();
        let second_request = tokio::spawn(async move {
            manager
                .deduplicated_request(key, peer, |_| {
                    let count = execution_count_clone2.clone();
                    async move {
                        count.fetch_add(1, Ordering::SeqCst);
                        test_result_ok()
                    }
                })
                .await
        });

        let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            second_request.await.unwrap();
        assert!(result2.is_ok());

        tx.send(()).unwrap();
        let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
            first_request.await.unwrap();
        assert!(result1.is_ok());

        assert_eq!(execution_count.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn test_alternative_peers_registered_on_deduplication() {
        use linera_base::identifiers::BlobType;

        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};

        let mut builder = TestBuilder::new(
            MemoryStorageBuilder::default(),
            3,
            0,
            InMemorySigner::new(None),
        )
        .await
        .unwrap();

        let nodes: Vec<_> = (0..3)
            .map(|i| {
                let node = builder.node(i);
                let public_key = node.name();
                RemoteNode { public_key, node }
            })
            .collect();

        let manager: Arc<RequestsScheduler<TestEnvironment>> =
            Arc::new(RequestsScheduler::with_config(
                nodes.clone(),
                ScoringWeights::default(),
                0.1,
                1000.0,
                Duration::from_secs(60),
                100,
                Duration::from_millis(MAX_REQUEST_TTL_MS),
                Duration::from_millis(STAGGERED_DELAY_MS),
            ));

        let key = RequestKey::Blob(BlobId::new(
            CryptoHash::test_hash("test_blob"),
            BlobType::Data,
        ));

        let (tx, rx) = oneshot::channel();
        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));

        let manager_clone = Arc::clone(&manager);
        let node_clone = nodes[0].clone();
        let key_clone = key.clone();
        let rx_clone = Arc::clone(&rx);
        let first_request = tokio::spawn(async move {
            manager_clone
                .with_peer(key_clone, node_clone, move |_peer| {
                    let rx = Arc::clone(&rx_clone);
                    async move {
                        if let Some(receiver) = rx.lock().await.take() {
                            receiver.await.unwrap();
                        }
                        Ok(None::<Blob>)
                    }
                })
                .await
        });

        tokio::time::sleep(Duration::from_millis(100)).await;

        let handles: Vec<_> = vec![nodes[1].clone(), nodes[2].clone()]
            .into_iter()
            .map(|node| {
                let manager_clone = Arc::clone(&manager);
                let key_clone = key.clone();
                tokio::spawn(async move {
                    manager_clone
                        .with_peer(key_clone, node, |_peer| async move { Ok(None::<Blob>) })
                        .await
                })
            })
            .collect();

        tokio::time::sleep(Duration::from_millis(100)).await;

        tx.send(()).unwrap();

        let _result1 = first_request.await.unwrap();
        for handle in handles {
            let _ = handle.await.unwrap();
        }

        tokio::time::sleep(Duration::from_millis(50)).await;
        let alt_peers = manager.get_alternative_peers(&key).await;
        assert!(
            alt_peers.is_none(),
            "Expected in-flight entry to be removed after completion"
        );
    }

    #[tokio::test]
    async fn test_staggered_parallel_retry_on_failure() {
        use std::sync::atomic::{AtomicU64, Ordering};

        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};

        let mut builder = TestBuilder::new(
            MemoryStorageBuilder::default(),
            4,
            0,
            InMemorySigner::new(None),
        )
        .await
        .unwrap();

        let nodes: Vec<_> = (0..4)
            .map(|i| {
                let node = builder.node(i);
                let public_key = node.name();
                RemoteNode { public_key, node }
            })
            .collect();

        let staggered_delay = Duration::from_millis(10);

        let node0_key = nodes[0].public_key;
        let node2_key = nodes[2].public_key;

        let manager: Arc<RequestsScheduler<TestEnvironment>> =
            Arc::new(RequestsScheduler::with_config(
                nodes.clone(),
                ScoringWeights::default(),
                0.1,
                1000.0,
                Duration::from_secs(60),
                100,
                Duration::from_millis(MAX_REQUEST_TTL_MS),
                staggered_delay,
            ));

        let key = test_key();

        let call_times = Arc::new(tokio::sync::Mutex::new(Vec::new()));
        let start_time = Instant::now();

        let call_count = Arc::new(AtomicU64::new(0));

        let call_times_clone = Arc::clone(&call_times);
        let call_count_clone = Arc::clone(&call_count);

        let operation = |peer: RemoteNode<<TestEnvironment as Environment>::ValidatorNode>| {
            let times = Arc::clone(&call_times_clone);
            let count = Arc::clone(&call_count_clone);
            let start = start_time;
            async move {
                let elapsed = Instant::now().duration_since(start);
                times.lock().await.push((peer.public_key, elapsed));
                count.fetch_add(1, Ordering::SeqCst);

                if peer.public_key == node0_key {
                    Err(NodeError::UnexpectedMessage)
                } else if peer.public_key == node2_key {
                    tokio::time::sleep(staggered_delay / 2).await;
                    Ok(vec![])
                } else {
                    tokio::time::sleep(staggered_delay * 2).await;
                    Err(NodeError::UnexpectedMessage)
                }
            }
        };

        manager.in_flight_tracker.insert_new(key.clone()).await;
        for node in nodes.iter().skip(1).rev() {
            manager
                .in_flight_tracker
                .add_alternative_peer(&key, node.clone())
                .await;
        }

        let result: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
            .try_staggered_parallel(&key, nodes[0].clone(), &operation, staggered_delay)
            .await;

        assert!(
            result.is_ok(),
            "Expected request to succeed with alternative peer"
        );

        let times = call_times.lock().await;
        assert!(
            times.len() >= 2,
            "Should have tried at least 2 peers, got {}",
            times.len()
        );

        assert!(
            times[0].1.as_millis() < 10,
            "First peer should be called immediately, was called at {}ms",
            times[0].1.as_millis()
        );

        if times.len() > 1 {
            let delay = times[1].1.as_millis();
            assert!(
                delay < 10,
                "Second peer should be called immediately on first failure, got {}ms",
                delay
            );
        }

        let total_time = Instant::now().duration_since(start_time).as_millis();
        assert!(
            total_time < 50,
            "Total time should be less than 50ms, got {}ms",
            total_time
        );
    }
}