1use std::{cmp::Ordering, collections::BTreeMap, future::Future, sync::Arc};
5
6use custom_debug_derive::Debug;
7use futures::stream::{FuturesUnordered, StreamExt};
8use linera_base::{
9 crypto::ValidatorPublicKey,
10 data_types::{Blob, BlobContent, BlockHeight},
11 identifiers::{BlobId, ChainId},
12 time::{Duration, Instant},
13};
14use linera_chain::types::ConfirmedBlockCertificate;
15use rand::{
16 distributions::{Distribution, WeightedIndex},
17 prelude::SliceRandom as _,
18};
19use tracing::instrument;
20
21use super::{
22 cache::{RequestsCache, SubsumingKey},
23 in_flight_tracker::{InFlightMatch, InFlightTracker},
24 node_info::NodeInfo,
25 request::{RequestKey, RequestResult},
26 scoring::ScoringWeights,
27};
28use crate::{
29 client::{
30 communicate_concurrently,
31 requests_scheduler::{in_flight_tracker::Subscribed, request::Cacheable},
32 RequestsSchedulerConfig,
33 },
34 environment::Environment,
35 node::{NodeError, ValidatorNode},
36 remote_node::RemoteNode,
37};
38
39#[cfg(with_metrics)]
40pub(super) mod metrics {
41 use std::sync::LazyLock;
42
43 use linera_base::prometheus_util::{
44 exponential_bucket_latencies, register_histogram_vec, register_int_counter,
45 register_int_counter_vec,
46 };
47 use prometheus::{HistogramVec, IntCounter, IntCounterVec};
48
49 pub(super) static VALIDATOR_RESPONSE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
51 register_histogram_vec(
52 "requests_scheduler_response_time_ms",
53 "Response time for requests to validators in milliseconds",
54 &["validator"],
55 exponential_bucket_latencies(10000.0), )
57 });
58
59 pub(super) static VALIDATOR_REQUEST_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
61 register_int_counter_vec(
62 "requests_scheduler_request_total",
63 "Total number of requests made to each validator",
64 &["validator"],
65 )
66 });
67
68 pub(super) static VALIDATOR_REQUEST_SUCCESS: LazyLock<IntCounterVec> = LazyLock::new(|| {
70 register_int_counter_vec(
71 "requests_scheduler_request_success",
72 "Number of successful requests to each validator",
73 &["validator"],
74 )
75 });
76
77 pub(super) static REQUEST_CACHE_DEDUPLICATION: LazyLock<IntCounter> = LazyLock::new(|| {
79 register_int_counter(
80 "requests_scheduler_request_deduplication_total",
81 "Number of requests that were deduplicated by finding the result in the cache.",
82 )
83 });
84
85 pub static REQUEST_CACHE_HIT: LazyLock<IntCounter> = LazyLock::new(|| {
87 register_int_counter(
88 "requests_scheduler_request_cache_hit_total",
89 "Number of requests that were served from cache",
90 )
91 });
92}
93
94#[derive(Debug, Clone)]
124pub struct RequestsScheduler<Env: Environment> {
125 nodes: Arc<tokio::sync::RwLock<BTreeMap<ValidatorPublicKey, NodeInfo<Env>>>>,
128 weights: ScoringWeights,
130 alpha: f64,
132 max_expected_latency: f64,
134 in_flight_tracker: InFlightTracker<RemoteNode<Env::ValidatorNode>>,
136 cache: RequestsCache<RequestKey, RequestResult>,
138}
139
140impl<Env: Environment> RequestsScheduler<Env> {
141 pub fn new(
143 nodes: impl IntoIterator<Item = RemoteNode<Env::ValidatorNode>>,
144 config: RequestsSchedulerConfig,
145 ) -> Self {
146 Self::with_config(
147 nodes,
148 ScoringWeights::default(),
149 config.alpha,
150 config.max_accepted_latency_ms,
151 Duration::from_millis(config.cache_ttl_ms),
152 config.cache_max_size,
153 Duration::from_millis(config.max_request_ttl_ms),
154 )
155 }
156
157 pub fn with_config(
169 nodes: impl IntoIterator<Item = RemoteNode<Env::ValidatorNode>>,
170 weights: ScoringWeights,
171 alpha: f64,
172 max_expected_latency_ms: f64,
173 cache_ttl: Duration,
174 max_cache_size: usize,
175 max_request_ttl: Duration,
176 ) -> Self {
177 assert!(alpha > 0.0 && alpha < 1.0, "Alpha must be in (0, 1) range");
178 Self {
179 nodes: Arc::new(tokio::sync::RwLock::new(
180 nodes
181 .into_iter()
182 .map(|node| {
183 (
184 node.public_key,
185 NodeInfo::with_config(node, weights, alpha, max_expected_latency_ms),
186 )
187 })
188 .collect(),
189 )),
190 weights,
191 alpha,
192 max_expected_latency: max_expected_latency_ms,
193 in_flight_tracker: InFlightTracker::new(max_request_ttl),
194 cache: RequestsCache::new(cache_ttl, max_cache_size),
195 }
196 }
197
198 #[allow(unused)]
229 async fn with_best<R, F, Fut>(&self, key: RequestKey, operation: F) -> Result<R, NodeError>
230 where
231 R: Cacheable + Clone + Send + 'static,
232 F: FnOnce(RemoteNode<Env::ValidatorNode>) -> Fut,
233 Fut: Future<Output = Result<R, NodeError>>,
234 {
235 let peer = self
237 .select_best_peer()
238 .await
239 .ok_or_else(|| NodeError::WorkerError {
240 error: "No validators available".to_string(),
241 })?;
242 self.with_peer(key, peer, operation).await
243 }
244
245 async fn with_peer<R, F, Fut>(
264 &self,
265 key: RequestKey,
266 peer: RemoteNode<Env::ValidatorNode>,
267 operation: F,
268 ) -> Result<R, NodeError>
269 where
270 R: Cacheable + Clone + Send + 'static,
271 F: FnOnce(RemoteNode<Env::ValidatorNode>) -> Fut,
272 Fut: Future<Output = Result<R, NodeError>>,
273 {
274 self.add_peer(peer.clone()).await;
275 self.in_flight_tracker
276 .add_alternative_peer(&key, peer.clone())
277 .await;
278 self.deduplicated_request(key, peer, |peer| async {
279 self.track_request(peer, operation).await
280 })
281 .await
282 }
283
284 #[instrument(level = "trace", skip_all)]
285 async fn download_blob(
286 &self,
287 peers: &[RemoteNode<Env::ValidatorNode>],
288 blob_id: BlobId,
289 timeout: Duration,
290 ) -> Result<Option<Blob>, NodeError> {
291 let key = RequestKey::Blob(blob_id);
292 let mut peers = peers.to_vec();
293 peers.shuffle(&mut rand::thread_rng());
294 communicate_concurrently(
295 &peers,
296 async move |peer| {
297 self.with_peer(key, peer, |peer| async move {
298 peer.download_blob(blob_id).await
299 })
300 .await
301 },
302 |errors| errors.last().cloned().unwrap(),
303 timeout,
304 )
305 .await
306 .map_err(|(_validator, error)| error)
307 }
308
309 #[instrument(level = "trace", skip_all)]
313 pub async fn download_blobs(
314 &self,
315 peers: &[RemoteNode<Env::ValidatorNode>],
316 blob_ids: &[BlobId],
317 timeout: Duration,
318 ) -> Result<Option<Vec<Blob>>, NodeError> {
319 let mut stream = blob_ids
320 .iter()
321 .map(|blob_id| self.download_blob(peers, *blob_id, timeout))
322 .collect::<FuturesUnordered<_>>();
323
324 let mut blobs = Vec::new();
325 while let Some(maybe_blob) = stream.next().await {
326 blobs.push(maybe_blob?);
327 }
328 Ok(blobs.into_iter().collect::<Option<Vec<_>>>())
329 }
330
331 pub async fn download_certificates(
332 &self,
333 peer: &RemoteNode<Env::ValidatorNode>,
334 chain_id: ChainId,
335 start: BlockHeight,
336 limit: u64,
337 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
338 let heights = (start.0..start.0 + limit)
339 .map(BlockHeight)
340 .collect::<Vec<_>>();
341 self.with_peer(
342 RequestKey::Certificates {
343 chain_id,
344 heights: heights.clone(),
345 },
346 peer.clone(),
347 |peer| async move {
348 Box::pin(peer.download_certificates_by_heights(chain_id, heights)).await
349 },
350 )
351 .await
352 }
353
354 pub async fn download_certificates_by_heights(
355 &self,
356 peer: &RemoteNode<Env::ValidatorNode>,
357 chain_id: ChainId,
358 heights: Vec<BlockHeight>,
359 ) -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
360 self.with_peer(
361 RequestKey::Certificates {
362 chain_id,
363 heights: heights.clone(),
364 },
365 peer.clone(),
366 |peer| async move {
367 peer.download_certificates_by_heights(chain_id, heights)
368 .await
369 },
370 )
371 .await
372 }
373
374 pub async fn download_certificate_for_blob(
375 &self,
376 peer: &RemoteNode<Env::ValidatorNode>,
377 blob_id: BlobId,
378 ) -> Result<ConfirmedBlockCertificate, NodeError> {
379 self.with_peer(
380 RequestKey::CertificateForBlob(blob_id),
381 peer.clone(),
382 |peer| async move { peer.download_certificate_for_blob(blob_id).await },
383 )
384 .await
385 }
386
387 pub async fn download_pending_blob(
388 &self,
389 peer: &RemoteNode<Env::ValidatorNode>,
390 chain_id: ChainId,
391 blob_id: BlobId,
392 ) -> Result<BlobContent, NodeError> {
393 self.with_peer(
394 RequestKey::PendingBlob { chain_id, blob_id },
395 peer.clone(),
396 |peer| async move { peer.node.download_pending_blob(chain_id, blob_id).await },
397 )
398 .await
399 }
400
401 pub async fn get_alternative_peers(
406 &self,
407 key: &RequestKey,
408 ) -> Option<Vec<RemoteNode<Env::ValidatorNode>>> {
409 self.in_flight_tracker.get_alternative_peers(key).await
410 }
411
412 pub async fn get_node_scores(&self) -> BTreeMap<ValidatorPublicKey, (f64, f64, u64)> {
421 let nodes = self.nodes.read().await;
422 let mut result = BTreeMap::new();
423
424 for (key, info) in nodes.iter() {
425 let score = info.calculate_score().await;
426 result.insert(
427 *key,
428 (score, info.ema_success_rate(), info.total_requests()),
429 );
430 }
431
432 result
433 }
434
435 async fn track_request<T, F, Fut>(
453 &self,
454 peer: RemoteNode<Env::ValidatorNode>,
455 operation: F,
456 ) -> Result<T, NodeError>
457 where
458 F: FnOnce(RemoteNode<Env::ValidatorNode>) -> Fut,
459 Fut: Future<Output = Result<T, NodeError>>,
460 {
461 let start_time = Instant::now();
462 let public_key = peer.public_key;
463
464 let result = operation(peer).await;
466
467 let response_time_ms = start_time.elapsed().as_millis() as u64;
469 let is_success = result.is_ok();
470 {
471 let mut nodes = self.nodes.write().await;
472 if let Some(info) = nodes.get_mut(&public_key) {
473 info.update_metrics(is_success, response_time_ms);
474 let score = info.calculate_score().await;
475 tracing::trace!(
476 node = %public_key,
477 address = %info.node.node.address(),
478 success = %is_success,
479 response_time_ms = %response_time_ms,
480 score = %score,
481 total_requests = %info.total_requests(),
482 "Request completed"
483 );
484 }
485 }
486
487 #[cfg(with_metrics)]
489 {
490 let validator_name = public_key.to_string();
491 metrics::VALIDATOR_RESPONSE_TIME
492 .with_label_values(&[&validator_name])
493 .observe(response_time_ms as f64);
494 metrics::VALIDATOR_REQUEST_TOTAL
495 .with_label_values(&[&validator_name])
496 .inc();
497 if is_success {
498 metrics::VALIDATOR_REQUEST_SUCCESS
499 .with_label_values(&[&validator_name])
500 .inc();
501 }
502 }
503
504 result
505 }
506
507 async fn deduplicated_request<T, F, Fut, N>(
524 &self,
525 key: RequestKey,
526 peer: N,
527 operation: F,
528 ) -> Result<T, NodeError>
529 where
530 T: Cacheable + Clone + Send + 'static,
531 F: FnOnce(N) -> Fut,
532 Fut: Future<Output = Result<T, NodeError>>,
533 {
534 if let Some(result) = self.cache.get(&key).await {
536 return Ok(result);
537 }
538
539 if let Some(in_flight_match) = self.in_flight_tracker.try_subscribe(&key).await {
541 match in_flight_match {
542 InFlightMatch::Exact(Subscribed(mut receiver)) => {
543 tracing::trace!(
544 key = ?key,
545 "deduplicating request (exact match) - joining existing in-flight request"
546 );
547 #[cfg(with_metrics)]
548 metrics::REQUEST_CACHE_DEDUPLICATION.inc();
549 match receiver.recv().await {
551 Ok(result) => match result.as_ref().clone() {
552 Ok(res) => match T::try_from(res) {
553 Ok(converted) => {
554 tracing::trace!(
555 key = ?key,
556 "received result from deduplicated in-flight request"
557 );
558 return Ok(converted);
559 }
560 Err(_) => {
561 tracing::trace!(
562 key = ?key,
563 "failed to convert result from deduplicated in-flight request, will execute independently"
564 );
565 }
566 },
567 Err(e) => {
568 tracing::trace!(
569 key = ?key,
570 error = %e,
571 "in-flight request failed",
572 );
573 }
575 },
576 Err(_) => {
577 tracing::trace!(
578 key = ?key,
579 "in-flight request sender dropped"
580 );
581 }
583 }
584 }
585 InFlightMatch::Subsuming {
586 key: subsuming_key,
587 outcome: Subscribed(mut receiver),
588 } => {
589 tracing::trace!(
590 key = ?key,
591 subsumed_by = ?subsuming_key,
592 "deduplicating request (subsumption) - joining larger in-flight request"
593 );
594 #[cfg(with_metrics)]
595 metrics::REQUEST_CACHE_DEDUPLICATION.inc();
596 match receiver.recv().await {
598 Ok(result) => {
599 match result.as_ref() {
600 Ok(res) => {
601 if let Some(extracted) =
602 key.try_extract_result(&subsuming_key, res)
603 {
604 tracing::trace!(
605 key = ?key,
606 "extracted subset result from larger in-flight request"
607 );
608 match T::try_from(extracted) {
609 Ok(converted) => return Ok(converted),
610 Err(_) => {
611 tracing::trace!(
612 key = ?key,
613 "failed to convert extracted result, will execute independently"
614 );
615 }
616 }
617 } else {
618 tracing::trace!(
620 key = ?key,
621 "failed to extract from subsuming request, will execute independently"
622 );
623 }
624 }
625 Err(e) => {
626 tracing::trace!(
627 key = ?key,
628 error = %e,
629 "subsuming in-flight request failed",
630 );
631 }
633 }
634 }
635 Err(_) => {
636 tracing::trace!(
637 key = ?key,
638 "subsuming in-flight request sender dropped"
639 );
640 }
641 }
642 }
643 }
644 };
645
646 self.in_flight_tracker.insert_new(key.clone()).await;
648
649 tracing::trace!(key = ?key, "executing new request");
651 let result = operation(peer).await;
652 let result_for_broadcast: Result<RequestResult, NodeError> = result.clone().map(Into::into);
653 let shared_result = Arc::new(result_for_broadcast);
654
655 self.in_flight_tracker
657 .complete_and_broadcast(&key, shared_result.clone())
658 .await;
659
660 if let Ok(success) = shared_result.as_ref() {
661 self.cache
662 .store(key.clone(), Arc::new(success.clone()))
663 .await;
664 }
665 result
666 }
667
668 async fn peers_by_score(&self) -> Vec<(f64, RemoteNode<Env::ValidatorNode>)> {
677 let nodes = self.nodes.read().await;
678
679 let mut scored_nodes = Vec::new();
681 for info in nodes.values() {
682 let score = info.calculate_score().await;
683 scored_nodes.push((score, info.node.clone()));
684 }
685
686 scored_nodes.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(Ordering::Equal));
688
689 scored_nodes
690 }
691
692 async fn select_best_peer(&self) -> Option<RemoteNode<Env::ValidatorNode>> {
704 let scored_nodes = self.peers_by_score().await;
705
706 if scored_nodes.is_empty() {
707 return None;
708 }
709
710 let top_count = scored_nodes.len().min(3);
712 let top_nodes = &scored_nodes[..top_count];
713
714 let weights: Vec<f64> = top_nodes.iter().map(|(score, _)| score.max(0.01)).collect();
717
718 if let Ok(dist) = WeightedIndex::new(&weights) {
719 let mut rng = rand::thread_rng();
720 let index = dist.sample(&mut rng);
721 Some(top_nodes[index].1.clone())
722 } else {
723 tracing::warn!("failed to create weighted distribution, defaulting to best node");
725 Some(scored_nodes[0].1.clone())
726 }
727 }
728
729 async fn add_peer(&self, node: RemoteNode<Env::ValidatorNode>) {
731 let mut nodes = self.nodes.write().await;
732 let public_key = node.public_key;
733 nodes.entry(public_key).or_insert_with(|| {
734 NodeInfo::with_config(node, self.weights, self.alpha, self.max_expected_latency)
735 });
736 }
737}
738
739#[cfg(test)]
740mod tests {
741 use std::sync::{
742 atomic::{AtomicUsize, Ordering},
743 Arc,
744 };
745
746 use linera_base::{
747 crypto::{CryptoHash, InMemorySigner},
748 data_types::BlockHeight,
749 identifiers::ChainId,
750 time::Duration,
751 };
752 use linera_chain::types::ConfirmedBlockCertificate;
753 use tokio::sync::oneshot;
754
755 use super::{super::request::RequestKey, *};
756 use crate::{client::requests_scheduler::MAX_REQUEST_TTL_MS, node::NodeError};
757
758 type TestEnvironment = crate::environment::Test;
759
760 fn create_test_manager(
762 in_flight_timeout: Duration,
763 cache_ttl: Duration,
764 ) -> Arc<RequestsScheduler<TestEnvironment>> {
765 let mut manager = RequestsScheduler::with_config(
766 vec![], ScoringWeights::default(),
768 0.1,
769 1000.0,
770 cache_ttl,
771 100,
772 in_flight_timeout,
773 );
774 manager.in_flight_tracker = InFlightTracker::new(in_flight_timeout);
776 Arc::new(manager)
777 }
778
779 fn test_result_ok() -> Result<Vec<ConfirmedBlockCertificate>, NodeError> {
781 Ok(vec![])
782 }
783
784 fn test_key() -> RequestKey {
786 RequestKey::Certificates {
787 chain_id: ChainId(CryptoHash::test_hash("test")),
788 heights: vec![BlockHeight(0), BlockHeight(1)],
789 }
790 }
791
792 #[tokio::test]
793 async fn test_cache_hit_returns_cached_result() {
794 let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
796 let key = test_key();
797
798 let execution_count = Arc::new(AtomicUsize::new(0));
800 let execution_count_clone = execution_count.clone();
801
802 let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
804 .deduplicated_request(key.clone(), (), |_| async move {
805 execution_count_clone.fetch_add(1, Ordering::SeqCst);
806 test_result_ok()
807 })
808 .await;
809
810 assert!(result1.is_ok());
811 assert_eq!(execution_count.load(Ordering::SeqCst), 1);
812
813 let execution_count_clone2 = execution_count.clone();
815 let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> = manager
816 .deduplicated_request(key.clone(), (), |_| async move {
817 execution_count_clone2.fetch_add(1, Ordering::SeqCst);
818 test_result_ok()
819 })
820 .await;
821
822 assert_eq!(result1, result2);
823 assert_eq!(execution_count.load(Ordering::SeqCst), 1);
825 }
826
827 #[tokio::test]
828 async fn test_in_flight_request_deduplication() {
829 let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
830 let key = test_key();
831
832 let execution_count = Arc::new(AtomicUsize::new(0));
834
835 let (tx, rx) = oneshot::channel();
837
838 let manager_clone = Arc::clone(&manager);
840 let key_clone = key.clone();
841 let execution_count_clone = execution_count.clone();
842 let first_request = tokio::spawn(async move {
843 manager_clone
844 .deduplicated_request(key_clone, (), |_| async move {
845 execution_count_clone.fetch_add(1, Ordering::SeqCst);
846 rx.await.unwrap();
848 test_result_ok()
849 })
850 .await
851 });
852
853 let execution_count_clone2 = execution_count.clone();
855 let second_request = tokio::spawn(async move {
856 manager
857 .deduplicated_request(key, (), |_| async move {
858 execution_count_clone2.fetch_add(1, Ordering::SeqCst);
859 test_result_ok()
860 })
861 .await
862 });
863
864 tx.send(()).unwrap();
866
867 let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
869 first_request.await.unwrap();
870 let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
871 second_request.await.unwrap();
872
873 assert!(result1.is_ok());
874 assert_eq!(result1, result2);
875
876 assert_eq!(execution_count.load(Ordering::SeqCst), 1);
878 }
879
880 #[tokio::test]
881 async fn test_multiple_subscribers_all_notified() {
882 let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
883 let key = test_key();
884
885 let execution_count = Arc::new(AtomicUsize::new(0));
887
888 let (tx, rx) = oneshot::channel();
890
891 let manager_clone1 = Arc::clone(&manager);
893 let key_clone1 = key.clone();
894 let execution_count_clone = execution_count.clone();
895 let first_request = tokio::spawn(async move {
896 manager_clone1
897 .deduplicated_request(key_clone1, (), |_| async move {
898 execution_count_clone.fetch_add(1, Ordering::SeqCst);
899 rx.await.unwrap();
900 test_result_ok()
901 })
902 .await
903 });
904
905 let mut handles = vec![];
907 for _ in 0..5 {
908 let manager_clone = Arc::clone(&manager);
909 let key_clone = key.clone();
910 let execution_count_clone = execution_count.clone();
911 let handle = tokio::spawn(async move {
912 manager_clone
913 .deduplicated_request(key_clone, (), |_| async move {
914 execution_count_clone.fetch_add(1, Ordering::SeqCst);
915 test_result_ok()
916 })
917 .await
918 });
919 handles.push(handle);
920 }
921
922 tx.send(()).unwrap();
924
925 let result: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
927 first_request.await.unwrap();
928 assert!(result.is_ok());
929
930 for handle in handles {
932 assert_eq!(handle.await.unwrap(), result);
933 }
934
935 assert_eq!(execution_count.load(Ordering::SeqCst), 1);
937 }
938
939 #[tokio::test]
940 async fn test_timeout_triggers_new_request() {
941 let manager = create_test_manager(Duration::from_millis(50), Duration::from_secs(60));
943
944 let key = test_key();
945
946 let execution_count = Arc::new(AtomicUsize::new(0));
948
949 let (tx, rx) = oneshot::channel();
951
952 let manager_clone = Arc::clone(&manager);
954 let key_clone = key.clone();
955 let execution_count_clone = execution_count.clone();
956 let first_request = tokio::spawn(async move {
957 manager_clone
958 .deduplicated_request(key_clone, (), |_| async move {
959 execution_count_clone.fetch_add(1, Ordering::SeqCst);
960 rx.await.unwrap();
961 test_result_ok()
962 })
963 .await
964 });
965
966 tokio::time::sleep(Duration::from_millis(MAX_REQUEST_TTL_MS + 1)).await;
968
969 let execution_count_clone2 = execution_count.clone();
971 let second_request = tokio::spawn(async move {
972 manager
973 .deduplicated_request(key, (), |_| async move {
974 execution_count_clone2.fetch_add(1, Ordering::SeqCst);
975 test_result_ok()
976 })
977 .await
978 });
979
980 let result2: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
982 second_request.await.unwrap();
983 assert!(result2.is_ok());
984
985 tx.send(()).unwrap();
987 let result1: Result<Vec<ConfirmedBlockCertificate>, NodeError> =
988 first_request.await.unwrap();
989 assert!(result1.is_ok());
990
991 assert_eq!(execution_count.load(Ordering::SeqCst), 2);
993 }
994
995 #[tokio::test]
996 async fn test_alternative_peers_registered_on_deduplication() {
997 use linera_base::identifiers::BlobType;
998
999 use crate::test_utils::{MemoryStorageBuilder, TestBuilder};
1000
1001 let mut builder = TestBuilder::new(
1003 MemoryStorageBuilder::default(),
1004 3,
1005 0,
1006 InMemorySigner::new(None),
1007 )
1008 .await
1009 .unwrap();
1010
1011 let nodes: Vec<_> = (0..3)
1013 .map(|i| {
1014 let node = builder.node(i);
1015 let public_key = node.name();
1016 RemoteNode { public_key, node }
1017 })
1018 .collect();
1019
1020 let manager: Arc<RequestsScheduler<TestEnvironment>> =
1022 Arc::new(RequestsScheduler::with_config(
1023 nodes.clone(),
1024 ScoringWeights::default(),
1025 0.1,
1026 1000.0,
1027 Duration::from_secs(60),
1028 100,
1029 Duration::from_millis(MAX_REQUEST_TTL_MS),
1030 ));
1031
1032 let key = RequestKey::Blob(BlobId::new(
1033 CryptoHash::test_hash("test_blob"),
1034 BlobType::Data,
1035 ));
1036
1037 let (tx, rx) = oneshot::channel();
1039
1040 let manager_clone = Arc::clone(&manager);
1042 let node_clone = nodes[0].clone();
1043 let key_clone = key.clone();
1044 let first_request = tokio::spawn(async move {
1045 manager_clone
1046 .with_peer(key_clone, node_clone, |_peer| async move {
1047 rx.await.unwrap();
1049 Ok(None) })
1051 .await
1052 });
1053
1054 tokio::time::sleep(Duration::from_millis(100)).await;
1056
1057 let handles: Vec<_> = vec![nodes[1].clone(), nodes[2].clone()]
1060 .into_iter()
1061 .map(|node| {
1062 let manager_clone = Arc::clone(&manager);
1063 let key_clone = key.clone();
1064 tokio::spawn(async move {
1065 manager_clone
1066 .with_peer(key_clone, node, |_peer| async move {
1067 Ok(None) })
1069 .await
1070 })
1071 })
1072 .collect();
1073
1074 tokio::time::sleep(Duration::from_millis(100)).await;
1076
1077 let alt_peers = manager
1079 .get_alternative_peers(&key)
1080 .await
1081 .expect("in-flight entry")
1082 .into_iter()
1083 .map(|p| p.public_key)
1084 .collect::<Vec<_>>();
1085 assert_eq!(
1086 alt_peers,
1087 vec![nodes[1].public_key, nodes[2].public_key],
1088 "expected nodes 2 and 3 to be registered as alternative peers"
1089 );
1090
1091 tx.send(()).unwrap();
1093
1094 let _result1 = first_request.await.unwrap();
1096 for handle in handles {
1097 let _ = handle.await.unwrap();
1098 }
1099
1100 tokio::time::sleep(Duration::from_millis(50)).await;
1102 let alt_peers = manager.get_alternative_peers(&key).await;
1103 assert!(
1104 alt_peers.is_none(),
1105 "Expected in-flight entry to be removed after completion"
1106 );
1107 }
1108}