scylla/policies/load_balancing/default.rs

1use self::latency_awareness::LatencyAwareness;
2pub use self::latency_awareness::LatencyAwarenessBuilder;
3
4use super::{FallbackPlan, LoadBalancingPolicy, NodeRef, RoutingInfo};
5use crate::cluster::ClusterState;
6use crate::{
7    cluster::metadata::Strategy,
8    cluster::node::Node,
9    errors::RequestAttemptError,
10    routing::locator::ReplicaSet,
11    routing::{Shard, Token},
12};
13use itertools::{Either, Itertools};
14use rand::{prelude::SliceRandom, rng, Rng};
15use rand_pcg::Pcg32;
16use scylla_cql::frame::response::result::TableSpec;
17use std::hash::{Hash, Hasher};
18use std::{fmt, sync::Arc, time::Duration};
19use tracing::{debug, warn};
20use uuid::Uuid;
21
22#[derive(Clone, Copy)]
23enum NodeLocationCriteria<'a> {
24    Any,
25    Datacenter(&'a str),
26    DatacenterAndRack(&'a str, &'a str),
27}
28
29impl<'a> NodeLocationCriteria<'a> {
30    fn datacenter(&self) -> Option<&'a str> {
31        match self {
32            Self::Any => None,
33            Self::Datacenter(dc) | Self::DatacenterAndRack(dc, _) => Some(dc),
34        }
35    }
36}
37
38#[derive(Debug, Clone)]
39enum NodeLocationPreference {
40    Any,
41    Datacenter(String),
42    DatacenterAndRack(String, String),
43}
44
45impl NodeLocationPreference {
46    fn datacenter(&self) -> Option<&str> {
47        match self {
48            Self::Any => None,
49            Self::Datacenter(dc) | Self::DatacenterAndRack(dc, _) => Some(dc),
50        }
51    }
52
53    #[allow(unused)]
54    fn rack(&self) -> Option<&str> {
55        match self {
56            Self::Any | Self::Datacenter(_) => None,
57            Self::DatacenterAndRack(_, rack) => Some(rack),
58        }
59    }
60}
61
62/// An ordering requirement for replicas.
63#[derive(Clone, Copy)]
64enum ReplicaOrder {
65    /// No requirement. Replicas can be returned in arbitrary order.
66    Arbitrary,
67
68    /// A requirement for the order to be deterministic, not only across statement executions
69    /// but also across drivers. This is used for LWT optimisation, to avoid Paxos conflicts.
70    Deterministic,
71}
72
73/// Statement kind, used to enable specific load balancing patterns for certain cases.
74///
75/// Currently, there is a distinguished case of LWT statements, which should always be routed
76/// to replicas in a deterministic order to avoid Paxos conflicts. Other statements
77/// are routed to random replicas to balance the load.
78#[derive(Clone, Copy)]
79enum StatementType {
80    /// The statement is a confirmed LWT. It's to be routed specifically.
81    Lwt,
82
83    /// The statement is not a confirmed LWT. It's to be routed in a default way.
84    NonLwt,
85}
86
87/// A result of `pick_replica`.
88enum PickedReplica<'a> {
89    /// A replica that could be computed cheaply.
90    Computed((NodeRef<'a>, Shard)),
91
92    /// A replica that could not be computed cheaply. `pick` should therefore return None
93    /// and `fallback` will then return that replica as the first in the iterator.
94    ToBeComputedInFallback,
95}
96
/// The default load balancing policy.
///
/// It can be configured to be datacenter-aware, rack-aware and token-aware.
/// Datacenter failover (sending a query to a node from a remote datacenter)
/// for queries with non-local consistency mode is also supported.
///
/// Latency awareness is available, although **not recommended**:
/// the penalisation mechanism it involves may interact badly with other
/// mechanisms, such as the LWT optimisation. Also, the very tactic of penalising
/// nodes for recently measured latencies is believed to be neither very stable
/// nor beneficial. The number of in-flight requests, for instance, seems
/// to be a better metric of how (over)loaded a target node/shard is.
/// For now, however, we don't have an implementation of an
/// in-flight-requests-aware policy.
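///
/// # Example
///
/// A minimal sketch of building a customised instance via [`DefaultPolicy::builder`];
/// the datacenter and rack names below are placeholders:
/// ```
/// use scylla::policies::load_balancing::DefaultPolicy;
///
/// let policy = DefaultPolicy::builder()
///     .prefer_datacenter_and_rack("dc1".to_string(), "rack1".to_string())
///     .permit_dc_failover(true)
///     .build();
/// ```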
111#[allow(clippy::type_complexity)]
112pub struct DefaultPolicy {
113    /// Preferences regarding node location. One of: rack and DC, DC, or no preference.
114    preferences: NodeLocationPreference,
115
116    /// Configures whether the policy takes token into consideration when creating plans.
117    /// If this is set to `true` AND token, keyspace and table are available,
118    /// then policy prefers replicas and puts them earlier in the query plan.
119    is_token_aware: bool,
120
121    /// Whether to permit remote nodes (those not located in the preferred DC) in plans.
122    /// If no preferred DC is set, this has no effect.
123    permit_dc_failover: bool,
124
125    /// A predicate that a target (node + shard) must satisfy in order to be picked.
126    /// This was introduced to make latency awareness cleaner.
127    /// - if latency awareness is disabled, then `pick_predicate` is just `Self::is_alive()`;
128    /// - if latency awareness is enabled, then it is `Self::is_alive() && latency_predicate()`,
129    ///   which checks that the target is not penalised due to high latencies.
130    pick_predicate: Box<dyn Fn(NodeRef<'_>, Option<Shard>) -> bool + Send + Sync>,
131
132    /// Additional layer that penalises targets that are too slow compared to others
133    /// in terms of latency. It works in the following way:
134    /// - for `pick`, it uses `latency_predicate` to filter out penalised nodes,
135    ///   so that a penalised node will never be `pick`ed;
136    /// - for `fallback`, it wraps the returned iterator, moving all penalised nodes
137    ///   to the end, in a stable way.
138    ///
139    /// Penalisation is done based on collected and updated latencies.
140    latency_awareness: Option<LatencyAwareness>,
141
142    /// The policy chooses (in `pick`) and shuffles (in `fallback`) replicas and nodes
143    /// based on random number generator. For sake of deterministic testing,
144    /// a fixed seed can be used.
145    fixed_seed: Option<u64>,
146}
147
148impl fmt::Debug for DefaultPolicy {
149    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
150        f.debug_struct("DefaultPolicy")
151            .field("preferences", &self.preferences)
152            .field("is_token_aware", &self.is_token_aware)
153            .field("permit_dc_failover", &self.permit_dc_failover)
154            .field("latency_awareness", &self.latency_awareness)
155            .field("fixed_seed", &self.fixed_seed)
156            .finish_non_exhaustive()
157    }
158}
159
160impl LoadBalancingPolicy for DefaultPolicy {
161    fn pick<'a>(
162        &'a self,
163        query: &'a RoutingInfo,
164        cluster: &'a ClusterState,
165    ) -> Option<(NodeRef<'a>, Option<Shard>)> {
        /* For prepared statements, token-aware logic is available: we know what the replicas
         * are for the statement, so we can pick one of them. */
168        let routing_info = self.routing_info(query, cluster);
169
170        if let Some(ref token_with_strategy) = routing_info.token_with_strategy {
171            if self.preferences.datacenter().is_some()
172                && !self.permit_dc_failover
173                && matches!(
174                    token_with_strategy.strategy,
175                    Strategy::SimpleStrategy { .. }
176                )
177            {
                warn!("\
Combining SimpleStrategy with preferred_datacenter set to Some and disabled datacenter failover may lead to empty query plans for some tokens. \
It is better to give up using one of them: either operate in a keyspace with NetworkTopologyStrategy, which explicitly states \
how many replicas there are in each datacenter (you probably want at least 1 to avoid empty plans while preferring that datacenter), \
or refrain from preferring datacenters (which may ban all other datacenters, if datacenter failover happens not to be possible)."
                );
184            }
185        }
186
187        /* LWT statements need to be routed differently: always to the same replica, to avoid Paxos contention. */
188        let statement_type = if query.is_confirmed_lwt {
189            StatementType::Lwt
190        } else {
191            StatementType::NonLwt
192        };
193
        /* Token-aware logic - if routing info is available, we know what the replicas
         * are for the statement. Try to pick one of them. */
196        if let (Some(ts), Some(table_spec)) = (&routing_info.token_with_strategy, query.table) {
197            if let NodeLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences {
198                // Try to pick some alive local rack random replica.
199                let local_rack_picked = self.pick_replica(
200                    ts,
201                    NodeLocationCriteria::DatacenterAndRack(dc, rack),
202                    |node, shard| (self.pick_predicate)(node, Some(shard)),
203                    cluster,
204                    statement_type,
205                    table_spec,
206                );
207
208                if let Some(picked) = local_rack_picked {
209                    return match picked {
210                        PickedReplica::Computed((alive_local_rack_replica, shard)) => {
211                            Some((alive_local_rack_replica, Some(shard)))
212                        }
213                        // Let call to fallback() compute the replica, because it requires allocation.
214                        PickedReplica::ToBeComputedInFallback => None,
215                    };
216                }
217            }
218
219            if let NodeLocationPreference::DatacenterAndRack(dc, _)
220            | NodeLocationPreference::Datacenter(dc) = &self.preferences
221            {
222                // Try to pick some alive local random replica.
223                let picked = self.pick_replica(
224                    ts,
225                    NodeLocationCriteria::Datacenter(dc),
226                    |node, shard| (self.pick_predicate)(node, Some(shard)),
227                    cluster,
228                    statement_type,
229                    table_spec,
230                );
231
232                if let Some(picked) = picked {
233                    return match picked {
234                        PickedReplica::Computed((alive_local_replica, shard)) => {
235                            Some((alive_local_replica, Some(shard)))
236                        }
237                        // Let call to fallback() compute the replica, because it requires allocation.
238                        PickedReplica::ToBeComputedInFallback => None,
239                    };
240                }
241            }
242
243            // If preferred datacenter is not specified, or if datacenter failover is possible, loosen restriction about locality.
244            if self.preferences.datacenter().is_none() || self.is_datacenter_failover_possible() {
245                // Try to pick some alive random replica.
246                let picked = self.pick_replica(
247                    ts,
248                    NodeLocationCriteria::Any,
249                    |node, shard| (self.pick_predicate)(node, Some(shard)),
250                    cluster,
251                    statement_type,
252                    table_spec,
253                );
254                if let Some(picked) = picked {
255                    return match picked {
256                        PickedReplica::Computed((alive_remote_replica, shard)) => {
257                            Some((alive_remote_replica, Some(shard)))
258                        }
259                        // Let call to fallback() compute the replica, because it requires allocation.
260                        PickedReplica::ToBeComputedInFallback => None,
261                    };
262                }
263            }
264        };
265
266        /* Token-unaware logic - if routing info is not available (e.g. for unprepared statements),
267         * or no replica was suitable for targeting it (e.g. disabled or down), try to choose
268         * a random node, not necessarily a replica. */
269
        /* We start with nodes that are not alive filtered out. This is done by `pick_predicate`,
         * which always contains `Self::is_alive()`. */
272
273        // Let's start with local nodes, i.e. those in the preferred datacenter.
274        // If there was no preferred datacenter specified, all nodes are treated as local.
275        let local_nodes = self.preferred_node_set(cluster);
276
277        if let NodeLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences {
278            // Try to pick some alive random local rack node.
279            let rack_predicate = Self::make_rack_predicate(
280                |node| (self.pick_predicate)(node, None),
281                NodeLocationCriteria::DatacenterAndRack(dc, rack),
282            );
283            let local_rack_node_picked = self.pick_node(local_nodes, rack_predicate);
284
285            if let Some(alive_local_rack_node) = local_rack_node_picked {
286                return Some((alive_local_rack_node, None));
287            }
288        }
289
290        // Try to pick some alive random local node.
291        let local_node_picked =
292            self.pick_node(local_nodes, |node| (self.pick_predicate)(node, None));
293        if let Some(alive_local_node) = local_node_picked {
294            return Some((alive_local_node, None));
295        }
296
297        let all_nodes = cluster.replica_locator().unique_nodes_in_global_ring();
298        // If a datacenter failover is possible, loosen restriction about locality.
299        if self.is_datacenter_failover_possible() {
300            let maybe_remote_node_picked =
301                self.pick_node(all_nodes, |node| (self.pick_predicate)(node, None));
302            if let Some(alive_maybe_remote_node) = maybe_remote_node_picked {
303                return Some((alive_maybe_remote_node, None));
304            }
305        }
306
        /* Since we got here, we failed to pick any alive node. Now let's consider even down nodes. */
308
309        // Previous checks imply that every node we could have selected is down.
310        // Let's try to return a down node that wasn't disabled.
311        let maybe_down_local_node_picked = self.pick_node(local_nodes, |node| node.is_enabled());
312        if let Some(down_but_enabled_local_node) = maybe_down_local_node_picked {
313            return Some((down_but_enabled_local_node, None));
314        }
315
316        // If a datacenter failover is possible, loosen restriction about locality.
317        if self.is_datacenter_failover_possible() {
318            let maybe_down_maybe_remote_node_picked =
319                self.pick_node(all_nodes, |node| node.is_enabled());
320            if let Some(down_but_enabled_maybe_remote_node) = maybe_down_maybe_remote_node_picked {
321                return Some((down_but_enabled_maybe_remote_node, None));
322            }
323        }
324
325        // Every node is disabled. This could be due to a bad host filter - configuration error.
326        // It makes no sense to return disabled nodes (there are no open connections to them anyway),
327        // so let's return None. `fallback()` will return empty iterator, and so the whole plan
328        // will be empty.
329        None
330    }
331
332    fn fallback<'a>(
333        &'a self,
334        query: &'a RoutingInfo,
335        cluster: &'a ClusterState,
336    ) -> FallbackPlan<'a> {
        /* For prepared statements, token-aware logic is available: we know what the replicas
         * are for the statement, so we can pick one of them. */
339        let routing_info = self.routing_info(query, cluster);
340
341        /* LWT statements need to be routed differently: always to the same replica, to avoid Paxos contention. */
342        let statement_type = if query.is_confirmed_lwt {
343            StatementType::Lwt
344        } else {
345            StatementType::NonLwt
346        };
347
        /* Token-aware logic - if routing info is available, we know what the replicas are for the statement.
349         * Get a list of alive replicas:
350         * - shuffled list in case of non-LWTs,
351         * - deterministically ordered in case of LWTs. */
352        let maybe_replicas = if let (Some(ts), Some(table_spec)) =
353            (&routing_info.token_with_strategy, query.table)
354        {
355            // Iterator over alive local rack replicas (shuffled or deterministically ordered,
356            // depending on the statement being LWT or not).
357            let maybe_local_rack_replicas =
358                if let NodeLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences {
359                    let local_rack_replicas = self.maybe_shuffled_replicas(
360                        ts,
361                        NodeLocationCriteria::DatacenterAndRack(dc, rack),
362                        |node, shard| Self::is_alive(node, Some(shard)),
363                        cluster,
364                        statement_type,
365                        table_spec,
366                    );
367                    Either::Left(local_rack_replicas)
368                } else {
369                    Either::Right(std::iter::empty())
370                };
371
372            // Iterator over alive local datacenter replicas (shuffled or deterministically ordered,
373            // depending on the statement being LWT or not).
374            let maybe_local_replicas = if let NodeLocationPreference::DatacenterAndRack(dc, _)
375            | NodeLocationPreference::Datacenter(dc) =
376                &self.preferences
377            {
378                let local_replicas = self.maybe_shuffled_replicas(
379                    ts,
380                    NodeLocationCriteria::Datacenter(dc),
381                    |node, shard| Self::is_alive(node, Some(shard)),
382                    cluster,
383                    statement_type,
384                    table_spec,
385                );
386                Either::Left(local_replicas)
387            } else {
388                Either::Right(std::iter::empty())
389            };
390
391            // If no datacenter is preferred, or datacenter failover is possible, loosen restriction about locality.
392            let maybe_remote_replicas = if self.preferences.datacenter().is_none()
393                || self.is_datacenter_failover_possible()
394            {
395                // Iterator over alive replicas (shuffled or deterministically ordered,
396                // depending on the statement being LWT or not).
397                let remote_replicas = self.maybe_shuffled_replicas(
398                    ts,
399                    NodeLocationCriteria::Any,
400                    |node, shard| Self::is_alive(node, Some(shard)),
401                    cluster,
402                    statement_type,
403                    table_spec,
404                );
405                Either::Left(remote_replicas)
406            } else {
407                Either::Right(std::iter::empty())
408            };
409
410            // Produce an iterator, prioritizing local replicas.
411            // If preferred datacenter is not specified, every replica is treated as a remote one.
412            Either::Left(
413                maybe_local_rack_replicas
414                    .chain(maybe_local_replicas)
415                    .chain(maybe_remote_replicas)
416                    .map(|(node, shard)| (node, Some(shard))),
417            )
418        } else {
419            Either::Right(std::iter::empty::<(NodeRef<'a>, Option<Shard>)>())
420        };
421
        /* Token-unaware logic - if routing info is not available (e.g. for unprepared statements),
         * or no replica is suitable to be targeted (e.g. disabled or down), try targeting nodes
         * that are not necessarily replicas. */
425
        /* We start with nodes that are not alive filtered out. */
427
428        // All nodes in the local datacenter (if one is given).
429        let local_nodes = self.preferred_node_set(cluster);
430
431        let robinned_local_rack_nodes =
432            if let NodeLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences {
433                let rack_predicate = Self::make_rack_predicate(
434                    |node| Self::is_alive(node, None),
435                    NodeLocationCriteria::DatacenterAndRack(dc, rack),
436                );
437                Either::Left(
438                    self.round_robin_nodes(local_nodes, rack_predicate)
439                        .map(|node| (node, None)),
440                )
441            } else {
442                Either::Right(std::iter::empty::<(NodeRef<'a>, Option<Shard>)>())
443            };
444
445        let robinned_local_nodes = self
446            .round_robin_nodes(local_nodes, |node| Self::is_alive(node, None))
447            .map(|node| (node, None));
448
449        // All nodes in the cluster.
450        let all_nodes = cluster.replica_locator().unique_nodes_in_global_ring();
451
452        // If a datacenter failover is possible, loosen restriction about locality.
453        let maybe_remote_nodes = if self.is_datacenter_failover_possible() {
454            let robinned_all_nodes =
455                self.round_robin_nodes(all_nodes, |node| Self::is_alive(node, None));
456
457            Either::Left(robinned_all_nodes.map(|node| (node, None)))
458        } else {
459            Either::Right(std::iter::empty::<(NodeRef<'a>, Option<Shard>)>())
460        };
461
462        // Even if we consider some enabled nodes to be down, we should try contacting them in the last resort.
463        let maybe_down_local_nodes = local_nodes
464            .iter()
465            .filter(|node| node.is_enabled())
466            .map(|node| (node, None));
467
468        // If a datacenter failover is possible, loosen restriction about locality.
469        let maybe_down_nodes = if self.is_datacenter_failover_possible() {
470            Either::Left(
471                all_nodes
472                    .iter()
473                    .filter(|node| node.is_enabled())
474                    .map(|node| (node, None)),
475            )
476        } else {
477            Either::Right(std::iter::empty())
478        };
479
480        /// *Plan* should return unique elements. It is not however obvious what it means,
481        /// because some nodes in the plan may have shards and some may not.
482        ///
483        /// This helper structure defines equality of plan elements.
484        /// How the comparison works:
485        /// - If at least one of elements is shard-less, then compare just nodes.
486        /// - If both elements have shards, then compare both nodes and shards.
487        ///
488        /// Why is it implemented this way?
489        /// Driver should not attempt to send a request to the same destination twice.
490        /// If a plan element doesn't have shard specified, then a random shard will be
491        /// chosen by the driver. If the plan also contains the same node but with
492        /// a shard present, and we randomly choose the same shard for the shard-less element,
493        /// then we have duplication.
494        ///
495        /// Example: plan is `[(Node1, Some(1)), (Node1, None)]` - if the driver uses
496        /// the second element and randomly chooses shard 1, then we have duplication.
497        ///
498        /// On the other hand, if a plan has a duplicate node, but with different shards,
499        /// then we want to use both elements - so we can't just make the list unique by node,
500        /// and so this struct was created.
501        struct DefaultPolicyTargetComparator {
502            host_id: Uuid,
503            shard: Option<Shard>,
504        }
505
506        impl PartialEq for DefaultPolicyTargetComparator {
507            fn eq(&self, other: &Self) -> bool {
508                match (self.shard, other.shard) {
509                    (_, None) | (None, _) => self.host_id.eq(&other.host_id),
510                    (Some(shard_left), Some(shard_right)) => {
511                        self.host_id.eq(&other.host_id) && shard_left.eq(&shard_right)
512                    }
513                }
514            }
515        }
516
517        impl Eq for DefaultPolicyTargetComparator {}
518
519        impl Hash for DefaultPolicyTargetComparator {
520            fn hash<H: Hasher>(&self, state: &mut H) {
521                self.host_id.hash(state);
522            }
523        }
524
        // Construct a fallback plan as a composition of:
        // - local rack alive replicas,
        // - local datacenter alive replicas (or all alive replicas if no DC is preferred),
        // - remote alive replicas (if DC failover is enabled),
        // - local rack alive nodes,
        // - local datacenter alive nodes (or all alive nodes if no DC is preferred),
        // - remote alive nodes (if DC failover is enabled),
        // - local datacenter nodes,
        // - remote nodes (if DC failover is enabled).
534        let plan = maybe_replicas
535            .chain(robinned_local_rack_nodes)
536            .chain(robinned_local_nodes)
537            .chain(maybe_remote_nodes)
538            .chain(maybe_down_local_nodes)
539            .chain(maybe_down_nodes)
540            .unique_by(|(node, shard)| DefaultPolicyTargetComparator {
541                host_id: node.host_id,
542                shard: *shard,
543            });
544
545        // If latency awareness is enabled, wrap the plan by applying latency penalisation:
546        // all penalised nodes are moved behind non-penalised nodes, in a stable fashion.
547        if let Some(latency_awareness) = self.latency_awareness.as_ref() {
548            Box::new(latency_awareness.wrap(plan))
549        } else {
550            Box::new(plan)
551        }
552    }
553
554    fn name(&self) -> String {
555        "DefaultPolicy".to_string()
556    }
557
558    fn on_request_success(
559        &self,
560        _routing_info: &RoutingInfo,
561        latency: Duration,
562        node: NodeRef<'_>,
563    ) {
564        if let Some(latency_awareness) = self.latency_awareness.as_ref() {
565            latency_awareness.report_request(node, latency);
566        }
567    }
568
569    fn on_request_failure(
570        &self,
571        _routing_info: &RoutingInfo,
572        latency: Duration,
573        node: NodeRef<'_>,
574        error: &RequestAttemptError,
575    ) {
576        if let Some(latency_awareness) = self.latency_awareness.as_ref() {
577            if LatencyAwareness::reliable_latency_measure(error) {
578                latency_awareness.report_request(node, latency);
579            }
580        }
581    }
582}
583
584impl DefaultPolicy {
585    /// Creates a builder used to customise configuration of a new DefaultPolicy.
586    pub fn builder() -> DefaultPolicyBuilder {
587        DefaultPolicyBuilder::new()
588    }
589
590    /// Returns the given routing info processed based on given cluster state.
591    fn routing_info<'a>(
592        &'a self,
593        query: &'a RoutingInfo,
594        cluster: &'a ClusterState,
595    ) -> ProcessedRoutingInfo<'a> {
596        let mut routing_info = ProcessedRoutingInfo::new(query, cluster);
597
598        if !self.is_token_aware {
599            routing_info.token_with_strategy = None;
600        }
601
602        routing_info
603    }
604
605    /// Returns all nodes in the local datacenter if one is given,
606    /// or else all nodes in the cluster.
607    fn preferred_node_set<'a>(&'a self, cluster: &'a ClusterState) -> &'a [Arc<Node>] {
608        if let Some(preferred_datacenter) = self.preferences.datacenter() {
609            if let Some(nodes) = cluster
610                .replica_locator()
611                .unique_nodes_in_datacenter_ring(preferred_datacenter)
612            {
613                nodes
614            } else {
615                tracing::warn!(
616                    "Datacenter specified as the preferred one ({}) does not exist!",
617                    preferred_datacenter
618                );
619                // We won't guess any DC, as it could lead to possible violation of dc failover ban.
620                &[]
621            }
622        } else {
623            cluster.replica_locator().unique_nodes_in_global_ring()
624        }
625    }
626
627    /// Returns a full replica set for given datacenter (if given, else for all DCs),
628    /// cluster state and table spec.
629    fn nonfiltered_replica_set<'a>(
630        &'a self,
631        ts: &TokenWithStrategy<'a>,
632        replica_location: NodeLocationCriteria<'a>,
633        cluster: &'a ClusterState,
634        table_spec: &TableSpec,
635    ) -> ReplicaSet<'a> {
636        let datacenter = replica_location.datacenter();
637
638        cluster
639            .replica_locator()
640            .replicas_for_token(ts.token, ts.strategy, datacenter, table_spec)
641    }
642
643    /// Wraps the provided predicate, adding the requirement for rack to match.
644    fn make_rack_predicate<'a>(
645        predicate: impl Fn(NodeRef<'a>) -> bool + 'a,
646        replica_location: NodeLocationCriteria<'a>,
647    ) -> impl Fn(NodeRef<'a>) -> bool {
648        move |node| match replica_location {
649            NodeLocationCriteria::Any | NodeLocationCriteria::Datacenter(_) => predicate(node),
650            NodeLocationCriteria::DatacenterAndRack(_, rack) => {
651                predicate(node) && node.rack.as_deref() == Some(rack)
652            }
653        }
654    }
655
656    /// Wraps the provided predicate, adding the requirement for rack to match.
657    fn make_sharded_rack_predicate<'a>(
658        predicate: impl Fn(NodeRef<'a>, Shard) -> bool + 'a,
659        replica_location: NodeLocationCriteria<'a>,
660    ) -> impl Fn(NodeRef<'a>, Shard) -> bool {
661        move |node, shard| match replica_location {
662            NodeLocationCriteria::Any | NodeLocationCriteria::Datacenter(_) => {
663                predicate(node, shard)
664            }
665            NodeLocationCriteria::DatacenterAndRack(_, rack) => {
666                predicate(node, shard) && node.rack.as_deref() == Some(rack)
667            }
668        }
669    }
670
671    /// Returns iterator over replicas for given token and table spec, filtered
672    /// by provided location criteria and predicate.
673    /// Respects requested replica order, i.e. if requested, returns replicas ordered
674    /// deterministically (i.e. by token ring order or by tablet definition order),
675    /// else returns replicas in arbitrary order.
676    fn filtered_replicas<'a>(
677        &'a self,
678        ts: &TokenWithStrategy<'a>,
679        replica_location: NodeLocationCriteria<'a>,
680        predicate: impl Fn(NodeRef<'a>, Shard) -> bool + 'a,
681        cluster: &'a ClusterState,
682        order: ReplicaOrder,
683        table_spec: &TableSpec,
684    ) -> impl Iterator<Item = (NodeRef<'a>, Shard)> {
685        let predicate = Self::make_sharded_rack_predicate(predicate, replica_location);
686
687        let replica_iter = match order {
688            ReplicaOrder::Arbitrary => Either::Left(
689                self.nonfiltered_replica_set(ts, replica_location, cluster, table_spec)
690                    .into_iter(),
691            ),
692            ReplicaOrder::Deterministic => Either::Right(
693                self.nonfiltered_replica_set(ts, replica_location, cluster, table_spec)
694                    .into_replicas_ordered()
695                    .into_iter(),
696            ),
697        };
698        replica_iter.filter(move |(node, shard): &(NodeRef<'a>, Shard)| predicate(node, *shard))
699    }
700
701    /// Picks a replica for given token and table spec which meets the provided location criteria
702    /// and the predicate.
703    /// The replica is chosen randomly over all candidates that meet the criteria
704    /// unless the query is LWT; if so, the first replica meeting the criteria is chosen
705    /// to avoid Paxos contention.
706    fn pick_replica<'a>(
707        &'a self,
708        ts: &TokenWithStrategy<'a>,
709        replica_location: NodeLocationCriteria<'a>,
710        predicate: impl Fn(NodeRef<'a>, Shard) -> bool + 'a,
711        cluster: &'a ClusterState,
712        statement_type: StatementType,
713        table_spec: &TableSpec,
714    ) -> Option<PickedReplica<'a>> {
715        match statement_type {
716            StatementType::Lwt => {
717                self.pick_first_replica(ts, replica_location, predicate, cluster, table_spec)
718            }
719            StatementType::NonLwt => self
720                .pick_random_replica(ts, replica_location, predicate, cluster, table_spec)
721                .map(PickedReplica::Computed),
722        }
723    }
724
725    /// Picks the first (wrt the deterministic order imposed on the keyspace, see comment below)
726    /// replica for given token and table spec which meets the provided location criteria
727    /// and the predicate.
728    // This is to be used for LWT optimisation: in order to reduce contention
729    // caused by Paxos conflicts, we always try to query replicas in the same,
730    // deterministic order:
731    // - ring order for token ring keyspaces,
732    // - tablet definition order for tablet keyspaces.
733    //
734    // If preferred rack and DC are set, then the first (encountered on the ring) replica
735    // that resides in that rack in that DC **and** satisfies the `predicate` is returned.
736    //
737    // If preferred DC is set, then the first (encountered on the ring) replica
738    // that resides in that DC **and** satisfies the `predicate` is returned.
739    //
740    // If no DC/rack preferences are set, then the only possible replica to be returned
741    // (due to expensive computation of the others, and we avoid expensive computation in `pick()`)
742    // is the primary replica. If it exists, Some is returned, with either Computed(primary_replica)
743    // **iff** it satisfies the predicate or ToBeComputedInFallback otherwise.
744    fn pick_first_replica<'a>(
745        &'a self,
746        ts: &TokenWithStrategy<'a>,
747        replica_location: NodeLocationCriteria<'a>,
748        predicate: impl Fn(NodeRef<'a>, Shard) -> bool + 'a,
749        cluster: &'a ClusterState,
750        table_spec: &TableSpec,
751    ) -> Option<PickedReplica<'a>> {
752        match replica_location {
753            NodeLocationCriteria::Any => {
754                // ReplicaSet returned by ReplicaLocator for this case:
755                // 1) can be precomputed and later used cheaply,
756                // 2) returns replicas in the **non-ring order** (this because ReplicaSet chains
757                //    ring-ordered replicas sequences from different DCs, thus not preserving
758                //    the global ring order).
759                // Because of 2), we can't use a precomputed ReplicaSet, but instead we need ReplicasOrdered.
760                // As ReplicasOrdered can compute cheaply only the primary global replica
761                // (computation of the remaining ones is expensive), in case that the primary replica
762                // does not satisfy the `predicate`, ToBeComputedInFallback is returned.
763                // All expensive computation is to be done only when `fallback()` is called.
764                self.nonfiltered_replica_set(ts, replica_location, cluster, table_spec)
765                    .into_replicas_ordered()
766                    .into_iter()
767                    .next()
768                    .map(|(primary_replica, shard)| {
769                        if predicate(primary_replica, shard) {
770                            PickedReplica::Computed((primary_replica, shard))
771                        } else {
772                            PickedReplica::ToBeComputedInFallback
773                        }
774                    })
775            }
776            NodeLocationCriteria::Datacenter(_) | NodeLocationCriteria::DatacenterAndRack(_, _) => {
777                // ReplicaSet returned by ReplicaLocator for this case:
778                // 1) can be precomputed and later used cheaply,
779                // 2) returns replicas in the ring order (this is not true for the case
780                //    when multiple DCs are allowed, because ReplicaSet chains replicas sequences
781                //    from different DCs, thus not preserving the global ring order)
782                self.filtered_replicas(
783                    ts,
784                    replica_location,
785                    predicate,
786                    cluster,
787                    ReplicaOrder::Deterministic,
788                    table_spec,
789                )
790                .next()
791                .map(PickedReplica::Computed)
792            }
793        }
794    }
795
796    /// Picks a random replica for given token and table spec which meets the provided
797    /// location criteria and the predicate.
798    fn pick_random_replica<'a>(
799        &'a self,
800        ts: &TokenWithStrategy<'a>,
801        replica_location: NodeLocationCriteria<'a>,
802        predicate: impl Fn(NodeRef<'a>, Shard) -> bool + 'a,
803        cluster: &'a ClusterState,
804        table_spec: &TableSpec,
805    ) -> Option<(NodeRef<'a>, Shard)> {
806        let predicate = Self::make_sharded_rack_predicate(predicate, replica_location);
807
808        let replica_set = self.nonfiltered_replica_set(ts, replica_location, cluster, table_spec);
809
810        if let Some(fixed) = self.fixed_seed {
811            let mut gen = Pcg32::new(fixed, 0);
812            replica_set.choose_filtered(&mut gen, |(node, shard)| predicate(node, *shard))
813        } else {
814            replica_set.choose_filtered(&mut rng(), |(node, shard)| predicate(node, *shard))
815        }
816    }
817
818    /// Returns iterator over replicas for given token and table spec, filtered
819    /// by provided location criteria and predicate.
820    /// By default, the replicas are shuffled.
821    /// For LWTs, though, the replicas are instead returned in a deterministic order.
822    fn maybe_shuffled_replicas<'a>(
823        &'a self,
824        ts: &TokenWithStrategy<'a>,
825        replica_location: NodeLocationCriteria<'a>,
826        predicate: impl Fn(NodeRef<'a>, Shard) -> bool + 'a,
827        cluster: &'a ClusterState,
828        statement_type: StatementType,
829        table_spec: &TableSpec,
830    ) -> impl Iterator<Item = (NodeRef<'a>, Shard)> {
831        let order = match statement_type {
832            StatementType::Lwt => ReplicaOrder::Deterministic,
833            StatementType::NonLwt => ReplicaOrder::Arbitrary,
834        };
835
836        let replicas =
837            self.filtered_replicas(ts, replica_location, predicate, cluster, order, table_spec);
838
839        match statement_type {
840            // As an LWT optimisation: in order to reduce contention caused by Paxos conflicts,
841            // we always try to query replicas in the same order.
842            StatementType::Lwt => Either::Left(replicas),
843            StatementType::NonLwt => Either::Right(self.shuffle(replicas)),
844        }
845    }
846
847    /// Returns an iterator over the given slice of nodes, rotated by a random shift.
848    fn randomly_rotated_nodes(nodes: &[Arc<Node>]) -> impl Iterator<Item = NodeRef<'_>> {
849        // Create a randomly rotated slice view
850        let nodes_len = nodes.len();
851        if nodes_len > 0 {
            let index = rng().random_range(0..nodes_len); // random_range() panics when the range is empty!
853            Either::Left(
854                nodes[index..]
855                    .iter()
856                    .chain(nodes[..index].iter())
857                    .take(nodes.len()),
858            )
859        } else {
860            Either::Right(std::iter::empty())
861        }
862    }
863
864    /// Picks a random node from the slice of nodes. The node must satisfy the given predicate.
865    fn pick_node<'a>(
866        &'a self,
867        nodes: &'a [Arc<Node>],
868        predicate: impl Fn(NodeRef<'a>) -> bool,
869    ) -> Option<NodeRef<'a>> {
870        // Select the first node that matches the predicate
871        Self::randomly_rotated_nodes(nodes).find(|&node| predicate(node))
872    }
873
874    /// Returns an iterator over the given slice of nodes, rotated by a random shift
875    /// and filtered by given predicate.
876    fn round_robin_nodes<'a>(
877        &'a self,
878        nodes: &'a [Arc<Node>],
879        predicate: impl Fn(NodeRef<'a>) -> bool,
880    ) -> impl Iterator<Item = NodeRef<'a>> {
881        Self::randomly_rotated_nodes(nodes).filter(move |node| predicate(node))
882    }
883
884    /// Wraps a given iterator by shuffling its contents.
885    fn shuffle<'a>(
886        &self,
887        iter: impl Iterator<Item = (NodeRef<'a>, Shard)>,
888    ) -> impl Iterator<Item = (NodeRef<'a>, Shard)> {
889        let mut vec: Vec<(NodeRef<'_>, Shard)> = iter.collect();
890
891        if let Some(fixed) = self.fixed_seed {
892            let mut gen = Pcg32::new(fixed, 0);
893            vec.shuffle(&mut gen);
894        } else {
895            vec.shuffle(&mut rng());
896        }
897
898        vec.into_iter()
899    }
900
901    /// Returns true iff the node should be considered to be alive.
902    fn is_alive(node: NodeRef, _shard: Option<Shard>) -> bool {
        // For now, we leave this as a stub, until we have time to improve node events.
904        // node.is_enabled() && !node.is_down()
905        node.is_enabled()
906    }
907
908    /// Returns true iff the datacenter failover is permitted for the statement being executed.
909    fn is_datacenter_failover_possible(&self) -> bool {
910        self.preferences.datacenter().is_some() && self.permit_dc_failover
911    }
912}
913
914impl Default for DefaultPolicy {
915    fn default() -> Self {
916        Self {
917            preferences: NodeLocationPreference::Any,
918            is_token_aware: true,
919            permit_dc_failover: false,
920            pick_predicate: Box::new(Self::is_alive),
921            latency_awareness: None,
922            fixed_seed: None,
923        }
924    }
925}
926
/// The intended way to instantiate the DefaultPolicy.
///
/// # Example
/// ```
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// use scylla::policies::load_balancing::DefaultPolicy;
///
/// let default_policy = DefaultPolicy::builder()
///     .prefer_datacenter("dc1".to_string())
///     .token_aware(true)
///     .permit_dc_failover(true)
///     .build();
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
942pub struct DefaultPolicyBuilder {
943    preferences: NodeLocationPreference,
944    is_token_aware: bool,
945    permit_dc_failover: bool,
946    latency_awareness: Option<LatencyAwarenessBuilder>,
947    enable_replica_shuffle: bool,
948}
949
950impl DefaultPolicyBuilder {
951    /// Creates a builder used to customise configuration of a new DefaultPolicy.
952    pub fn new() -> Self {
953        Self {
954            preferences: NodeLocationPreference::Any,
955            is_token_aware: true,
956            permit_dc_failover: false,
957            latency_awareness: None,
958            enable_replica_shuffle: true,
959        }
960    }
961
962    /// Builds a new DefaultPolicy with the previously set configuration.
963    pub fn build(self) -> Arc<dyn LoadBalancingPolicy> {
964        let latency_awareness = self.latency_awareness.map(|builder| builder.build());
965        let pick_predicate = if let Some(ref latency_awareness) = latency_awareness {
966            let latency_predicate = latency_awareness.generate_predicate();
967            Box::new(move |node: NodeRef<'_>, shard| {
968                DefaultPolicy::is_alive(node, shard) && latency_predicate(node)
969            })
970                as Box<dyn Fn(NodeRef<'_>, Option<Shard>) -> bool + Send + Sync + 'static>
971        } else {
972            Box::new(DefaultPolicy::is_alive)
973        };
974
975        Arc::new(DefaultPolicy {
976            preferences: self.preferences,
977            is_token_aware: self.is_token_aware,
978            permit_dc_failover: self.permit_dc_failover,
979            pick_predicate,
980            latency_awareness,
981            fixed_seed: (!self.enable_replica_shuffle).then(|| {
982                let seed = rand::random();
983                debug!("DefaultPolicy: setting fixed seed to {}", seed);
984                seed
985            }),
986        })
987    }
988
989    /// Sets the datacenter to be preferred by this policy.
990    ///
991    /// Allows the load balancing policy to prioritize nodes based on their location.
992    /// When a preferred datacenter is set, the policy will treat nodes in that
993    /// datacenter as "local" nodes, and nodes in other datacenters as "remote" nodes.
994    /// This affects the order in which nodes are returned by the policy when
995    /// selecting replicas for read or write operations. If no preferred datacenter
996    /// is specified, the policy will treat all nodes as local nodes.
997    ///
998    /// When datacenter failover is disabled (`permit_dc_failover` is set to false),
999    /// the default policy will only include local nodes in load balancing plans.
1000    /// Remote nodes will be excluded, even if they are alive and available
1001    /// to serve requests.
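    ///
    /// # Example
    ///
    /// A minimal sketch; "dc1" is a placeholder datacenter name:
    /// ```
    /// use scylla::policies::load_balancing::DefaultPolicy;
    ///
    /// let policy = DefaultPolicy::builder()
    ///     .prefer_datacenter("dc1".to_string())
    ///     .build();
    /// ```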
1002    pub fn prefer_datacenter(mut self, datacenter_name: String) -> Self {
1003        self.preferences = NodeLocationPreference::Datacenter(datacenter_name);
1004        self
1005    }
1006
1007    /// Sets the datacenter and rack to be preferred by this policy.
1008    ///
1009    /// Allows the load balancing policy to prioritize nodes based on their location
1010    /// as well as their availability zones in the preferred datacenter.
1011    /// When a preferred datacenter is set, the policy will treat nodes in that
1012    /// datacenter as "local" nodes, and nodes in other datacenters as "remote" nodes.
1013    /// This affects the order in which nodes are returned by the policy when
1014    /// selecting replicas for read or write operations. If no preferred datacenter
1015    /// is specified, the policy will treat all nodes as local nodes.
1016    ///
1017    /// When datacenter failover is disabled (`permit_dc_failover` is set to false),
1018    /// the default policy will only include local nodes in load balancing plans.
1019    /// Remote nodes will be excluded, even if they are alive and available
1020    /// to serve requests.
1021    ///
1022    /// When a preferred rack is set, the policy will first return replicas in the local rack
1023    /// in the preferred datacenter, and then the other replicas in the datacenter.
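    ///
    /// # Example
    ///
    /// A minimal sketch; the datacenter and rack names are placeholders:
    /// ```
    /// use scylla::policies::load_balancing::DefaultPolicy;
    ///
    /// let policy = DefaultPolicy::builder()
    ///     .prefer_datacenter_and_rack("dc1".to_string(), "rack1".to_string())
    ///     .build();
    /// ```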
1024    pub fn prefer_datacenter_and_rack(
1025        mut self,
1026        datacenter_name: String,
1027        rack_name: String,
1028    ) -> Self {
1029        self.preferences = NodeLocationPreference::DatacenterAndRack(datacenter_name, rack_name);
1030        self
1031    }
1032
1033    /// Sets whether this policy is token-aware (balances load more consciously) or not.
1034    ///
1035    /// Token awareness refers to a mechanism by which the driver is aware
1036    /// of the token range assigned to each node in the cluster. Tokens
1037    /// are assigned to nodes to partition the data and distribute it
1038    /// across the cluster.
1039    ///
1040    /// When a user wants to read or write data, the driver can use token awareness
1041    /// to route the request to the correct node based on the token range of the data
1042    /// being accessed. This can help to minimize network traffic and improve
1043    /// performance by ensuring that the data is accessed locally as much as possible.
1044    ///
1045    /// In the case of `DefaultPolicy`, token awareness is enabled by default,
1046    /// meaning that the policy will prefer to return alive local replicas
1047    /// if the token is available. This means that if the client is requesting data
1048    /// that falls within the token range of a particular node, the policy will try
1049    /// to route the request to that node first, assuming it is alive and responsive.
1050    ///
1051    /// Token awareness can significantly improve the performance and scalability
1052    /// of applications built on Scylla. By using token awareness, users can ensure
1053    /// that data is accessed locally as much as possible, reducing network overhead
1054    /// and improving throughput.
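    ///
    /// # Example
    ///
    /// A minimal sketch showing how token awareness could be disabled
    /// (it is enabled by default):
    /// ```
    /// use scylla::policies::load_balancing::DefaultPolicy;
    ///
    /// let policy = DefaultPolicy::builder()
    ///     .token_aware(false)
    ///     .build();
    /// ```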
1055    pub fn token_aware(mut self, is_token_aware: bool) -> Self {
1056        self.is_token_aware = is_token_aware;
1057        self
1058    }
1059
1060    /// Sets whether this policy permits datacenter failover, i.e. ever attempts
1061    /// to send requests to nodes from a non-preferred datacenter.
1062    ///
1063    /// In the event of a datacenter outage or network failure, the nodes
1064    /// in that datacenter may become unavailable, and clients may no longer
1065    /// be able to access data stored on those nodes. To address this,
1066    /// the `DefaultPolicy` supports datacenter failover, which allows routing
1067    /// requests to nodes in other datacenters if the local nodes are unavailable.
1068    ///
    /// Datacenter failover can be enabled in `DefaultPolicy` by setting this flag.
1070    /// When it is set, the policy will prefer to return alive remote replicas
1071    /// if datacenter failover is permitted and possible due to consistency
1072    /// constraints.
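    ///
    /// # Example
    ///
    /// A minimal sketch combining a preferred datacenter (placeholder name)
    /// with permitted failover to other datacenters:
    /// ```
    /// use scylla::policies::load_balancing::DefaultPolicy;
    ///
    /// let policy = DefaultPolicy::builder()
    ///     .prefer_datacenter("dc1".to_string())
    ///     .permit_dc_failover(true)
    ///     .build();
    /// ```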
1073    pub fn permit_dc_failover(mut self, permit: bool) -> Self {
1074        self.permit_dc_failover = permit;
1075        self
1076    }
1077
    /// Latency awareness is a mechanism that penalises nodes whose measured
    /// recent average latency classifies them as falling behind the others.
    ///
    /// Every `update_rate`, the global minimum average latency is computed,
    /// and all nodes whose average latency is worse than `exclusion_threshold`
    /// times the global minimum average latency become penalised for
    /// `retry_period`. Penalisation involves putting those nodes at the very end
    /// of the query plan. As it is often not truly beneficial to prefer
    /// a faster non-replica over replicas that lag behind it,
    /// this mechanism may well worsen latencies and/or throughput.
    ///
    /// ATTENTION: using latency awareness is NOT recommended, unless prior
    /// benchmarks prove its beneficial impact on the specific workload's
    /// performance. Use with caution.
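    ///
    /// # Example
    ///
    /// A minimal sketch enabling latency awareness with default settings;
    /// this assumes `LatencyAwarenessBuilder` is re-exported next to `DefaultPolicy`
    /// and constructible via `new()`:
    /// ```
    /// use scylla::policies::load_balancing::{DefaultPolicy, LatencyAwarenessBuilder};
    ///
    /// let policy = DefaultPolicy::builder()
    ///     .latency_awareness(LatencyAwarenessBuilder::new())
    ///     .build();
    /// ```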
1092    pub fn latency_awareness(mut self, latency_awareness_builder: LatencyAwarenessBuilder) -> Self {
1093        self.latency_awareness = Some(latency_awareness_builder);
1094        self
1095    }
1096
1097    /// Sets whether this policy should shuffle replicas when token-awareness
1098    /// is enabled. Shuffling can help distribute the load over replicas, but
1099    /// can reduce the effectiveness of caching on the database side (e.g.
1100    /// for reads).
1101    ///
    /// This option is enabled by default. If disabled, replicas will be returned
    /// in a random order that is fixed when the load balancing policy
    /// is created and does not change over its lifetime.
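    ///
    /// # Example
    ///
    /// A minimal sketch disabling replica shuffling, so that replicas are returned
    /// in a fixed (though randomly seeded) order:
    /// ```
    /// use scylla::policies::load_balancing::DefaultPolicy;
    ///
    /// let policy = DefaultPolicy::builder()
    ///     .enable_shuffling_replicas(false)
    ///     .build();
    /// ```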
1105    pub fn enable_shuffling_replicas(mut self, enable: bool) -> Self {
1106        self.enable_replica_shuffle = enable;
1107        self
1108    }
1109}
1110
1111impl Default for DefaultPolicyBuilder {
1112    fn default() -> Self {
1113        Self::new()
1114    }
1115}
1116
1117struct ProcessedRoutingInfo<'a> {
1118    token_with_strategy: Option<TokenWithStrategy<'a>>,
1119}
1120
1121impl<'a> ProcessedRoutingInfo<'a> {
1122    fn new(query: &'a RoutingInfo, cluster: &'a ClusterState) -> ProcessedRoutingInfo<'a> {
1123        Self {
1124            token_with_strategy: TokenWithStrategy::new(query, cluster),
1125        }
1126    }
1127}
1128
1129struct TokenWithStrategy<'a> {
1130    strategy: &'a Strategy,
1131    token: Token,
1132}
1133
1134impl<'a> TokenWithStrategy<'a> {
1135    fn new(query: &'a RoutingInfo, cluster: &'a ClusterState) -> Option<TokenWithStrategy<'a>> {
1136        let token = query.token?;
1137        let keyspace_name = query.table?.ks_name();
1138        let keyspace = cluster.get_keyspace(keyspace_name)?;
1139        let strategy = &keyspace.strategy;
1140        Some(TokenWithStrategy { strategy, token })
1141    }
1142}
1143
1144#[cfg(test)]
1145mod tests {
1146    use std::collections::HashMap;
1147
1148    use scylla_cql::{frame::types::SerialConsistency, Consistency};
1149    use tracing::info;
1150
1151    use self::framework::{
1152        get_plan_and_collect_node_identifiers, mock_cluster_state_for_token_unaware_tests,
1153        ExpectedGroups, ExpectedGroupsBuilder,
1154    };
1155    use crate::policies::host_filter::HostFilter;
1156    use crate::routing::locator::tablets::TabletsInfo;
1157    use crate::routing::locator::test::{
1158        id_to_invalid_addr, mock_metadata_for_token_aware_tests, TABLE_NTS_RF_2, TABLE_NTS_RF_3,
1159        TABLE_SS_RF_2,
1160    };
1161    use crate::{
1162        cluster::ClusterState,
1163        policies::load_balancing::{
1164            default::tests::framework::mock_cluster_state_for_token_aware_tests, Plan, RoutingInfo,
1165        },
1166        routing::Token,
1167        test_utils::setup_tracing,
1168    };
1169
1170    use super::{DefaultPolicy, NodeLocationPreference};
1171
1172    pub(crate) mod framework {
1173        use crate::routing::locator::test::{
1174            id_to_invalid_addr, mock_metadata_for_token_aware_tests,
1175        };
1176        use std::collections::{HashMap, HashSet};
1177
1178        use uuid::Uuid;
1179
1180        use crate::{
1181            cluster::{
1182                metadata::{Metadata, Peer},
1183                ClusterState,
1184            },
1185            policies::load_balancing::{LoadBalancingPolicy, Plan, RoutingInfo},
1186            routing::Token,
1187            test_utils::setup_tracing,
1188        };
1189
1190        use super::TabletsInfo;
1191
1192        #[derive(Debug)]
1193        enum ExpectedGroup {
1194            NonDeterministic(HashSet<u16>),
1195            Deterministic(HashSet<u16>),
1196            Ordered(Vec<u16>),
1197        }
1198
1199        impl ExpectedGroup {
1200            fn len(&self) -> usize {
1201                match self {
1202                    Self::NonDeterministic(s) => s.len(),
1203                    Self::Deterministic(s) => s.len(),
1204                    Self::Ordered(v) => v.len(),
1205                }
1206            }
1207        }
1208
1209        pub(crate) struct ExpectedGroupsBuilder {
1210            groups: Vec<ExpectedGroup>,
1211        }
1212
1213        impl ExpectedGroupsBuilder {
1214            pub(crate) fn new() -> Self {
1215                Self { groups: Vec::new() }
1216            }
1217            /// Expects that the next group in the plan will have a set of nodes
1218            /// that is equal to the provided one. The groups are assumed to be
            /// non-deterministic, i.e. the policy is expected to shuffle
1220            /// the nodes within that group.
1221            pub(crate) fn group(mut self, group: impl IntoIterator<Item = u16>) -> Self {
1222                self.groups
1223                    .push(ExpectedGroup::NonDeterministic(group.into_iter().collect()));
1224                self
1225            }
1226            /// Expects that the next group in the plan will have a set of nodes
1227            /// that is equal to the provided one, but the order of nodes in
1228            /// that group must be stable over multiple plans.
1229            pub(crate) fn deterministic(mut self, group: impl IntoIterator<Item = u16>) -> Self {
1230                self.groups
1231                    .push(ExpectedGroup::Deterministic(group.into_iter().collect()));
1232                self
1233            }
1234            /// Expects that the next group in the plan will have a sequence of nodes
1235            /// that is equal to the provided one, including order.
1236            pub(crate) fn ordered(mut self, group: impl IntoIterator<Item = u16>) -> Self {
1237                self.groups
1238                    .push(ExpectedGroup::Ordered(group.into_iter().collect()));
1239                self
1240            }
1241            pub(crate) fn build(self) -> ExpectedGroups {
1242                ExpectedGroups {
1243                    groups: self.groups,
1244                }
1245            }
1246        }
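
        // Typical usage (see the tests below): chain `group` / `deterministic` /
        // `ordered` calls in plan order, call `build()`, and pass the result to
        // `ExpectedGroups::assert_proper_grouping_in_plans`.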
1247
1248        #[derive(Debug)]
1249        pub(crate) struct ExpectedGroups {
1250            groups: Vec<ExpectedGroup>,
1251        }
1252
1253        impl ExpectedGroups {
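            /// Checks that every plan in `gots` follows the expected grouping:
            /// each plan is split into consecutive chunks of the expected group
            /// sizes, each chunk is compared against its expected group (as a
            /// set or as an exact sequence), and, across all plans, groups
            /// marked as deterministic or ordered must keep a single ordering,
            /// while (given multiple plans) non-deterministic groups of size > 1
            /// must appear in more than one order.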
1254            pub(crate) fn assert_proper_grouping_in_plans(&self, gots: &[Vec<u16>]) {
1255                // For simplicity, assume that there is at least one plan
1256                // in `gots`
1257                assert!(!gots.is_empty());
1258
1259                // Each plan is assumed to have the same number of groups.
1260                // For group index `i`, the code below will go over all plans
1261                // and will collect their `i`-th groups and put them under
1262                // index `i` in `sets_of_groups`.
1263                let mut sets_of_groups: Vec<HashSet<Vec<u16>>> =
1264                    vec![HashSet::new(); self.groups.len()];
1265
1266                for got in gots {
1267                    // First, make sure that `got` has the right number of items,
1268                    // equal to the sum of sizes of all expected groups
1269                    let combined_groups_len: usize = self.groups.iter().map(|s| s.len()).sum();
1270                    assert_eq!(
1271                        got.len(),
1272                        combined_groups_len,
1273                        "Plan length different than expected. Got plan {:?}, expected groups {:?}",
1274                        got,
1275                        self.groups,
1276                    );
1277
1278                    // Now, split `got` into groups of expected sizes
1279                    // and just `assert_eq` them
1280                    let mut got = got.iter();
1281                    for (group_id, expected) in self.groups.iter().enumerate() {
1282                        // Collect the nodes that constitute the group
1283                        // in the actual plan
1284                        let got_group: Vec<_> = (&mut got).take(expected.len()).copied().collect();
1285
1286                        match expected {
1287                            ExpectedGroup::NonDeterministic(expected_set)
1288                            | ExpectedGroup::Deterministic(expected_set) => {
1289                                // Verify that the group has the same nodes as the
1290                                // expected one
1291                                let got_set: HashSet<_> = got_group.iter().copied().collect();
1292                                assert_eq!(&got_set, expected_set, "Unordered group mismatch");
1293                            }
1294                            ExpectedGroup::Ordered(sequence) => {
1295                                assert_eq!(&got_group, sequence, "Ordered group mismatch");
1296                            }
1297                        }
1298
1299                        // Put the group into sets_of_groups
1300                        sets_of_groups[group_id].insert(got_group);
1301                    }
1302                }
1303
1304                // Verify that the groups are either deterministic
1305                // or non-deterministic
1306                for (sets, expected) in sets_of_groups.iter().zip(self.groups.iter()) {
1307                    match expected {
1308                        ExpectedGroup::NonDeterministic(s) => {
1309                            // The group is supposed to have non-deterministic
1310                            // ordering. If there are multiple plans and the
1311                            // group size is larger than one, expect more than
1312                            // one distinct ordering in the set.
1313                            if gots.len() > 1 && s.len() > 1 {
1314                                assert!(
1315                                    sets.len() > 1,
1316                                    "Group {:?} is expected to be nondeterministic, but it appears to be deterministic",
1317                                    expected
1318                                );
1319                            }
1320                        }
1321                        ExpectedGroup::Deterministic(_) | ExpectedGroup::Ordered(_) => {
1322                            // The group is supposed to be deterministic,
1323                            // i.e. a given instance of the default policy
1324                            // must always return the nodes within it using
1325                            // the same order.
1326                            // There will only be one, unique ordering shared
1327                            // by all plans - check this
1328                            assert_eq!(
1329                                sets.len(),
1330                                1,
1331                                "Group {:?} is expected to be deterministic, but it appears to be nondeterministic",
1332                                expected
1333                            );
1334                        }
1335                    }
1336                }
1337            }
1338        }
1339
1340        #[test]
1341        fn test_assert_proper_grouping_in_plan_good() {
1342            setup_tracing();
1343            let got = vec![1u16, 2, 3, 4, 5];
1344            let expected_groups = ExpectedGroupsBuilder::new()
1345                .group([1])
1346                .group([3, 2, 4])
1347                .group([5])
1348                .build();
1349
1350            expected_groups.assert_proper_grouping_in_plans(&[got]);
1351        }
1352
1353        #[test]
1354        #[should_panic]
1355        fn test_assert_proper_grouping_in_plan_too_many_nodes_in_the_end() {
1356            setup_tracing();
1357            let got = vec![1u16, 2, 3, 4, 5, 6];
1358            let expected_groups = ExpectedGroupsBuilder::new()
1359                .group([1])
1360                .group([3, 2, 4])
1361                .group([5])
1362                .build();
1363
1364            expected_groups.assert_proper_grouping_in_plans(&[got]);
1365        }
1366
1367        #[test]
1368        #[should_panic]
1369        fn test_assert_proper_grouping_in_plan_too_many_nodes_in_the_middle() {
1370            setup_tracing();
1371            let got = vec![1u16, 2, 6, 3, 4, 5];
1372            let expected_groups = ExpectedGroupsBuilder::new()
1373                .group([1])
1374                .group([3, 2, 4])
1375                .group([5])
1376                .build();
1377
1378            expected_groups.assert_proper_grouping_in_plans(&[got]);
1379        }
1380
1381        #[test]
1382        #[should_panic]
1383        fn test_assert_proper_grouping_in_plan_missing_node() {
1384            setup_tracing();
1385            let got = vec![1u16, 2, 3, 4];
1386            let expected_groups = ExpectedGroupsBuilder::new()
1387                .group([1])
1388                .group([3, 2, 4])
1389                .group([5])
1390                .build();
1391
1392            expected_groups.assert_proper_grouping_in_plans(&[got]);
1393        }
1394
1395        // Builds a ClusterState from the locator test module's mock metadata (token-aware test fixtures).
1396        pub(crate) async fn mock_cluster_state_for_token_aware_tests() -> ClusterState {
1397            let metadata = mock_metadata_for_token_aware_tests();
1398            ClusterState::new(
1399                metadata,
1400                &Default::default(),
1401                &HashMap::new(),
1402                &None,
1403                None,
1404                TabletsInfo::new(),
1405                &HashMap::new(),
1406                #[cfg(feature = "metrics")]
1407                &Default::default(),
1408            )
1409            .await
1410        }
1411
1412        // Creates a ClusterState describing 5 nodes spread across 2 datacenters.
1413        // The ring is minimal and not intended to influence the tests.
1414        pub(crate) async fn mock_cluster_state_for_token_unaware_tests() -> ClusterState {
1415            let peers = [("eu", 1), ("eu", 2), ("eu", 3), ("us", 4), ("us", 5)]
1416                .iter()
1417                .map(|(dc, id)| Peer {
1418                    datacenter: Some(dc.to_string()),
1419                    rack: None,
1420                    address: id_to_invalid_addr(*id),
1421                    tokens: vec![Token::new(*id as i64 * 100)],
1422                    host_id: Uuid::new_v4(),
1423                })
1424                .collect::<Vec<_>>();
1425
1426            let info = Metadata {
1427                peers,
1428                keyspaces: HashMap::new(),
1429            };
1430
1431            ClusterState::new(
1432                info,
1433                &Default::default(),
1434                &HashMap::new(),
1435                &None,
1436                None,
1437                TabletsInfo::new(),
1438                &HashMap::new(),
1439                #[cfg(feature = "metrics")]
1440                &Default::default(),
1441            )
1442            .await
1443        }
1444
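        /// Computes a full plan for the given policy, routing info and cluster,
        /// and returns the nodes' ports in plan order. Ports serve as node
        /// identifiers in these tests.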
1445        pub(crate) fn get_plan_and_collect_node_identifiers(
1446            policy: &impl LoadBalancingPolicy,
1447            query_info: &RoutingInfo,
1448            cluster: &ClusterState,
1449        ) -> Vec<u16> {
1450            let plan = Plan::new(policy, query_info, cluster);
1451            plan.map(|(node, _shard)| node.address.port())
1452                .collect::<Vec<_>>()
1453        }
1454    }
1455
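    /// Routing info with no token and no table, so plans built from it are
    /// token-unaware.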
1456    pub(crate) const EMPTY_ROUTING_INFO: RoutingInfo = RoutingInfo {
1457        token: None,
1458        table: None,
1459        is_confirmed_lwt: false,
1460        consistency: Consistency::Quorum,
1461        serial_consistency: Some(SerialConsistency::Serial),
1462    };
1463
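    /// Generates 256 plans for the given policy, cluster and routing info,
    /// logs one example plan, and asserts that all plans match the expected
    /// grouping.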
1464    pub(super) async fn test_default_policy_with_given_cluster_and_routing_info(
1465        policy: &DefaultPolicy,
1466        cluster: &ClusterState,
1467        routing_info: &RoutingInfo<'_>,
1468        expected_groups: &ExpectedGroups,
1469    ) {
1470        let mut plans = Vec::new();
1471        for _ in 0..256 {
1472            let plan = get_plan_and_collect_node_identifiers(policy, routing_info, cluster);
1473            plans.push(plan);
1474        }
1475        let example_plan = Plan::new(policy, routing_info, cluster);
1476        info!("Example plan from policy:");
1477        for (node, shard) in example_plan {
1478            info!(
1479                "Node port: {}, shard: {}, dc: {:?}, rack: {:?}, down: {:?}",
1480                node.address.port(),
1481                shard,
1482                node.datacenter,
1483                node.rack,
1484                node.is_down()
1485            );
1486        }
1487
1488        expected_groups.assert_proper_grouping_in_plans(&plans);
1489    }
1490
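    /// Runs the shared grouping check against the token-unaware mock cluster
    /// using `EMPTY_ROUTING_INFO`.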
1491    async fn test_given_default_policy_with_token_unaware_statements(
1492        policy: DefaultPolicy,
1493        expected_groups: &ExpectedGroups,
1494    ) {
1495        let cluster = mock_cluster_state_for_token_unaware_tests().await;
1496
1497        test_default_policy_with_given_cluster_and_routing_info(
1498            &policy,
1499            &cluster,
1500            &EMPTY_ROUTING_INFO,
1501            expected_groups,
1502        )
1503        .await;
1504    }
1505
1506    #[tokio::test]
1507    async fn test_default_policy_with_token_unaware_statements() {
1508        setup_tracing();
1509        let local_dc = "eu".to_string();
1510        let policy_with_disabled_dc_failover = DefaultPolicy {
1511            preferences: NodeLocationPreference::Datacenter(local_dc.clone()),
1512            permit_dc_failover: false,
1513            ..Default::default()
1514        };
1515        let expected_groups = ExpectedGroupsBuilder::new()
1516            .group([1, 2, 3]) // pick + fallback local nodes
1517            .build();
1518        test_given_default_policy_with_token_unaware_statements(
1519            policy_with_disabled_dc_failover,
1520            &expected_groups,
1521        )
1522        .await;
1523
1524        let policy_with_enabled_dc_failover = DefaultPolicy {
1525            preferences: NodeLocationPreference::Datacenter(local_dc.clone()),
1526            permit_dc_failover: true,
1527            ..Default::default()
1528        };
1529        let expected_groups = ExpectedGroupsBuilder::new()
1530            .group([1, 2, 3]) // pick + fallback local nodes
1531            .group([4, 5]) // fallback remote nodes
1532            .build();
1533        test_given_default_policy_with_token_unaware_statements(
1534            policy_with_enabled_dc_failover,
1535            &expected_groups,
1536        )
1537        .await;
1538    }
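
    // Note: these tests build `DefaultPolicy` via struct literals to set internal
    // fields directly. Application code would normally go through the public
    // builder instead. A rough, hedged sketch (assuming the builder exposes
    // `prefer_datacenter`, `token_aware`, `permit_dc_failover` and `build`,
    // mirroring the fields exercised in these tests):
    //
    //     let policy = DefaultPolicy::builder()
    //         .prefer_datacenter("eu".to_owned())
    //         .token_aware(true)
    //         .permit_dc_failover(true)
    //         .build();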
1539
1540    #[tokio::test]
1541    async fn test_default_policy_with_token_aware_statements() {
1542        setup_tracing();
1543
1544        use crate::routing::locator::test::{A, B, C, D, E, F, G};
1545        let cluster = mock_cluster_state_for_token_aware_tests().await;
1546
1547        #[derive(Debug)]
1548        struct Test<'a> {
1549            policy: DefaultPolicy,
1550            routing_info: RoutingInfo<'a>,
1551            expected_groups: ExpectedGroups,
1552        }
1553
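        // Each case pairs a policy configuration and routing info with the
        // expected plan grouping. A..G are node identifiers defined in the
        // locator test module; their datacenters ("eu"/"us") and racks (r1/r2)
        // are spelled out in the per-case ring-order comments below.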
1554        let tests = [
1555            // Keyspace NTS with RF=2 with enabled DC failover
1556            Test {
1557                policy: DefaultPolicy {
1558                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1559                    is_token_aware: true,
1560                    permit_dc_failover: true,
1561                    ..Default::default()
1562                },
1563                routing_info: RoutingInfo {
1564                    token: Some(Token::new(160)),
1565                    table: Some(TABLE_NTS_RF_2),
1566                    consistency: Consistency::Two,
1567                    ..Default::default()
1568                },
1569                // going through the ring, we get order: F , A , C , D , G , B , E
1570                //                                      us  eu  eu  us  eu  eu  us
1571                //                                      r2  r1  r1  r1  r2  r1  r1
1572                expected_groups: ExpectedGroupsBuilder::new()
1573                    .group([A, G]) // pick + fallback local replicas
1574                    .group([F, D]) // remote replicas
1575                    .group([C, B]) // local nodes
1576                    .group([E]) // remote nodes
1577                    .build(),
1578            },
1579            // Keyspace NTS with RF=2 with enabled DC failover, shuffling replicas disabled
1580            Test {
1581                policy: DefaultPolicy {
1582                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1583                    is_token_aware: true,
1584                    permit_dc_failover: true,
1585                    fixed_seed: Some(123),
1586                    ..Default::default()
1587                },
1588                routing_info: RoutingInfo {
1589                    token: Some(Token::new(160)),
1590                    table: Some(TABLE_NTS_RF_2),
1591                    consistency: Consistency::Two,
1592                    ..Default::default()
1593                },
1594                // going through the ring, we get order: F , A , C , D , G , B , E
1595                //                                      us  eu  eu  us  eu  eu  us
1596                //                                      r2  r1  r1  r1  r2  r1  r1
1597                expected_groups: ExpectedGroupsBuilder::new()
1598                    .deterministic([A, G]) // pick + fallback local replicas
1599                    .deterministic([F, D]) // remote replicas
1600                    .group([C, B]) // local nodes
1601                    .group([E]) // remote nodes
1602                    .build(),
1603            },
1604            // Keyspace NTS with RF=2 with DC failover and local Consistency. DC failover should still work.
1605            Test {
1606                policy: DefaultPolicy {
1607                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1608                    is_token_aware: true,
1609                    permit_dc_failover: true,
1610                    fixed_seed: Some(123),
1611                    ..Default::default()
1612                },
1613                routing_info: RoutingInfo {
1614                    token: Some(Token::new(160)),
1615                    table: Some(TABLE_NTS_RF_2),
1616                    consistency: Consistency::LocalOne,
1617                    ..Default::default()
1618                },
1619                // going through the ring, we get order: F , A , C , D , G , B , E
1620                //                                      us  eu  eu  us  eu  eu  us
1621                //                                      r2  r1  r1  r1  r2  r1  r1
1622                expected_groups: ExpectedGroupsBuilder::new()
1623                    .deterministic([A, G]) // pick + fallback local replicas
1624                    .deterministic([F, D]) // remote replicas
1625                    .group([C, B]) // local nodes
1626                    .group([E]) // remote nodes
1627                    .build(),
1628            },
1629            // Keyspace NTS with RF=2 with explicitly disabled DC failover
1630            Test {
1631                policy: DefaultPolicy {
1632                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1633                    is_token_aware: true,
1634                    permit_dc_failover: false,
1635                    ..Default::default()
1636                },
1637                routing_info: RoutingInfo {
1638                    token: Some(Token::new(160)),
1639                    table: Some(TABLE_NTS_RF_2),
1640                    consistency: Consistency::One,
1641                    ..Default::default()
1642                },
1643                // going through the ring, we get order: F , A , C , D , G , B , E
1644                //                                      us  eu  eu  us  eu  eu  us
1645                //                                      r2  r1  r1  r1  r2  r1  r1
1646                expected_groups: ExpectedGroupsBuilder::new()
1647                    .group([A, G]) // pick + fallback local replicas
1648                    .group([C, B]) // local nodes
1649                    .build(), // failover is explicitly forbidden
1650            },
1651            // Keyspace NTS with RF=3 with enabled DC failover
1652            Test {
1653                policy: DefaultPolicy {
1654                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1655                    is_token_aware: true,
1656                    permit_dc_failover: true,
1657                    ..Default::default()
1658                },
1659                routing_info: RoutingInfo {
1660                    token: Some(Token::new(160)),
1661                    table: Some(TABLE_NTS_RF_3),
1662                    consistency: Consistency::Quorum,
1663                    ..Default::default()
1664                },
1665                // going through the ring, we get order: F , A , C , D , G , B , E
1666                //                                      us  eu  eu  us  eu  eu  us
1667                //                                      r2  r1  r1  r1  r2  r1  r1
1668                expected_groups: ExpectedGroupsBuilder::new()
1669                    .group([A, C, G]) // pick + fallback local replicas
1670                    .group([F, D, E]) // remote replicas
1671                    .group([B]) // local nodes
1672                    .group([]) // remote nodes
1673                    .build(),
1674            },
1675            // Keyspace NTS with RF=3 with enabled DC failover, shuffling replicas disabled
1676            Test {
1677                policy: DefaultPolicy {
1678                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1679                    is_token_aware: true,
1680                    permit_dc_failover: true,
1681                    fixed_seed: Some(123),
1682                    ..Default::default()
1683                },
1684                routing_info: RoutingInfo {
1685                    token: Some(Token::new(160)),
1686                    table: Some(TABLE_NTS_RF_3),
1687                    consistency: Consistency::Quorum,
1688                    ..Default::default()
1689                },
1690                // going through the ring, we get order: F , A , C , D , G , B , E
1691                //                                      us  eu  eu  us  eu  eu  us
1692                //                                      r2  r1  r1  r1  r2  r1  r1
1693                expected_groups: ExpectedGroupsBuilder::new()
1694                    .deterministic([A, C, G]) // pick + fallback local replicas
1695                    .deterministic([F, D, E]) // remote replicas
1696                    .group([B]) // local nodes
1697                    .group([]) // remote nodes
1698                    .build(),
1699            },
1700            // Keyspace NTS with RF=3 with disabled DC failover
1701            Test {
1702                policy: DefaultPolicy {
1703                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1704                    is_token_aware: true,
1705                    permit_dc_failover: false,
1706                    ..Default::default()
1707                },
1708                routing_info: RoutingInfo {
1709                    token: Some(Token::new(160)),
1710                    table: Some(TABLE_NTS_RF_3),
1711                    consistency: Consistency::Quorum,
1712                    ..Default::default()
1713                },
1714                // going through the ring, we get order: F , A , C , D , G , B , E
1715                //                                      us  eu  eu  us  eu  eu  us
1716                //                                      r2  r1  r1  r1  r2  r1  r1
1717                expected_groups: ExpectedGroupsBuilder::new()
1718                    .group([A, C, G]) // pick + fallback local replicas
1719                    .group([B]) // local nodes
1720                    .build(), // failover explicitly forbidden
1721            },
1722            // Keyspace SS with RF=2 with enabled DC failover
1723            Test {
1724                policy: DefaultPolicy {
1725                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1726                    is_token_aware: true,
1727                    permit_dc_failover: true,
1728                    ..Default::default()
1729                },
1730                routing_info: RoutingInfo {
1731                    token: Some(Token::new(160)),
1732                    table: Some(TABLE_SS_RF_2),
1733                    consistency: Consistency::Two,
1734                    ..Default::default()
1735                },
1736                // going through the ring, we get order: F , A , C , D , G , B , E
1737                //                                      us  eu  eu  us  eu  eu  us
1738                //                                      r2  r1  r1  r1  r2  r1  r1
1739                expected_groups: ExpectedGroupsBuilder::new()
1740                    .group([A]) // pick + fallback local replicas
1741                    .group([F]) // remote replicas
1742                    .group([C, G, B]) // local nodes
1743                    .group([D, E]) // remote nodes
1744                    .build(),
1745            },
1746            // Keyspace SS with RF=2 with DC failover and local Consistency. DC failover should still work.
1747            Test {
1748                policy: DefaultPolicy {
1749                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1750                    is_token_aware: true,
1751                    permit_dc_failover: true,
1752                    ..Default::default()
1753                },
1754                routing_info: RoutingInfo {
1755                    token: Some(Token::new(160)),
1756                    table: Some(TABLE_SS_RF_2),
1757                    consistency: Consistency::LocalOne,
1758                    ..Default::default()
1759                },
1760                // going through the ring, we get order: F , A , C , D , G , B , E
1761                //                                      us  eu  eu  us  eu  eu  us
1762                //                                      r2  r1  r1  r1  r2  r1  r1
1766                expected_groups: ExpectedGroupsBuilder::new()
1767                    .group([A]) // pick + fallback local replicas
1768                    .group([F]) // remote replicas
1769                    .group([C, G, B]) // local nodes
1770                    .group([D, E]) // remote nodes
1771                    .build(),
1772            },
1773            // No token implies no token awareness
1774            Test {
1775                policy: DefaultPolicy {
1776                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1777                    is_token_aware: true,
1778                    permit_dc_failover: true,
1779                    ..Default::default()
1780                },
1781                routing_info: RoutingInfo {
1782                    token: None, // no token
1783                    table: Some(TABLE_NTS_RF_3),
1784                    consistency: Consistency::Quorum,
1785                    ..Default::default()
1786                },
1787                expected_groups: ExpectedGroupsBuilder::new()
1788                    .group([A, B, C, G]) // local nodes
1789                    .group([D, E, F]) // remote nodes
1790                    .build(),
1791            },
1792            // No keyspace implies no token awareness
1793            Test {
1794                policy: DefaultPolicy {
1795                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1796                    is_token_aware: true,
1797                    permit_dc_failover: true,
1798                    ..Default::default()
1799                },
1800                routing_info: RoutingInfo {
1801                    token: Some(Token::new(160)),
1802                    table: None, // no keyspace
1803                    consistency: Consistency::Quorum,
1804                    ..Default::default()
1805                },
1806                expected_groups: ExpectedGroupsBuilder::new()
1807                    .group([A, B, C, G]) // local nodes
1808                    .group([D, E, F]) // remote nodes
1809                    .build(),
1810            },
1811            // Unknown preferred DC, failover permitted
1812            Test {
1813                policy: DefaultPolicy {
1814                    preferences: NodeLocationPreference::Datacenter("au".to_owned()),
1815                    is_token_aware: true,
1816                    permit_dc_failover: true,
1817                    ..Default::default()
1818                },
1819                routing_info: RoutingInfo {
1820                    token: Some(Token::new(160)),
1821                    table: Some(TABLE_NTS_RF_2),
1822                    consistency: Consistency::Quorum,
1823                    ..Default::default()
1824                },
1825                // going through the ring, we get order: F , A , C , D , G , B , E
1826                //                                      us  eu  eu  us  eu  eu  us
1827                //                                      r2  r1  r1  r1  r2  r1  r1
1828                expected_groups: ExpectedGroupsBuilder::new()
1829                    .group([A, D, F, G]) // remote replicas
1830                    .group([B, C, E]) // remote nodes
1831                    .build(),
1832            },
1833            // Unknown preferred DC, failover forbidden
1834            Test {
1835                policy: DefaultPolicy {
1836                    preferences: NodeLocationPreference::Datacenter("au".to_owned()),
1837                    is_token_aware: true,
1838                    permit_dc_failover: false,
1839                    ..Default::default()
1840                },
1841                routing_info: RoutingInfo {
1842                    token: Some(Token::new(160)),
1843                    table: Some(TABLE_NTS_RF_2),
1844                    consistency: Consistency::Quorum,
1845                    ..Default::default()
1846                },
1847                // going through the ring, we get order: F , A , C , D , G , B , E
1848                //                                      us  eu  eu  us  eu  eu  us
1849                //                                      r2  r1  r1  r1  r2  r1  r1
1850                expected_groups: ExpectedGroupsBuilder::new().build(), // empty plan, because all nodes are remote and failover is forbidden
1851            },
1852            // No preferred DC, failover permitted
1853            Test {
1854                policy: DefaultPolicy {
1855                    preferences: NodeLocationPreference::Any,
1856                    is_token_aware: true,
1857                    permit_dc_failover: true,
1858                    ..Default::default()
1859                },
1860                routing_info: RoutingInfo {
1861                    token: Some(Token::new(160)),
1862                    table: Some(TABLE_NTS_RF_2),
1863                    consistency: Consistency::Quorum,
1864                    ..Default::default()
1865                },
1866                // going through the ring, we get order: F , A , C , D , G , B , E
1867                //                                      us  eu  eu  us  eu  eu  us
1868                //                                      r2  r1  r1  r1  r2  r1  r1
1869                expected_groups: ExpectedGroupsBuilder::new()
1870                    .group([A, D, F, G]) // remote replicas
1871                    .group([B, C, E]) // remote nodes
1872                    .build(),
1873            },
1874            // No preferred DC, failover forbidden
1875            Test {
1876                policy: DefaultPolicy {
1877                    preferences: NodeLocationPreference::Any,
1878                    is_token_aware: true,
1879                    permit_dc_failover: false,
1880                    ..Default::default()
1881                },
1882                routing_info: RoutingInfo {
1883                    token: Some(Token::new(160)),
1884                    table: Some(TABLE_NTS_RF_2),
1885                    consistency: Consistency::Quorum,
1886                    ..Default::default()
1887                },
1888                // going through the ring, we get order: F , A , C , D , G , B , E
1889                //                                      us  eu  eu  us  eu  eu  us
1890                //                                      r2  r1  r1  r1  r2  r1  r1
1891                expected_groups: ExpectedGroupsBuilder::new()
1892                    .group([A, D, F, G]) // remote replicas
1893                    .group([B, C, E]) // remote nodes
1894                    .build(),
1895            },
1896            // Keyspace NTS with RF=3 with enabled DC failover and rack-awareness
1897            Test {
1898                policy: DefaultPolicy {
1899                    preferences: NodeLocationPreference::DatacenterAndRack(
1900                        "eu".to_owned(),
1901                        "r1".to_owned(),
1902                    ),
1903                    is_token_aware: true,
1904                    permit_dc_failover: true,
1905                    ..Default::default()
1906                },
1907                routing_info: RoutingInfo {
1908                    token: Some(Token::new(160)),
1909                    table: Some(TABLE_NTS_RF_3),
1910                    consistency: Consistency::One,
1911                    ..Default::default()
1912                },
1913                // going through the ring, we get order: F , A , C , D , G , B , E
1914                //                                      us  eu  eu  us  eu  eu  us
1915                //                                      r2  r1  r1  r1  r2  r1  r1
1916                expected_groups: ExpectedGroupsBuilder::new()
1917                    .group([A, C]) // pick local rack replicas
1918                    .group([G]) // local DC replicas
1919                    .group([F, D, E]) // remote replicas
1920                    .group([B]) // local nodes
1921                    .build(),
1922            },
1923            // Keyspace SS with RF=2 with enabled rack-awareness, shuffling replicas disabled
1924            Test {
1925                policy: DefaultPolicy {
1926                    preferences: NodeLocationPreference::DatacenterAndRack(
1927                        "eu".to_owned(),
1928                        "r1".to_owned(),
1929                    ),
1930                    is_token_aware: true,
1931                    permit_dc_failover: false,
1932                    fixed_seed: Some(123),
1933                    ..Default::default()
1934                },
1935                routing_info: RoutingInfo {
1936                    token: Some(Token::new(560)),
1937                    table: Some(TABLE_SS_RF_2),
1938                    consistency: Consistency::Two,
1939                    ..Default::default()
1940                },
1941                // going through the ring, we get order: B , C , E , G , A , F , D
1942                //                                      eu  eu  us  eu  eu  us  us
1943                //                                      r1  r1  r1  r2  r1  r2  r1
1944                expected_groups: ExpectedGroupsBuilder::new()
1945                    .deterministic([B]) // pick local rack replicas
1946                    .deterministic([C]) // fallback replicas
1947                    .group([A]) // local rack nodes
1948                    .group([G]) // local DC nodes
1949                    .build(),
1950            },
1951            // Keyspace SS with RF=2 with enabled rack-awareness and no local-rack replica
1952            Test {
1953                policy: DefaultPolicy {
1954                    preferences: NodeLocationPreference::DatacenterAndRack(
1955                        "eu".to_owned(),
1956                        "r2".to_owned(),
1957                    ),
1958                    is_token_aware: true,
1959                    permit_dc_failover: false,
1960                    ..Default::default()
1961                },
1962                routing_info: RoutingInfo {
1963                    token: Some(Token::new(160)),
1964                    table: Some(TABLE_SS_RF_2),
1965                    consistency: Consistency::One,
1966                    ..Default::default()
1967                },
1968                // going through the ring, we get order: F , A , C , D , G , B , E
1969                //                                      us  eu  eu  us  eu  eu  us
1970                //                                      r2  r1  r1  r1  r2  r1  r1
1971                expected_groups: ExpectedGroupsBuilder::new()
1972                    .group([A]) // pick + fallback local-DC replica (no replica in the preferred rack)
1973                    .group([G]) // local rack nodes
1974                    .group([C, B]) // local DC nodes
1975                    .build(),
1976            },
1977            // Keyspace NTS with RF=3 with enabled DC failover and rack-awareness, no token provided
1978            Test {
1979                policy: DefaultPolicy {
1980                    preferences: NodeLocationPreference::DatacenterAndRack(
1981                        "eu".to_owned(),
1982                        "r1".to_owned(),
1983                    ),
1984                    is_token_aware: true,
1985                    permit_dc_failover: true,
1986                    ..Default::default()
1987                },
1988                routing_info: RoutingInfo {
1989                    token: None,
1990                    table: Some(TABLE_NTS_RF_3),
1991                    consistency: Consistency::One,
1992                    ..Default::default()
1993                },
1994                // going through the ring, we get order: F , A , C , D , G , B , E
1995                //                                      us  eu  eu  us  eu  eu  us
1996                //                                      r2  r1  r1  r1  r2  r1  r1
1997                expected_groups: ExpectedGroupsBuilder::new()
1998                    .group([A, C, B]) // local rack nodes
1999                    .group([G]) // local DC nodes
2000                    .group([F, D, E]) // remote nodes
2001                    .build(),
2002            },
2003        ];
2004
2005        for test in tests {
2006            info!("Test: {:?}", test);
2007            let Test {
2008                policy,
2009                routing_info,
2010                expected_groups,
2011            } = test;
2012            test_default_policy_with_given_cluster_and_routing_info(
2013                &policy,
2014                &cluster,
2015                &routing_info,
2016                &expected_groups,
2017            )
2018            .await;
2019        }
2020    }
2021
2022    #[tokio::test]
2023    async fn test_default_policy_with_lwt_statements() {
2024        setup_tracing();
2025        use crate::routing::locator::test::{A, B, C, D, E, F, G};
2026
2027        let cluster = mock_cluster_state_for_token_aware_tests().await;
2028        struct Test<'a> {
2029            policy: DefaultPolicy,
2030            routing_info: RoutingInfo<'a>,
2031            expected_groups: ExpectedGroups,
2032        }
2033
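        // These cases set `is_confirmed_lwt: true`; replica groups are therefore
        // expected as exact sequences (`ordered`) rather than shuffled sets,
        // i.e. the replica order in the plan must be deterministic.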
2034        let tests = [
2035            // Keyspace NTS with RF=2 with enabled DC failover
2036            Test {
2037                policy: DefaultPolicy {
2038                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2039                    is_token_aware: true,
2040                    permit_dc_failover: true,
2041                    ..Default::default()
2042                },
2043                routing_info: RoutingInfo {
2044                    token: Some(Token::new(160)),
2045                    table: Some(TABLE_NTS_RF_2),
2046                    consistency: Consistency::Two,
2047                    is_confirmed_lwt: true,
2048                    ..Default::default()
2049                },
2050                // going through the ring, we get order: F , A , C , D , G , B , E
2051                //                                      us  eu  eu  us  eu  eu  us
2052                //                                      r2  r1  r1  r1  r2  r1  r1
2053                expected_groups: ExpectedGroupsBuilder::new()
2054                    .ordered([A, G]) // pick + fallback local replicas
2055                    .ordered([F, D]) // remote replicas
2056                    .group([C, B]) // local nodes
2057                    .group([E]) // remote nodes
2058                    .build(),
2059            },
2060            // Keyspace NTS with RF=2 with enabled DC failover, shuffling replicas disabled
2061            Test {
2062                policy: DefaultPolicy {
2063                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2064                    is_token_aware: true,
2065                    permit_dc_failover: true,
2066                    fixed_seed: Some(123),
2067                    ..Default::default()
2068                },
2069                routing_info: RoutingInfo {
2070                    token: Some(Token::new(160)),
2071                    table: Some(TABLE_NTS_RF_2),
2072                    consistency: Consistency::Two,
2073                    is_confirmed_lwt: true,
2074                    ..Default::default()
2075                },
2076                // going through the ring, we get order: F , A , C , D , G , B , E
2077                //                                      us  eu  eu  us  eu  eu  us
2078                //                                      r2  r1  r1  r1  r2  r1  r1
2079                expected_groups: ExpectedGroupsBuilder::new()
2080                    .ordered([A, G]) // pick + fallback local replicas
2081                    .ordered([F, D]) // remote replicas
2082                    .group([C, B]) // local nodes
2083                    .group([E]) // remote nodes
2084                    .build(),
2085            },
2086            // Keyspace NTS with RF=2 with DC failover and local Consistency. DC failover should still work.
2087            Test {
2088                policy: DefaultPolicy {
2089                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2090                    is_token_aware: true,
2091                    permit_dc_failover: true,
2092                    ..Default::default()
2093                },
2094                routing_info: RoutingInfo {
2095                    token: Some(Token::new(160)),
2096                    table: Some(TABLE_NTS_RF_2),
2097                    consistency: Consistency::LocalOne,
2098                    is_confirmed_lwt: true,
2099                    ..Default::default()
2100                },
2101                // going through the ring, we get order: F , A , C , D , G , B , E
2102                //                                      us  eu  eu  us  eu  eu  us
2103                //                                      r2  r1  r1  r1  r2  r1  r1
2104                expected_groups: ExpectedGroupsBuilder::new()
2105                    .ordered([A, G]) // pick + fallback local replicas
2106                    .ordered([F, D]) // remote replicas
2107                    .group([C, B]) // local nodes
2108                    .group([E]) // remote nodes
2109                    .build(),
2110            },
2111            // Keyspace NTS with RF=2 with explicitly disabled DC failover
2112            Test {
2113                policy: DefaultPolicy {
2114                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2115                    is_token_aware: true,
2116                    permit_dc_failover: false,
2117                    ..Default::default()
2118                },
2119                routing_info: RoutingInfo {
2120                    token: Some(Token::new(160)),
2121                    table: Some(TABLE_NTS_RF_2),
2122                    consistency: Consistency::One,
2123                    is_confirmed_lwt: true,
2124                    ..Default::default()
2125                },
2126                // going through the ring, we get order: F , A , C , D , G , B , E
2127                //                                      us  eu  eu  us  eu  eu  us
2128                //                                      r2  r1  r1  r1  r2  r1  r1
2129                expected_groups: ExpectedGroupsBuilder::new()
2130                    .ordered([A, G]) // pick + fallback local replicas
2131                    .group([C, B]) // local nodes
2132                    .build(), // failover is explicitly forbidden
2133            },
2134            // Keyspace NTS with RF=3 with enabled DC failover
2135            Test {
2136                policy: DefaultPolicy {
2137                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2138                    is_token_aware: true,
2139                    permit_dc_failover: true,
2140                    ..Default::default()
2141                },
2142                routing_info: RoutingInfo {
2143                    token: Some(Token::new(160)),
2144                    table: Some(TABLE_NTS_RF_3),
2145                    consistency: Consistency::Quorum,
2146                    is_confirmed_lwt: true,
2147                    ..Default::default()
2148                },
2149                // going through the ring, we get order: F , A , C , D , G , B , E
2150                //                                      us  eu  eu  us  eu  eu  us
2151                //                                      r2  r1  r1  r1  r2  r1  r1
2152                expected_groups: ExpectedGroupsBuilder::new()
2153                    .ordered([A, C, G]) // pick + fallback local replicas
2154                    .ordered([F, D, E]) // remote replicas
2155                    .group([B]) // local nodes
2156                    .group([]) // remote nodes
2157                    .build(),
2158            },
2159            // Keyspace NTS with RF=3 with enabled DC failover, shuffling replicas disabled
2160            Test {
2161                policy: DefaultPolicy {
2162                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2163                    is_token_aware: true,
2164                    permit_dc_failover: true,
2165                    fixed_seed: Some(123),
2166                    ..Default::default()
2167                },
2168                routing_info: RoutingInfo {
2169                    token: Some(Token::new(160)),
2170                    table: Some(TABLE_NTS_RF_3),
2171                    consistency: Consistency::Quorum,
2172                    is_confirmed_lwt: true,
2173                    ..Default::default()
2174                },
2175                // going through the ring, we get order: F , A , C , D , G , B , E
2176                //                                      us  eu  eu  us  eu  eu  us
2177                //                                      r2  r1  r1  r1  r2  r1  r1
2178                expected_groups: ExpectedGroupsBuilder::new()
2179                    .ordered([A, C, G]) // pick + fallback local replicas
2180                    .ordered([F, D, E]) // remote replicas
2181                    .group([B]) // local nodes
2182                    .group([]) // remote nodes
2183                    .build(),
2184            },
2185            // Keyspace NTS with RF=3 with disabled DC failover
2186            Test {
2187                policy: DefaultPolicy {
2188                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2189                    is_token_aware: true,
2190                    permit_dc_failover: false,
2191                    ..Default::default()
2192                },
2193                routing_info: RoutingInfo {
2194                    token: Some(Token::new(160)),
2195                    table: Some(TABLE_NTS_RF_3),
2196                    consistency: Consistency::Quorum,
2197                    is_confirmed_lwt: true,
2198                    ..Default::default()
2199                },
2200                // going through the ring, we get order: F , A , C , D , G , B , E
2201                //                                      us  eu  eu  us  eu  eu  us
2202                //                                      r2  r1  r1  r1  r2  r1  r1
2203                expected_groups: ExpectedGroupsBuilder::new()
2204                    .ordered([A, C, G]) // pick + fallback local replicas
2205                    .group([B]) // local nodes
2206                    .build(), // failover explicitly forbidden
2207            },
2208            // Keyspace SS with RF=2 with enabled DC failover
2209            Test {
2210                policy: DefaultPolicy {
2211                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2212                    is_token_aware: true,
2213                    permit_dc_failover: true,
2214                    ..Default::default()
2215                },
2216                routing_info: RoutingInfo {
2217                    token: Some(Token::new(160)),
2218                    table: Some(TABLE_SS_RF_2),
2219                    consistency: Consistency::Two,
2220                    is_confirmed_lwt: true,
2221                    ..Default::default()
2222                },
2223                // going through the ring, we get order: F , A , C , D , G , B , E
2224                //                                      us  eu  eu  us  eu  eu  us
2225                //                                      r2  r1  r1  r1  r2  r1  r1
2226                expected_groups: ExpectedGroupsBuilder::new()
2227                    .ordered([A]) // pick + fallback local replicas
2228                    .ordered([F]) // remote replicas
2229                    .group([C, G, B]) // local nodes
2230                    .group([D, E]) // remote nodes
2231                    .build(),
2232            },
2233            // Keyspace SS with RF=2 with DC failover and local Consistency. DC failover should still work.
2234            Test {
2235                policy: DefaultPolicy {
2236                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2237                    is_token_aware: true,
2238                    permit_dc_failover: true,
2239                    ..Default::default()
2240                },
2241                routing_info: RoutingInfo {
2242                    token: Some(Token::new(160)),
2243                    table: Some(TABLE_SS_RF_2),
2244                    consistency: Consistency::LocalOne,
2245                    is_confirmed_lwt: true,
2246                    ..Default::default()
2247                },
2248                // going through the ring, we get order: F , A , C , D , G , B , E
2249                //                                      us  eu  eu  us  eu  eu  us
2250                //                                      r2  r1  r1  r1  r2  r1  r1
2251                expected_groups: ExpectedGroupsBuilder::new()
2252                    .ordered([A]) // pick + fallback local replicas
2253                    .ordered([F]) // remote replicas
2254                    .group([C, G, B]) // local nodes
2255                    .group([D, E]) // remote nodes
2256                    .build(),
2257            },
2258            // No token implies no token awareness
2259            Test {
2260                policy: DefaultPolicy {
2261                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2262                    is_token_aware: true,
2263                    permit_dc_failover: true,
2264                    ..Default::default()
2265                },
2266                routing_info: RoutingInfo {
2267                    token: None, // no token
2268                    table: Some(TABLE_NTS_RF_3),
2269                    consistency: Consistency::Quorum,
2270                    is_confirmed_lwt: true,
2271                    ..Default::default()
2272                },
2273                expected_groups: ExpectedGroupsBuilder::new()
2274                    .group([A, B, C, G]) // local nodes
2275                    .group([D, E, F]) // remote nodes
2276                    .build(),
2277            },
2278            // No keyspace implies no token awareness
2279            Test {
2280                policy: DefaultPolicy {
2281                    preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2282                    is_token_aware: true,
2283                    permit_dc_failover: true,
2284                    ..Default::default()
2285                },
2286                routing_info: RoutingInfo {
2287                    token: Some(Token::new(160)),
2288                    table: None, // no keyspace
2289                    consistency: Consistency::Quorum,
2290                    is_confirmed_lwt: true,
2291                    ..Default::default()
2292                },
2293                expected_groups: ExpectedGroupsBuilder::new()
2294                    .group([A, B, C, G]) // local nodes
2295                    .group([D, E, F]) // remote nodes
2296                    .build(),
2297            },
2298            // Unknown preferred DC, failover permitted
2299            Test {
2300                policy: DefaultPolicy {
2301                    preferences: NodeLocationPreference::Datacenter("au".to_owned()),
2302                    is_token_aware: true,
2303                    permit_dc_failover: true,
2304                    ..Default::default()
2305                },
2306                routing_info: RoutingInfo {
2307                    token: Some(Token::new(160)),
2308                    table: Some(TABLE_NTS_RF_2),
2309                    consistency: Consistency::Quorum,
2310                    is_confirmed_lwt: true,
2311                    ..Default::default()
2312                },
2313                // going through the ring, we get order: F , A , C , D , G , B , E
2314                //                                      us  eu  eu  us  eu  eu  us
2315                //                                      r2  r1  r1  r1  r2  r1  r1
2316                expected_groups: ExpectedGroupsBuilder::new()
2317                    .ordered([F, A, D, G]) // remote replicas
2318                    .group([B, C, E]) // remote nodes
2319                    .build(),
2320            },
2321            // Unknown preferred DC, failover forbidden
2322            Test {
2323                policy: DefaultPolicy {
2324                    preferences: NodeLocationPreference::Datacenter("au".to_owned()),
2325                    is_token_aware: true,
2326                    permit_dc_failover: false,
2327                    ..Default::default()
2328                },
2329                routing_info: RoutingInfo {
2330                    token: Some(Token::new(160)),
2331                    table: Some(TABLE_NTS_RF_2),
2332                    consistency: Consistency::Quorum,
2333                    is_confirmed_lwt: true,
2334                    ..Default::default()
2335                },
2336                // going through the ring, we get order: F , A , C , D , G , B , E
2337                //                                      us  eu  eu  us  eu  eu  us
2338                //                                      r2  r1  r1  r1  r2  r1  r1
2339                expected_groups: ExpectedGroupsBuilder::new().build(), // empty plan, because all nodes are remote and failover is forbidden
2340            },
2341            // No preferred DC, failover permitted
2342            Test {
2343                policy: DefaultPolicy {
2344                    preferences: NodeLocationPreference::Any,
2345                    is_token_aware: true,
2346                    permit_dc_failover: true,
2347                    ..Default::default()
2348                },
2349                routing_info: RoutingInfo {
2350                    token: Some(Token::new(160)),
2351                    table: Some(TABLE_NTS_RF_2),
2352                    consistency: Consistency::Quorum,
2353                    is_confirmed_lwt: true,
2354                    ..Default::default()
2355                },
2356                // going through the ring, we get order: F , A , C , D , G , B , E
2357                //                                      us  eu  eu  us  eu  eu  us
2358                //                                      r2  r1  r1  r1  r2  r1  r1
2359                expected_groups: ExpectedGroupsBuilder::new()
2360                    .ordered([F, A, D, G]) // remote replicas
2361                    .group([B, C, E]) // remote nodes
2362                    .build(),
2363            },
2364            // No preferred DC, failover forbidden
2365            Test {
2366                policy: DefaultPolicy {
2367                    preferences: NodeLocationPreference::Any,
2368                    is_token_aware: true,
2369                    permit_dc_failover: false,
2370                    ..Default::default()
2371                },
2372                routing_info: RoutingInfo {
2373                    token: Some(Token::new(160)),
2374                    table: Some(TABLE_NTS_RF_2),
2375                    consistency: Consistency::Quorum,
2376                    is_confirmed_lwt: true,
2377                    ..Default::default()
2378                },
2379                // going through the ring, we get order: F , A , C , D , G , B , E
2380                //                                      us  eu  eu  us  eu  eu  us
2381                //                                      r2  r1  r1  r1  r2  r1  r1
2382                expected_groups: ExpectedGroupsBuilder::new()
2383                    .ordered([F, A, D, G]) // remote replicas
2384                    .group([B, C, E]) // remote nodes
2385                    .build(),
2386            },
2387            // Keyspace NTS with RF=3 with enabled DC failover and rack-awareness
2388            Test {
2389                policy: DefaultPolicy {
2390                    preferences: NodeLocationPreference::DatacenterAndRack(
2391                        "eu".to_owned(),
2392                        "r1".to_owned(),
2393                    ),
2394                    is_token_aware: true,
2395                    permit_dc_failover: true,
2396                    ..Default::default()
2397                },
2398                routing_info: RoutingInfo {
2399                    token: Some(Token::new(160)),
2400                    table: Some(TABLE_NTS_RF_3),
2401                    consistency: Consistency::One,
2402                    is_confirmed_lwt: true,
2403                    ..Default::default()
2404                },
2405                // going through the ring, we get order: F , A , C , D , G , B , E
2406                //                                      us  eu  eu  us  eu  eu  us
2407                //                                      r2  r1  r1  r1  r2  r1  r1
2408                expected_groups: ExpectedGroupsBuilder::new()
2409                    .ordered([A, C]) // pick local rack replicas
2410                    .ordered([G]) // local DC replicas
2411                    .ordered([F, D, E]) // remote replicas
2412                    .group([B]) // local nodes
2413                    .build(),
2414            },
2415            // Keyspace SS with RF=2 with enabled rack-awareness, shuffling replicas disabled
2416            Test {
2417                policy: DefaultPolicy {
2418                    preferences: NodeLocationPreference::DatacenterAndRack(
2419                        "eu".to_owned(),
2420                        "r1".to_owned(),
2421                    ),
2422                    is_token_aware: true,
2423                    permit_dc_failover: false,
2424                    fixed_seed: Some(123),
2425                    ..Default::default()
2426                },
2427                routing_info: RoutingInfo {
2428                    token: Some(Token::new(760)),
2429                    table: Some(TABLE_SS_RF_2),
2430                    consistency: Consistency::Two,
2431                    is_confirmed_lwt: true,
2432                    ..Default::default()
2433                },
2434                // going through the ring, we get order: G , B , A , E , F , C , D
2435                //                                      eu  eu  eu  us  us  eu  us
2436                //                                      r2  r1  r1  r1  r2  r1  r1
2437                expected_groups: ExpectedGroupsBuilder::new()
2438                    .ordered([B]) // pick local rack replicas
2439                    .ordered([G]) // local DC replicas
2440                    .group([A, C]) // local nodes
2441                    .build(),
2442            },
2443            // Keyspace SS with RF=2 with enabled rack-awareness and no local-rack replica
2444            Test {
2445                policy: DefaultPolicy {
2446                    preferences: NodeLocationPreference::DatacenterAndRack(
2447                        "eu".to_owned(),
2448                        "r2".to_owned(),
2449                    ),
2450                    is_token_aware: true,
2451                    permit_dc_failover: false,
2452                    ..Default::default()
2453                },
2454                routing_info: RoutingInfo {
2455                    token: Some(Token::new(160)),
2456                    table: Some(TABLE_SS_RF_2),
2457                    consistency: Consistency::One,
2458                    is_confirmed_lwt: true,
2459                    ..Default::default()
2460                },
2461                // going through the ring, we get order: F , A , C , D , G , B , E
2462                //                                      us  eu  eu  us  eu  eu  us
2463                //                                      r2  r1  r1  r1  r2  r1  r1
2464                expected_groups: ExpectedGroupsBuilder::new()
2465                    .group([A]) // local DC replicas (no replica in the preferred rack)
2466                    .group([C, G, B]) // local nodes
2467                    .build(),
2468            },
2469        ];
2470
2471        for Test {
2472            policy,
2473            routing_info,
2474            expected_groups,
2475        } in tests
2476        {
2477            test_default_policy_with_given_cluster_and_routing_info(
2478                &policy,
2479                &cluster,
2480                &routing_info,
2481                &expected_groups,
2482            )
2483            .await;
2484        }
2485
2486        let cluster_with_disabled_node_f = ClusterState::new(
2487            mock_metadata_for_token_aware_tests(),
2488            &Default::default(),
2489            &HashMap::new(),
2490            &None,
2491            {
2492                struct FHostFilter;
2493                impl HostFilter for FHostFilter {
2494                    fn accept(&self, peer: &crate::cluster::metadata::Peer) -> bool {
2495                        peer.address != id_to_invalid_addr(F)
2496                    }
2497                }
2498
2499                Some(&FHostFilter)
2500            },
2501            TabletsInfo::new(),
2502            &HashMap::new(),
2503            #[cfg(feature = "metrics")]
2504            &Default::default(),
2505        )
2506        .await;
2507
2508        let tests_with_disabled_node_f = [
2509            // Keyspace NTS with RF=3 without preferred DC.
2510            // The primary replica does not satisfy the predicate (being disabled by HostFilter),
2511            // so pick() should return None and fallback should return A first.
2512            //
2513            // This is a regression test after a bug was fixed.
2514            Test {
2515                policy: DefaultPolicy {
2516                    preferences: NodeLocationPreference::Any,
2517                    is_token_aware: true,
2518                    permit_dc_failover: true,
2519                    pick_predicate: Box::new(|node, _shard| node.address != id_to_invalid_addr(F)),
2520                    ..Default::default()
2521                },
2522                routing_info: RoutingInfo {
2523                    token: Some(Token::new(160)),
2524                    table: Some(TABLE_NTS_RF_3),
2525                    consistency: Consistency::One,
2526                    is_confirmed_lwt: true,
2527                    ..Default::default()
2528                },
2529                // going through the ring, we get order: F , A , C , D , G , B , E
2530                //                                      us  eu  eu  us  eu  eu  us
2531                //                                      r2  r1  r1  r1  r2  r1  r1
2532                expected_groups: ExpectedGroupsBuilder::new()
2533                    // pick is empty, because the primary replica does not satisfy pick predicate,
2534                    // and with LWT we cannot compute other replicas for NTS without allocations.
2535                    .ordered([A, C, D, G, E]) // replicas
2536                    .group([B]) // nodes
2537                    .build(),
2538            },
2539        ];
2540
2541        for Test {
2542            policy,
2543            routing_info,
2544            expected_groups,
2545        } in tests_with_disabled_node_f
2546        {
2547            test_default_policy_with_given_cluster_and_routing_info(
2548                &policy,
2549                &cluster_with_disabled_node_f,
2550                &routing_info,
2551                &expected_groups,
2552            )
2553            .await;
2554        }
2555    }
2556}
2557
2558mod latency_awareness {
2559    use futures::{future::RemoteHandle, FutureExt};
2560    use itertools::Either;
2561    use tokio::time::{Duration, Instant};
2562    use tracing::{trace, warn};
2563    use uuid::Uuid;
2564
2565    use crate::cluster::node::Node;
2566    use crate::errors::{DbError, RequestAttemptError};
2567    use crate::policies::load_balancing::NodeRef;
2568    use crate::routing::Shard;
2569    use std::{
2570        collections::HashMap,
2571        ops::Deref,
2572        sync::{
2573            atomic::{AtomicU64, Ordering},
2574            Arc, RwLock,
2575        },
2576    };
2577
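    /// A `Duration` stored as a number of microseconds in an `AtomicU64`.
    /// `u64::MAX` serves as a sentinel meaning "no value stored yet", so `load()`
    /// returns `None` until the first `store()`.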
2578    #[derive(Debug)]
2579    struct AtomicDuration(AtomicU64);
2580
2581    impl AtomicDuration {
2582        fn new() -> Self {
2583            Self(AtomicU64::new(u64::MAX))
2584        }
2585
2586        fn store(&self, duration: Duration) {
2587            self.0.store(duration.as_micros() as u64, Ordering::Relaxed)
2588        }
2589
2590        fn load(&self) -> Option<Duration> {
2591            let micros = self.0.load(Ordering::Relaxed);
2592            if micros == u64::MAX {
2593                None
2594            } else {
2595                Some(Duration::from_micros(micros))
2596            }
2597        }
2598    }
2599
2600    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
2601    pub(super) struct TimestampedAverage {
2602        pub(super) timestamp: Instant,
2603        pub(super) average: Duration,
2604        pub(super) num_measures: usize,
2605    }
2606
2607    impl TimestampedAverage {
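        /// Computes the next exponentially decaying running average from the previous one
        /// and the latest latency measurement. With `d` being the time since the previous
        /// update divided by `scale_secs`, the previous average keeps a weight of
        /// `ln(d + 1) / d` and the new latency gets the remaining weight (a sketch of the
        /// computation performed below; a zero `last_latency` leaves the previous value unchanged):
        ///
        /// ```text
        /// prev_weight = ln(d + 1) / d                // 1.0 when d <= 0
        /// average     = (1 - prev_weight) * last_latency + prev_weight * prev_average
        /// ```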
2608        pub(crate) fn compute_next(
2609            previous: Option<Self>,
2610            last_latency: Duration,
2611            scale_secs: f64,
2612        ) -> Option<Self> {
2613            let now = Instant::now();
2614            match previous {
2615                prev if last_latency.is_zero() => prev,
2616                None => Some(Self {
2617                    num_measures: 1,
2618                    average: last_latency,
2619                    timestamp: now,
2620                }),
2621                Some(prev_avg) => Some({
2622                    let delay = now
2623                        .saturating_duration_since(prev_avg.timestamp)
2624                        .as_secs_f64();
2625                    let scaled_delay = delay / scale_secs;
2626                    let prev_weight = if scaled_delay <= 0. {
2627                        1.
2628                    } else {
2629                        (scaled_delay + 1.).ln() / scaled_delay
2630                    };
2631
2632                    let last_latency_secs = last_latency.as_secs_f64();
2633                    let prev_avg_secs = prev_avg.average.as_secs_f64();
2634                    let average = match Duration::try_from_secs_f64(
2635                        (1. - prev_weight) * last_latency_secs + prev_weight * prev_avg_secs,
2636                    ) {
2637                        Ok(ts) => ts,
2638                        Err(e) => {
2639                            warn!(
2640                                "Error while calculating average: {e}. \
2641                                prev_avg_secs: {prev_avg_secs}, \
2642                                last_latency_secs: {last_latency_secs}, \
2643                                prev_weight: {prev_weight}, \
2644                                scaled_delay: {scaled_delay}, \
2645                                delay: {delay}, \
2646                                prev_avg.timestamp: {:?}, \
2647                                now: {now:?}",
2648                                prev_avg.timestamp
2649                            );
2650
2651                            // It is unclear when this branch could be entered, so it is hard to say
2652                            // what a sensible value to return would be; falling back to the previous
2653                            // average does not seem like a bad choice.
2654                            prev_avg.average
2655                        }
2656                    };
2657                    Self {
2658                        num_measures: prev_avg.num_measures + 1,
2659                        timestamp: now,
2660                        average,
2661                    }
2662                }),
2663            }
2664        }
2665    }
2666
2667    /// A latency-aware load balancing policy module, which enables penalising nodes that are too slow.
2668    #[derive(Debug)]
2669    pub(super) struct LatencyAwareness {
2670        pub(super) exclusion_threshold: f64,
2671        pub(super) retry_period: Duration,
2672        pub(super) _update_rate: Duration,
2673        pub(super) minimum_measurements: usize,
2674        pub(super) scale_secs: f64,
2675
2676        /// Last minimum average latency that was noted among the nodes. It is updated every
2677        /// [update_rate](Self::_update_rate).
2678        last_min_latency: Arc<AtomicDuration>,
2679
2680        node_avgs: Arc<RwLock<HashMap<Uuid, RwLock<Option<TimestampedAverage>>>>>,
2681
2682        // This is Some iff there is an associated updater running on a separate Tokio task
2683        // This is Some iff there is an associated updater running on a separate Tokio task.
2684        // In some tests, to avoid relying on timing, this is None; the updater is then tick'ed
2685        // explicitly from outside this struct.
2686    }
2687
2688    impl LatencyAwareness {
2689        pub(super) fn builder() -> LatencyAwarenessBuilder {
2690            LatencyAwarenessBuilder::new()
2691        }
2692
2693        fn new_for_test(
2694            exclusion_threshold: f64,
2695            retry_period: Duration,
2696            update_rate: Duration,
2697            minimum_measurements: usize,
2698            scale: Duration,
2699        ) -> (Self, MinAvgUpdater) {
2700            let min_latency = Arc::new(AtomicDuration::new());
2701
2702            let min_latency_clone = min_latency.clone();
2703            let node_avgs = Arc::new(RwLock::new(HashMap::new()));
2704            let node_avgs_clone = node_avgs.clone();
2705
2706            let updater = MinAvgUpdater {
2707                node_avgs,
2708                min_latency,
2709                minimum_measurements,
2710            };
2711
2712            (
2713                Self {
2714                    exclusion_threshold,
2715                    retry_period,
2716                    _update_rate: update_rate,
2717                    minimum_measurements,
2718                    scale_secs: scale.as_secs_f64(),
2719                    last_min_latency: min_latency_clone,
2720                    node_avgs: node_avgs_clone,
2721                    _updater_handle: None,
2722                },
2723                updater,
2724            )
2725        }
2726
2727        fn new(
2728            exclusion_threshold: f64,
2729            retry_period: Duration,
2730            update_rate: Duration,
2731            minimum_measurements: usize,
2732            scale: Duration,
2733        ) -> Self {
2734            let (self_, updater) = Self::new_for_test(
2735                exclusion_threshold,
2736                retry_period,
2737                update_rate,
2738                minimum_measurements,
2739                scale,
2740            );
2741
2742            let (updater_fut, updater_handle) = async move {
2743                let mut update_scheduler = tokio::time::interval(update_rate);
2744                loop {
2745                    update_scheduler.tick().await;
2746                    updater.tick().await;
2747                }
2748            }
2749            .remote_handle();
2750            tokio::task::spawn(updater_fut);
2751
2752            Self {
2753                _updater_handle: Some(updater_handle),
2754                ..self_
2755            }
2756        }
2757
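        /// Returns a closure intended to be used as (part of) the pick predicate:
        /// it yields `false` for nodes that are currently penalised for being too slow
        /// and `true` otherwise, including when no latency data has been collected yet.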
2758        pub(super) fn generate_predicate(&self) -> impl Fn(&Node) -> bool {
2759            let last_min_latency = self.last_min_latency.clone();
2760            let node_avgs = self.node_avgs.clone();
2761            let exclusion_threshold = self.exclusion_threshold;
2762            let minimum_measurements = self.minimum_measurements;
2763            let retry_period = self.retry_period;
2764
2765            move |node| {
2766                last_min_latency.load().map(|min_avg| match fast_enough(&node_avgs.read().unwrap(), node.host_id, exclusion_threshold, retry_period, minimum_measurements, min_avg) {
2767                    FastEnough::Yes => true,
2768                    FastEnough::No { average } => {
2769                        trace!("Latency awareness: Penalising node {{address={}, datacenter={:?}, rack={:?}}} for being on average at least {} times slower (latency: {}ms) than the fastest ({}ms).",
2770                                node.address, node.datacenter, node.rack, exclusion_threshold, average.as_millis(), min_avg.as_millis());
2771                        false
2772                    }
2773                }).unwrap_or(true)
2774            }
2775        }
2776
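        /// Reorders the fallback iterator so that targets considered fast enough are yielded
        /// first and penalised targets are moved to the very end of the plan (they are never
        /// dropped entirely). If no minimum average latency is known yet, the iterator is
        /// returned unchanged.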
2777        pub(super) fn wrap<'a>(
2778            &self,
2779            fallback: impl Iterator<Item = (NodeRef<'a>, Option<Shard>)>,
2780        ) -> impl Iterator<Item = (NodeRef<'a>, Option<Shard>)> {
2781            let min_avg_latency = match self.last_min_latency.load() {
2782                Some(min_avg) => min_avg,
2783                None => return Either::Left(fallback), // noop, as no latency data has been collected yet
2784            };
2785
2786            let average_latencies = self.node_avgs.read().unwrap();
2787            let targets = fallback;
2788
2789            let mut fast_targets = vec![];
2790            let mut penalised_targets = vec![];
2791
2792            for node_and_shard @ (node, _shard) in targets {
2793                match fast_enough(
2794                    average_latencies.deref(),
2795                    node.host_id,
2796                    self.exclusion_threshold,
2797                    self.retry_period,
2798                    self.minimum_measurements,
2799                    min_avg_latency,
2800                ) {
2801                    FastEnough::Yes => fast_targets.push(node_and_shard),
2802                    FastEnough::No { average } => {
2803                        trace!("Latency awareness: Penalising node {{address={}, datacenter={:?}, rack={:?}}} for being on average at least {} times slower (latency: {}ms) than the fastest ({}ms).",
2804                                node.address, node.datacenter, node.rack, self.exclusion_threshold, average.as_millis(), min_avg_latency.as_millis());
2805                        penalised_targets.push(node_and_shard);
2806                    }
2807                }
2808            }
2809
2810            let mut fast_targets = fast_targets.into_iter();
2811            let mut penalised_targets = penalised_targets.into_iter();
2812
2813            let skipping_penalised_targets_iterator = std::iter::from_fn(move || {
2814                fast_targets.next().or_else(|| penalised_targets.next())
2815            });
2816
2817            Either::Right(skipping_penalised_targets_iterator)
2818        }
2819
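        /// Records a new latency measurement for the given node, updating its running average.
        /// The common path takes only the read lock on the outer map; the write lock is taken
        /// solely when the node is seen for the first time and has to be inserted.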
2820        pub(super) fn report_request(&self, node: &Node, latency: Duration) {
2821            let node_avgs_guard = self.node_avgs.read().unwrap();
2822            if let Some(previous_node_avg) = node_avgs_guard.get(&node.host_id) {
2823                // The usual path, the node has been already noticed.
2824                let mut node_avg_guard = previous_node_avg.write().unwrap();
2825                let previous_node_avg = *node_avg_guard;
2826                *node_avg_guard =
2827                    TimestampedAverage::compute_next(previous_node_avg, latency, self.scale_secs);
2828            } else {
2829                // We drop the read lock not to deadlock while taking write lock.
2830                std::mem::drop(node_avgs_guard);
2831                let mut node_avgs_guard = self.node_avgs.write().unwrap();
2832
2833                // We have to read this again, as other threads may race with us.
2834                let previous_node_avg = node_avgs_guard
2835                    .get(&node.host_id)
2836                    .and_then(|rwlock| *rwlock.read().unwrap());
2837
2838                // We most probably need to add the node to the map.
2839                // (this will be Some only in the unlikely case that another thread raced with us and won)
2840                node_avgs_guard.insert(
2841                    node.host_id,
2842                    RwLock::new(TimestampedAverage::compute_next(
2843                        previous_node_avg,
2844                        latency,
2845                        self.scale_secs,
2846                    )),
2847                );
2848            }
2849        }
2850
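        /// Decides whether the latency measured for a failed request attempt is still meaningful.
        /// "Fast" errors are returned before the query really runs and would skew averages
        /// downwards, so they are excluded; "slow" errors are kept as reliable measurements.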
2851        pub(crate) fn reliable_latency_measure(error: &RequestAttemptError) -> bool {
2852            match error {
2853                // "fast" errors, i.e. ones that are returned quickly after the query begins
2854                RequestAttemptError::CqlRequestSerialization(_)
2855                | RequestAttemptError::BrokenConnectionError(_)
2856                | RequestAttemptError::UnableToAllocStreamId
2857                | RequestAttemptError::DbError(DbError::IsBootstrapping, _)
2858                | RequestAttemptError::DbError(DbError::Unavailable { .. }, _)
2859                | RequestAttemptError::DbError(DbError::Unprepared { .. }, _)
2860                | RequestAttemptError::DbError(DbError::Overloaded, _)
2861                | RequestAttemptError::DbError(DbError::RateLimitReached { .. }, _)
2862                | RequestAttemptError::SerializationError(_) => false,
2863
2864                // "slow" errors, i.e. ones that are returned after the query has been running for a considerable time
2865                RequestAttemptError::DbError(_, _)
2866                | RequestAttemptError::CqlResultParseError(_)
2867                | RequestAttemptError::CqlErrorParseError(_)
2868                | RequestAttemptError::BodyExtensionsParseError(_)
2869                | RequestAttemptError::RepreparedIdChanged { .. }
2870                | RequestAttemptError::RepreparedIdMissingInBatch
2871                | RequestAttemptError::UnexpectedResponse(_)
2872                | RequestAttemptError::NonfinishedPagingState => true,
2873            }
2874        }
2875    }
2876
2877    impl Default for LatencyAwareness {
2878        fn default() -> Self {
2879            Self::builder().build()
2880        }
2881    }
2882
2883    /// Updates the minimum average latency upon each request to `tick()`.
2884    /// The said minimum is a crucial criterion for penalising "too slow" nodes.
2885    struct MinAvgUpdater {
2886        node_avgs: Arc<RwLock<HashMap<Uuid, RwLock<Option<TimestampedAverage>>>>>,
2887        min_latency: Arc<AtomicDuration>,
2888        minimum_measurements: usize,
2889    }
2890
2891    impl MinAvgUpdater {
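        /// Recomputes the minimum of the nodes' average latencies, taking into account only nodes
        /// with at least `minimum_measurements` data points, and stores it for the policy to read.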
2892        async fn tick(&self) {
2893            let averages: &HashMap<Uuid, RwLock<Option<TimestampedAverage>>> =
2894                &self.node_avgs.read().unwrap();
2895            if averages.is_empty() {
2896                return; // No queries have been registered with LAP for any node yet.
2897            }
2898
2899            let min_avg = averages
2900                .values()
2901                .filter_map(|avg| {
2902                    avg.read().unwrap().and_then(|timestamped_average| {
2903                        (timestamped_average.num_measures >= self.minimum_measurements)
2904                            .then_some(timestamped_average.average)
2905                    })
2906                })
2907                .min();
2908            if let Some(min_avg) = min_avg {
2909                self.min_latency.store(min_avg);
2910                trace!(
2911                    "Latency awareness: updated min average latency to {} ms",
2912                    min_avg.as_secs_f64() * 1000.
2913                );
2914            }
2915        }
2916    }
2917
2918    /// The builder of LatencyAwareness module of DefaultPolicy.
2919    ///
2920    /// (For more information about latency awareness, see [DefaultPolicyBuilder::latency_awareness()](super::DefaultPolicyBuilder::latency_awareness)).
2921    /// It is intended to be created and configured by the user and then
2922    /// passed to DefaultPolicyBuilder, like this:
2923    ///
2924    /// # Example
2925    /// ```
2926    /// # fn example() {
2927    /// use scylla::policies::load_balancing::{
2928    ///     LatencyAwarenessBuilder, DefaultPolicy
2929    /// };
2930    ///
2931    /// let latency_awareness_builder = LatencyAwarenessBuilder::new()
2932    ///     .exclusion_threshold(3.)
2933    ///     .minimum_measurements(200);
2934    ///
2935    /// let policy = DefaultPolicy::builder()
2936    ///     .latency_awareness(latency_awareness_builder)
2937    ///     .build();
2938    /// # }
    /// ```
2939    #[derive(Debug, Clone)]
2940    pub struct LatencyAwarenessBuilder {
2941        exclusion_threshold: f64,
2942        retry_period: Duration,
2943        update_rate: Duration,
2944        minimum_measurements: usize,
2945        scale: Duration,
2946    }
2947
2948    impl LatencyAwarenessBuilder {
2949        /// Creates a builder of LatencyAwareness module of DefaultPolicy.
2950        pub fn new() -> Self {
2951            Self {
2952                exclusion_threshold: 2_f64,
2953                retry_period: Duration::from_secs(10),
2954                update_rate: Duration::from_millis(100),
2955                minimum_measurements: 50,
2956                scale: Duration::from_millis(100),
2957            }
2958        }
2959
2960        /// Sets minimum measurements for latency awareness (if there have been fewer measurements taken for a node,
2961        /// the node will not be penalised).
2962        ///
2963        /// Penalising nodes is based on their recently measured average latency.
2964        /// This average is only meaningful if a minimum number of measurements has been collected.
2965        /// This is what this option controls. If fewer than [minimum_measurements](Self::minimum_measurements)
2966        /// data points have been collected for a given host, the policy will never penalise that host.
2967        /// Note that the number of collected measurements for a given host is reset if the node
2968        /// is restarted.
2969        /// The default for this option is **50**.
2970        pub fn minimum_measurements(self, minimum_measurements: usize) -> Self {
2971            Self {
2972                minimum_measurements,
2973                ..self
2974            }
2975        }
2976
2977        /// Sets the retry period for latency awareness (the maximum time a node is penalised for).
2978        ///
2979        /// The retry period defines how long a node may be penalised by the policy before it is given
2980        /// a second chance. More precisely, a node is excluded from query plans if both its calculated
2981        /// average latency is more than [exclusion_threshold](Self::exclusion_threshold) times the
2982        /// fastest node's average latency (at the time the query plan is computed) **and** its
2983        /// calculated average latency was last updated less than [retry_period](Self::retry_period) ago.
2984        /// Since penalised nodes will likely not see their latency updated, this is basically how long
2985        /// the policy will exclude a node.
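        ///
        /// A minimal usage sketch (the 30-second period below is purely illustrative):
        ///
        /// ```
        /// # fn example() {
        /// use scylla::policies::load_balancing::LatencyAwarenessBuilder;
        /// use std::time::Duration;
        ///
        /// // Give penalised nodes another chance after at most 30 seconds.
        /// let latency_awareness_builder = LatencyAwarenessBuilder::new()
        ///     .retry_period(Duration::from_secs(30));
        /// # }
        /// ```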
2986        pub fn retry_period(self, retry_period: Duration) -> Self {
2987            Self {
2988                retry_period,
2989                ..self
2990            }
2991        }
2992
2993        /// Sets exclusion threshold for latency awareness (a threshold for a node to be penalised).
2994        ///
2995        /// The exclusion threshold controls how much worse the average latency of a node must be
2996        /// compared to the fastest performing node for it to be penalised by the policy.
2997        /// For example, if set to 2, the resulting policy excludes nodes whose average latency is
2998        /// more than twice that of the fastest node.
2999        pub fn exclusion_threshold(self, exclusion_threshold: f64) -> Self {
3000            Self {
3001                exclusion_threshold,
3002                ..self
3003            }
3004        }
3005
3006        /// Sets the update rate for latency awareness (how often the global minimum average latency is updated).
3007        ///
3008        /// The update rate defines how often the minimum average latency is recomputed. While the
3009        /// average latency score of each node is computed iteratively (updated each time a new latency
3010        /// is collected), the minimum score needs to be recomputed from scratch every time, which is
3011        /// slightly more costly. For this reason, the minimum is only re-calculated at the given fixed
3012        /// rate and cached between re-calculations.
3013        /// The default update rate is **100 milliseconds**, which should be appropriate for most
3014        /// applications. In particular, note that while we want to avoid recomputing the minimum for
3015        /// every query, that computation is not particularly intensive either, so there is no reason to
3016        /// use a very slow rate (more than a second is probably unnecessarily slow, for instance).
3017        pub fn update_rate(self, update_rate: Duration) -> Self {
3018            Self {
3019                update_rate,
3020                ..self
3021            }
3022        }
3023
3024        /// Sets the scale to use for the resulting latency aware policy.
3025        ///
3026        /// The `scale` controls how quickly the weight given to older latencies decreases
3027        /// over time. For a given host, if a new latency `l` is received at time `t`, and
3028        /// the previously calculated average is `prev`, calculated at time `t'`, then the
3029        /// new average `avg` for that host is computed as follows:
3030        ///
3031        /// ```text
3032        /// d = (t - t') / scale
3033        /// alpha = 1 - (ln(d+1) / d)
3034        /// avg = alpha * l + (1 - alpha) * prev
3035        /// ```
3036        ///
3037        /// Typically, with a `scale` of 100 milliseconds (the default), if a new latency is
3038        /// measured and the previous measure is 10 milliseconds old (so `d=0.1`), then `alpha`
3039        /// will be around `0.05`. In other words, the new latency will account for 5% of the
3040        /// updated average. A bigger scale will give less weight to new measurements (compared to
3041        /// previous ones), while a smaller one will give them more weight.
3042        ///
3043        /// The default scale (if this method is not used) is **100 milliseconds**. If unsure,
3044        /// try this default scale first and experiment only if it doesn't provide acceptable results
3045        /// (hosts are excluded too quickly or not fast enough and tuning the exclusion threshold doesn't
3046        /// help).
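        ///
        /// As a worked instance of the formula above (numbers purely illustrative): with the default
        /// `scale` of 100 ms and a previous measurement taken 10 ms earlier, `d = 10 / 100 = 0.1`, so
        /// `alpha = 1 - ln(1.1) / 0.1 ≈ 1 - 0.953 ≈ 0.047`, i.e. the new latency contributes roughly
        /// 5% to the updated average, as described above.
        ///
        /// ```text
        /// d     = 0.1
        /// alpha = 1 - ln(1.1) / 0.1 ≈ 0.047
        /// avg   ≈ 0.047 * l + 0.953 * prev
        /// ```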
3047        pub fn scale(self, scale: Duration) -> Self {
3048            Self { scale, ..self }
3049        }
3050
3051        pub(super) fn build(self) -> LatencyAwareness {
3052            let Self {
3053                exclusion_threshold,
3054                retry_period,
3055                update_rate,
3056                minimum_measurements,
3057                scale,
3058            } = self;
3059            LatencyAwareness::new(
3060                exclusion_threshold,
3061                retry_period,
3062                update_rate,
3063                minimum_measurements,
3064                scale,
3065            )
3066        }
3067
3068        #[cfg(test)]
3069        fn build_for_test(self) -> (LatencyAwareness, MinAvgUpdater) {
3070            let Self {
3071                exclusion_threshold,
3072                retry_period,
3073                update_rate,
3074                minimum_measurements,
3075                scale,
3076            } = self;
3077            LatencyAwareness::new_for_test(
3078                exclusion_threshold,
3079                retry_period,
3080                update_rate,
3081                minimum_measurements,
3082                scale,
3083            )
3084        }
3085    }
3086
3087    impl Default for LatencyAwarenessBuilder {
3088        fn default() -> Self {
3089            Self::new()
3090        }
3091    }
3092
3093    pub(super) enum FastEnough {
3094        Yes,
3095        No { average: Duration },
3096    }
3097
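    /// Checks whether a node is considered fast enough not to be penalised.
    /// A node is penalised (`FastEnough::No`) only if all three conditions hold: it has at least
    /// `minimum_measurements` data points, its average was updated within the last `retry_period`,
    /// and that average exceeds `exclusion_threshold * min_avg`.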
3098    pub(super) fn fast_enough(
3099        average_latencies: &HashMap<Uuid, RwLock<Option<TimestampedAverage>>>,
3100        node: Uuid,
3101        exclusion_threshold: f64,
3102        retry_period: Duration,
3103        minimum_measurements: usize,
3104        min_avg: Duration,
3105    ) -> FastEnough {
3106        let avg = match average_latencies
3107            .get(&node)
3108            .and_then(|avgs| *avgs.read().unwrap())
3109        {
3110            Some(avg) => avg,
3111            None => return FastEnough::Yes,
3112        };
3113        if avg.num_measures >= minimum_measurements
3114            && avg.timestamp.elapsed() < retry_period
3115            && avg.average.as_micros() as f64 > exclusion_threshold * min_avg.as_micros() as f64
3116        {
3117            FastEnough::No {
3118                average: avg.average,
3119            }
3120        } else {
3121            FastEnough::Yes
3122        }
3123    }
3124
3125    #[cfg(test)]
3126    mod tests {
3127        use scylla_cql::Consistency;
3128
3129        use super::{
3130            super::tests::{framework::*, EMPTY_ROUTING_INFO},
3131            super::DefaultPolicy,
3132            *,
3133        };
3134
3135        use crate::{
3136            cluster::ClusterState,
3137            cluster::NodeAddr,
3138            policies::load_balancing::default::NodeLocationPreference,
3139            policies::load_balancing::{
3140                default::tests::test_default_policy_with_given_cluster_and_routing_info,
3141                RoutingInfo,
3142            },
3143            routing::locator::test::{id_to_invalid_addr, A, B, C, D, E, F, G},
3144            routing::locator::test::{TABLE_INVALID, TABLE_NTS_RF_2, TABLE_NTS_RF_3},
3145            routing::Shard,
3146            routing::Token,
3147            test_utils::setup_tracing,
3148        };
3149        use tokio::time::Instant;
3150
3151        trait DefaultPolicyTestExt {
3152            fn set_nodes_latency_stats(
3153                &self,
3154                cluster: &ClusterState,
3155                averages: &[(u16, Option<TimestampedAverage>)],
3156            );
3157        }
3158
3159        impl DefaultPolicyTestExt for DefaultPolicy {
3160            fn set_nodes_latency_stats(
3161                &self,
3162                cluster: &ClusterState,
3163                averages: &[(u16, Option<TimestampedAverage>)],
3164            ) {
3165                let addr_to_host_id: HashMap<NodeAddr, Uuid> = cluster
3166                    .known_peers
3167                    .values()
3168                    .map(|node| (node.address, node.host_id))
3169                    .collect();
3170
3171                for (id, average) in averages.iter().copied() {
3172                    let host_id = *addr_to_host_id.get(&id_to_invalid_addr(id)).unwrap();
3173                    let mut node_latencies = self
3174                        .latency_awareness
3175                        .as_ref()
3176                        .unwrap()
3177                        .node_avgs
3178                        .write()
3179                        .unwrap();
3180                    let mut node_latency =
3181                        node_latencies.entry(host_id).or_default().write().unwrap();
3182                    println!("Set latency: node {}, latency {:?}.", id, average);
3183                    *node_latency = average;
3184                }
3185                println!("Set node latency stats.")
3186            }
3187        }
3188
3189        fn default_policy_with_given_latency_awareness(
3190            latency_awareness: LatencyAwareness,
3191        ) -> DefaultPolicy {
3192            let pick_predicate = {
3193                let latency_predicate = latency_awareness.generate_predicate();
3194                Box::new(move |node: NodeRef<'_>, shard| {
3195                    DefaultPolicy::is_alive(node, shard) && latency_predicate(node)
3196                })
3197                    as Box<dyn Fn(NodeRef<'_>, Option<Shard>) -> bool + Send + Sync + 'static>
3198            };
3199
3200            DefaultPolicy {
3201                preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
3202                permit_dc_failover: true,
3203                is_token_aware: true,
3204                pick_predicate,
3205                latency_awareness: Some(latency_awareness),
3206                fixed_seed: None,
3207            }
3208        }
3209
3210        fn latency_aware_default_policy_customised(
3211            configurer: impl FnOnce(LatencyAwarenessBuilder) -> LatencyAwarenessBuilder,
3212        ) -> DefaultPolicy {
3213            let latency_awareness = configurer(LatencyAwareness::builder()).build();
3214            default_policy_with_given_latency_awareness(latency_awareness)
3215        }
3216
3217        fn latency_aware_default_policy() -> DefaultPolicy {
3218            latency_aware_default_policy_customised(|b| b)
3219        }
3220
3221        fn latency_aware_policy_with_explicit_updater_customised(
3222            configurer: impl FnOnce(LatencyAwarenessBuilder) -> LatencyAwarenessBuilder,
3223        ) -> (DefaultPolicy, MinAvgUpdater) {
3224            let (latency_awareness, updater) =
3225                configurer(LatencyAwareness::builder()).build_for_test();
3226            (
3227                default_policy_with_given_latency_awareness(latency_awareness),
3228                updater,
3229            )
3230        }
3231
3232        fn latency_aware_default_policy_with_explicit_updater() -> (DefaultPolicy, MinAvgUpdater) {
3233            latency_aware_policy_with_explicit_updater_customised(|b| b)
3234        }
3235
3236        #[tokio::test]
3237        async fn latency_aware_default_policy_does_not_penalise_if_no_latency_info_available_yet() {
3238            setup_tracing();
3239            let policy = latency_aware_default_policy();
3240            let cluster = tests::mock_cluster_state_for_token_unaware_tests().await;
3241
3242            let expected_groups = ExpectedGroupsBuilder::new()
3243                .group([1, 2, 3]) // pick + fallback local nodes
3244                .group([4, 5]) // fallback remote nodes
3245                .build();
3246
3247            test_default_policy_with_given_cluster_and_routing_info(
3248                &policy,
3249                &cluster,
3250                &EMPTY_ROUTING_INFO,
3251                &expected_groups,
3252            )
3253            .await;
3254        }
3255
3256        #[tokio::test]
3257        async fn latency_aware_default_policy_does_not_penalise_if_not_enough_measurements() {
3258            setup_tracing();
3259            let policy = latency_aware_default_policy();
3260            let cluster = tests::mock_cluster_state_for_token_unaware_tests().await;
3261
3262            let min_avg = Duration::from_millis(10);
3263
3264            policy.set_nodes_latency_stats(
3265                &cluster,
3266                &[
3267                    (
3268                        1,
3269                        Some(TimestampedAverage {
3270                            timestamp: Instant::now(),
3271                            average: Duration::from_secs_f64(
3272                                policy
3273                                    .latency_awareness
3274                                    .as_ref()
3275                                    .unwrap()
3276                                    .exclusion_threshold
3277                                    * 1.5
3278                                    * min_avg.as_secs_f64(),
3279                            ),
3280                            num_measures: policy
3281                                .latency_awareness
3282                                .as_ref()
3283                                .unwrap()
3284                                .minimum_measurements
3285                                - 1,
3286                        }),
3287                    ),
3288                    (
3289                        3,
3290                        Some(TimestampedAverage {
3291                            timestamp: Instant::now(),
3292                            average: min_avg,
3293                            num_measures: policy
3294                                .latency_awareness
3295                                .as_ref()
3296                                .unwrap()
3297                                .minimum_measurements,
3298                        }),
3299                    ),
3300                ],
3301            );
3302
3303            let expected_groups = ExpectedGroupsBuilder::new()
3304                .group([1, 2, 3]) // pick + fallback local nodes
3305                .group([4, 5]) // fallback remote nodes
3306                .build();
3307
3308            test_default_policy_with_given_cluster_and_routing_info(
3309                &policy,
3310                &cluster,
3311                &EMPTY_ROUTING_INFO,
3312                &expected_groups,
3313            )
3314            .await;
3315        }
3316
3317        #[tokio::test]
3318        async fn latency_aware_default_policy_does_not_penalise_if_exclusion_threshold_not_crossed()
3319        {
3320            setup_tracing();
3321            let policy = latency_aware_default_policy();
3322            let cluster = tests::mock_cluster_state_for_token_unaware_tests().await;
3323
3324            let min_avg = Duration::from_millis(10);
3325
3326            policy.set_nodes_latency_stats(
3327                &cluster,
3328                &[
3329                    (
3330                        1,
3331                        Some(TimestampedAverage {
3332                            timestamp: Instant::now(),
3333                            average: Duration::from_secs_f64(
3334                                policy
3335                                    .latency_awareness
3336                                    .as_ref()
3337                                    .unwrap()
3338                                    .exclusion_threshold
3339                                    * 0.95
3340                                    * min_avg.as_secs_f64(),
3341                            ),
3342                            num_measures: policy
3343                                .latency_awareness
3344                                .as_ref()
3345                                .unwrap()
3346                                .minimum_measurements,
3347                        }),
3348                    ),
3349                    (
3350                        3,
3351                        Some(TimestampedAverage {
3352                            timestamp: Instant::now(),
3353                            average: min_avg,
3354                            num_measures: policy
3355                                .latency_awareness
3356                                .as_ref()
3357                                .unwrap()
3358                                .minimum_measurements,
3359                        }),
3360                    ),
3361                ],
3362            );
3363
3364            let expected_groups = ExpectedGroupsBuilder::new()
3365                .group([1, 2, 3]) // pick + fallback local nodes
3366                .group([4, 5]) // fallback remote nodes
3367                .build();
3368
3369            test_default_policy_with_given_cluster_and_routing_info(
3370                &policy,
3371                &cluster,
3372                &EMPTY_ROUTING_INFO,
3373                &expected_groups,
3374            )
3375            .await;
3376        }
3377
3378        #[tokio::test]
3379        async fn latency_aware_default_policy_does_not_penalise_if_retry_period_expired() {
3380            setup_tracing();
3381            let policy = latency_aware_default_policy_customised(|b| {
3382                b.retry_period(Duration::from_millis(10))
3383            });
3384
3385            let cluster = tests::mock_cluster_state_for_token_unaware_tests().await;
3386
3387            let min_avg = Duration::from_millis(10);
3388
3389            policy.set_nodes_latency_stats(
3390                &cluster,
3391                &[
3392                    (
3393                        1,
3394                        Some(TimestampedAverage {
3395                            timestamp: Instant::now(),
3396                            average: Duration::from_secs_f64(
3397                                policy
3398                                    .latency_awareness
3399                                    .as_ref()
3400                                    .unwrap()
3401                                    .exclusion_threshold
3402                                    * 1.5
3403                                    * min_avg.as_secs_f64(),
3404                            ),
3405                            num_measures: policy
3406                                .latency_awareness
3407                                .as_ref()
3408                                .unwrap()
3409                                .minimum_measurements,
3410                        }),
3411                    ),
3412                    (
3413                        3,
3414                        Some(TimestampedAverage {
3415                            timestamp: Instant::now(),
3416                            average: min_avg,
3417                            num_measures: policy
3418                                .latency_awareness
3419                                .as_ref()
3420                                .unwrap()
3421                                .minimum_measurements,
3422                        }),
3423                    ),
3424                ],
3425            );
3426
3427            tokio::time::sleep(2 * policy.latency_awareness.as_ref().unwrap().retry_period).await;
3428
3429            let expected_groups = ExpectedGroupsBuilder::new()
3430                .group([1, 2, 3]) // pick + fallback local nodes
3431                .group([4, 5]) // fallback remote nodes
3432                .build();
3433
3434            test_default_policy_with_given_cluster_and_routing_info(
3435                &policy,
3436                &cluster,
3437                &EMPTY_ROUTING_INFO,
3438                &expected_groups,
3439            )
3440            .await;
3441        }
3442
3443        #[tokio::test]
3444        async fn latency_aware_default_policy_penalises_if_conditions_met() {
3445            setup_tracing();
3446            let (policy, updater) = latency_aware_default_policy_with_explicit_updater();
3447            let cluster = tests::mock_cluster_state_for_token_unaware_tests().await;
3448
3449            let min_avg = Duration::from_millis(10);
3450
3451            policy.set_nodes_latency_stats(
3452                &cluster,
3453                &[
3454                    // 3 is fast enough to make 1 and 4 penalised.
3455                    (
3456                        1,
3457                        Some(TimestampedAverage {
3458                            timestamp: Instant::now(),
3459                            average: Duration::from_secs_f64(
3460                                policy
3461                                    .latency_awareness
3462                                    .as_ref()
3463                                    .unwrap()
3464                                    .exclusion_threshold
3465                                    * 1.05
3466                                    * min_avg.as_secs_f64(),
3467                            ),
3468                            num_measures: policy
3469                                .latency_awareness
3470                                .as_ref()
3471                                .unwrap()
3472                                .minimum_measurements,
3473                        }),
3474                    ),
3475                    (
3476                        3,
3477                        Some(TimestampedAverage {
3478                            timestamp: Instant::now(),
3479                            average: min_avg,
3480                            num_measures: policy
3481                                .latency_awareness
3482                                .as_ref()
3483                                .unwrap()
3484                                .minimum_measurements,
3485                        }),
3486                    ),
3487                    (
3488                        4,
3489                        Some(TimestampedAverage {
3490                            timestamp: Instant::now(),
3491                            average: Duration::from_secs_f64(
3492                                policy
3493                                    .latency_awareness
3494                                    .as_ref()
3495                                    .unwrap()
3496                                    .exclusion_threshold
3497                                    * 1.05
3498                                    * min_avg.as_secs_f64(),
3499                            ),
3500                            num_measures: policy
3501                                .latency_awareness
3502                                .as_ref()
3503                                .unwrap()
3504                                .minimum_measurements,
3505                        }),
3506                    ),
3507                ],
3508            );
3509
3510            // Await last min average updater.
3511            updater.tick().await;
3512
3513            let expected_groups = ExpectedGroupsBuilder::new()
3514                .group([2, 3]) // pick + fallback local nodes
3515                .group([5]) // fallback remote nodes
3516                .group([1]) // local node that was penalised due to high latency
3517                .group([4]) // remote node that was penalised due to high latency
3518                .build();
3519
3520            test_default_policy_with_given_cluster_and_routing_info(
3521                &policy,
3522                &cluster,
3523                &EMPTY_ROUTING_INFO,
3524                &expected_groups,
3525            )
3526            .await;
3527        }
3528
3529        #[tokio::test]
3530        async fn latency_aware_default_policy_stops_penalising_after_min_average_increases_enough_only_after_update_rate_elapses(
3531        ) {
3532            setup_tracing();
3533
3534            let (policy, updater) = latency_aware_default_policy_with_explicit_updater();
3535
3536            let cluster = tests::mock_cluster_state_for_token_unaware_tests().await;
3537
3538            let min_avg = Duration::from_millis(10);
3539
3540            policy.set_nodes_latency_stats(
3541                &cluster,
3542                &[
3543                    (
3544                        1,
3545                        Some(TimestampedAverage {
3546                            timestamp: Instant::now(),
3547                            average: Duration::from_secs_f64(
3548                                policy
3549                                    .latency_awareness
3550                                    .as_ref()
3551                                    .unwrap()
3552                                    .exclusion_threshold
3553                                    * 1.05
3554                                    * min_avg.as_secs_f64(),
3555                            ),
3556                            num_measures: policy
3557                                .latency_awareness
3558                                .as_ref()
3559                                .unwrap()
3560                                .minimum_measurements,
3561                        }),
3562                    ),
3563                    (
3564                        3,
3565                        Some(TimestampedAverage {
3566                            timestamp: Instant::now(),
3567                            average: min_avg,
3568                            num_measures: policy
3569                                .latency_awareness
3570                                .as_ref()
3571                                .unwrap()
3572                                .minimum_measurements,
3573                        }),
3574                    ),
3575                ],
3576            );
3577
3578            // Await last min average updater.
3579            updater.tick().await;
3580            {
3581                // min_avg is low enough to penalise node 1
3582                let expected_groups = ExpectedGroupsBuilder::new()
3583                    .group([2, 3]) // pick + fallback local nodes
3584                    .group([4, 5]) // fallback remote nodes
3585                    .group([1]) // local node that was penalised due to high latency
3586                    .build();
3587
3588                test_default_policy_with_given_cluster_and_routing_info(
3589                    &policy,
3590                    &cluster,
3591                    &EMPTY_ROUTING_INFO,
3592                    &expected_groups,
3593                )
3594                .await;
3595            }
3596
3597            // node 3 becomes as slow as node 1
3598            policy.set_nodes_latency_stats(
3599                &cluster,
3600                &[(
3601                    3,
3602                    Some(TimestampedAverage {
3603                        timestamp: Instant::now(),
3604                        average: Duration::from_secs_f64(
3605                            policy
3606                                .latency_awareness
3607                                .as_ref()
3608                                .unwrap()
3609                                .exclusion_threshold
3610                                * min_avg.as_secs_f64(),
3611                        ),
3612                        num_measures: policy
3613                            .latency_awareness
3614                            .as_ref()
3615                            .unwrap()
3616                            .minimum_measurements,
3617                    }),
3618                )],
3619            );
3620            {
3621                // min_avg has not yet been updated and so node 1 is still being penalised
3622                let expected_groups = ExpectedGroupsBuilder::new()
3623                    .group([2, 3]) // pick + fallback local nodes
3624                    .group([4, 5]) // fallback remote nodes
3625                    .group([1]) // local node that was penalised due to high latency
3626                    .build();
3627
3628                test_default_policy_with_given_cluster_and_routing_info(
3629                    &policy,
3630                    &cluster,
3631                    &EMPTY_ROUTING_INFO,
3632                    &expected_groups,
3633                )
3634                .await;
3635            }
3636
3637            updater.tick().await;
3638            {
3639                // min_avg has been updated and is already high enough to stop penalising node 1
3640                let expected_groups = ExpectedGroupsBuilder::new()
3641                    .group([1, 2, 3]) // pick + fallback local nodes
3642                    .group([4, 5]) // fallback remote nodes
3643                    .build();
3644
3645                test_default_policy_with_given_cluster_and_routing_info(
3646                    &policy,
3647                    &cluster,
3648                    &EMPTY_ROUTING_INFO,
3649                    &expected_groups,
3650                )
3651                .await;
3652            }
3653        }
3654
3655        #[tokio::test]
3656        async fn latency_aware_default_policy_is_correctly_token_aware() {
3657            setup_tracing();
3658
3659            struct Test<'a, 'b> {
3660                // If Some, then the provided value is set as a min_avg.
3661                // Else, the min_avg is updated based on values provided to set_latency_stats().
3662                preset_min_avg: Option<Duration>,
3663                latency_stats: &'b [(u16, Option<TimestampedAverage>)],
3664                routing_info: RoutingInfo<'a>,
3665                expected_groups: ExpectedGroups,
3666            }
3667
3668            let cluster = tests::mock_cluster_state_for_token_aware_tests().await;
3669            let latency_awareness_defaults =
3670                latency_aware_default_policy().latency_awareness.unwrap();
3671            let min_avg = Duration::from_millis(10);
3672
            let fast_leader = || {
                Some(TimestampedAverage {
                    timestamp: Instant::now(),
                    average: min_avg,
                    num_measures: latency_awareness_defaults.minimum_measurements,
                })
            };

            let fast_enough = || {
                Some(TimestampedAverage {
                    timestamp: Instant::now(),
                    average: Duration::from_secs_f64(
                        latency_awareness_defaults.exclusion_threshold
                            * 0.95
                            * min_avg.as_secs_f64(),
                    ),
                    num_measures: latency_awareness_defaults.minimum_measurements,
                })
            };

            let slow_penalised = || {
                Some(TimestampedAverage {
                    timestamp: Instant::now(),
                    average: Duration::from_secs_f64(
                        latency_awareness_defaults.exclusion_threshold
                            * 1.05
                            * min_avg.as_secs_f64(),
                    ),
                    num_measures: latency_awareness_defaults.minimum_measurements,
                })
            };

            let too_few_measurements_slow = || {
                Some(TimestampedAverage {
                    timestamp: Instant::now(),
                    average: Duration::from_secs_f64(
                        latency_awareness_defaults.exclusion_threshold
                            * 1.05
                            * min_avg.as_secs_f64(),
                    ),
                    num_measures: latency_awareness_defaults.minimum_measurements - 1,
                })
            };

            let too_few_measurements_fast_leader = || {
                Some(TimestampedAverage {
                    timestamp: Instant::now(),
                    average: min_avg,
                    num_measures: 1,
                })
            };

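            // A minimal sketch of the penalisation criterion that the fixtures above
            // assume (an illustration for readers, not a copy of the production code):
            // a node is penalised only if its average exceeds `exclusion_threshold`
            // times the cached minimum average AND it has collected at least
            // `minimum_measurements` samples. A node with too few measurements also
            // does not establish the minimum itself, which is why
            // `too_few_measurements_fast_leader` never creates a fast leader.
            #[allow(unused)]
            fn is_penalised_sketch(
                avg: &TimestampedAverage,
                min_avg: Duration,
                exclusion_threshold: f64,
                minimum_measurements: usize,
            ) -> bool {
                avg.num_measures >= minimum_measurements
                    && avg.average.as_secs_f64() > exclusion_threshold * min_avg.as_secs_f64()
            }
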
            let tests = [
                Test {
                    // Latency-awareness penalisation fires up and moves C and D to the end.
                    preset_min_avg: None,
                    latency_stats: &[
                        (A, fast_leader()),
                        (C, slow_penalised()),
                        (D, slow_penalised()),
                        (E, too_few_measurements_slow()),
                    ],
                    routing_info: RoutingInfo {
                        token: Some(Token::new(160)),
                        table: Some(TABLE_NTS_RF_3),
                        consistency: Consistency::Quorum,
                        ..Default::default()
                    },
                    // going through the ring, we get order: F , A , C , D , G , B , E
                    //                                      us  eu  eu  us  eu  eu  us
                    //                                      r2  r1  r1  r1  r2  r1  r1
                    expected_groups: ExpectedGroupsBuilder::new()
                        .group([A, G]) // fast enough local replicas
                        .group([F, E]) // fast enough remote replicas
                        .group([B]) // fast enough local nodes
                        .group([C]) // penalised local replica
                        .group([D]) // penalised remote replica
                        .build(),
                },
                Test {
                    // Latency-awareness has old minimum average cached, so does not fire.
                    preset_min_avg: Some(100 * min_avg),
                    routing_info: RoutingInfo {
                        token: Some(Token::new(160)),
                        table: Some(TABLE_NTS_RF_3),
                        consistency: Consistency::Quorum,
                        ..Default::default()
                    },
                    latency_stats: &[
                        (A, fast_leader()),
                        (B, fast_enough()),
                        (C, slow_penalised()),
                        (D, slow_penalised()),
                    ],
                    // going through the ring, we get order: F , A , C , D , G , B , E
                    //                                      us  eu  eu  us  eu  eu  us
                    //                                      r2  r1  r1  r1  r2  r1  r1
                    expected_groups: ExpectedGroupsBuilder::new()
                        .group([A, C, G]) // fast enough local replicas
                        .group([F, D, E]) // fast enough remote replicas
                        .group([B]) // fast enough local nodes
                        .build(),
                },
                Test {
                    // Both A and B are slower than C, but C has too few measurements collected
                    // to become the fast leader, so penalisation does not fire.
                    preset_min_avg: None,
                    latency_stats: &[
                        (A, slow_penalised()), // not really penalised, because no fast leader here
                        (B, slow_penalised()), // ditto
                        (C, too_few_measurements_fast_leader()),
                    ],
                    routing_info: RoutingInfo {
                        token: Some(Token::new(160)),
                        table: Some(TABLE_NTS_RF_2),
                        consistency: Consistency::Quorum,
                        ..Default::default()
                    },
                    // going through the ring, we get order: F , A , C , D , G , B , E
                    //                                      us  eu  eu  us  eu  eu  us
                    //                                      r2  r1  r1  r1  r2  r1  r1
                    expected_groups: ExpectedGroupsBuilder::new()
                        .group([A, G]) // pick + fallback local replicas
                        .group([F, D]) // remote replicas
                        .group([C, B]) // local nodes
                        .group([E]) // remote nodes
                        .build(),
                },
                Test {
                    // No latency stats, so latency-awareness is a no-op.
                    preset_min_avg: None,
                    routing_info: RoutingInfo {
                        token: Some(Token::new(160)),
                        table: Some(TABLE_INVALID),
                        consistency: Consistency::Quorum,
                        ..Default::default()
                    },
                    latency_stats: &[],
                    expected_groups: ExpectedGroupsBuilder::new()
                        .group([A, B, C, G]) // local nodes
                        .group([D, E, F]) // remote nodes
                        .build(),
                },
            ];

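            // Harness: when `preset_min_avg` is set, it is forged through node 1's stats
            // and picked up by an explicit updater tick, after which node 1's stats are
            // cleared so that only `latency_stats` shape the resulting plan.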
            for test in &tests {
                let (policy, updater) = latency_aware_default_policy_with_explicit_updater();

                if let Some(preset_min_avg) = test.preset_min_avg {
                    policy.set_nodes_latency_stats(
                        &cluster,
                        &[(
                            1,
                            Some(TimestampedAverage {
                                timestamp: Instant::now(),
                                average: preset_min_avg,
                                num_measures: latency_awareness_defaults.minimum_measurements,
                            }),
                        )],
                    );
                    // Await the last-min-average updater, so that it picks up the forged min_avg.
                    updater.tick().await;
                    policy.set_nodes_latency_stats(&cluster, &[(1, None)]);
                }
                policy.set_nodes_latency_stats(&cluster, test.latency_stats);

                if test.preset_min_avg.is_none() {
                    // No min_avg was preset; await the updater so that it refreshes the
                    // cached minimum from the current stats.
                    updater.tick().await;
                }

                test_default_policy_with_given_cluster_and_routing_info(
                    &policy,
                    &cluster,
                    &test.routing_info,
                    &test.expected_groups,
                )
                .await;
            }
        }

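        // With `start_paused = true`, Tokio's clock is frozen, so no time elapses
        // between the previous measurement and the new one. `compute_next` must handle
        // a zero elapsed interval gracefully: as asserted below, the average stays at
        // its previous value while the measurement count still increases.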
        #[tokio::test(start_paused = true)]
        async fn timestamped_average_works_when_clock_stops() {
            setup_tracing();
            let avg = Some(TimestampedAverage {
                timestamp: Instant::now(),
                average: Duration::from_secs(123),
                num_measures: 1,
            });
            let new_avg = TimestampedAverage::compute_next(avg, Duration::from_secs(456), 10.0);
            assert_eq!(
                new_avg,
                Some(TimestampedAverage {
                    timestamp: Instant::now(),
                    average: Duration::from_secs(123),
                    num_measures: 2,
                }),
            );
        }
    }
}