scylla/policies/load_balancing/default.rs
1use self::latency_awareness::LatencyAwareness;
2pub use self::latency_awareness::LatencyAwarenessBuilder;
3
4use super::{FallbackPlan, LoadBalancingPolicy, NodeRef, RoutingInfo};
5use crate::cluster::ClusterState;
6use crate::{
7 cluster::metadata::Strategy,
8 cluster::node::Node,
9 errors::RequestAttemptError,
10 routing::locator::ReplicaSet,
11 routing::{Shard, Token},
12};
13use itertools::{Either, Itertools};
14use rand::{prelude::SliceRandom, rng, Rng};
15use rand_pcg::Pcg32;
16use scylla_cql::frame::response::result::TableSpec;
17use std::hash::{Hash, Hasher};
18use std::{fmt, sync::Arc, time::Duration};
19use tracing::{debug, warn};
20use uuid::Uuid;
21
22#[derive(Clone, Copy)]
23enum NodeLocationCriteria<'a> {
24 Any,
25 Datacenter(&'a str),
26 DatacenterAndRack(&'a str, &'a str),
27}
28
29impl<'a> NodeLocationCriteria<'a> {
30 fn datacenter(&self) -> Option<&'a str> {
31 match self {
32 Self::Any => None,
33 Self::Datacenter(dc) | Self::DatacenterAndRack(dc, _) => Some(dc),
34 }
35 }
36}
37
38#[derive(Debug, Clone)]
39enum NodeLocationPreference {
40 Any,
41 Datacenter(String),
42 DatacenterAndRack(String, String),
43}
44
45impl NodeLocationPreference {
46 fn datacenter(&self) -> Option<&str> {
47 match self {
48 Self::Any => None,
49 Self::Datacenter(dc) | Self::DatacenterAndRack(dc, _) => Some(dc),
50 }
51 }
52
53 #[allow(unused)]
54 fn rack(&self) -> Option<&str> {
55 match self {
56 Self::Any | Self::Datacenter(_) => None,
57 Self::DatacenterAndRack(_, rack) => Some(rack),
58 }
59 }
60}
61
62/// An ordering requirement for replicas.
63#[derive(Clone, Copy)]
64enum ReplicaOrder {
65 /// No requirement. Replicas can be returned in arbitrary order.
66 Arbitrary,
67
68 /// A requirement for the order to be deterministic, not only across statement executions
69 /// but also across drivers. This is used for LWT optimisation, to avoid Paxos conflicts.
70 Deterministic,
71}
72
73/// Statement kind, used to enable specific load balancing patterns for certain cases.
74///
75/// Currently, there is a distinguished case of LWT statements, which should always be routed
76/// to replicas in a deterministic order to avoid Paxos conflicts. Other statements
77/// are routed to random replicas to balance the load.
78#[derive(Clone, Copy)]
79enum StatementType {
80 /// The statement is a confirmed LWT. It's to be routed specifically.
81 Lwt,
82
83 /// The statement is not a confirmed LWT. It's to be routed in a default way.
84 NonLwt,
85}
86
87/// A result of `pick_replica`.
88enum PickedReplica<'a> {
89 /// A replica that could be computed cheaply.
90 Computed((NodeRef<'a>, Shard)),
91
92 /// A replica that could not be computed cheaply. `pick` should therefore return None
93 /// and `fallback` will then return that replica as the first in the iterator.
94 ToBeComputedInFallback,
95}
96
97/// The default load balancing policy.
98///
99/// It can be configured to be datacenter-aware, rack-aware and token-aware.
100/// Datacenter failover (sending a query to a node in a remote datacenter)
101/// for queries with non-local consistency mode is also supported.
102///
103/// Latency awareness is available, although **not recommended**:
104/// the penalisation mechanism it involves may interact badly with other
105/// mechanisms, such as LWT optimisation. Also, the very tactic of penalising
106/// nodes for recently measured latencies is believed to be neither stable
107/// nor particularly beneficial. The number of in-flight requests, for instance, seems
108/// to be a better metric showing how (over)loaded a target node/shard is.
109/// For now, however, we don't have an implementation of the
110/// in-flight-requests-aware policy.
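///
/// # Example
///
/// A minimal construction sketch. The builder (see the `DefaultPolicyBuilder` example
/// further below) is the intended way to configure the policy; the import path and the
/// "dc1" name are taken from that example.
/// ```
/// use scylla::policies::load_balancing::DefaultPolicy;
///
/// // Default configuration: token-aware, no DC preference, no DC failover.
/// let default_policy = DefaultPolicy::default();
///
/// // Or configure the policy through the builder.
/// let configured_policy = DefaultPolicy::builder()
///     .prefer_datacenter("dc1".to_string())
///     .build();
/// ```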
111#[allow(clippy::type_complexity)]
112pub struct DefaultPolicy {
113 /// Preferences regarding node location. One of: rack and DC, DC, or no preference.
114 preferences: NodeLocationPreference,
115
116 /// Configures whether the policy takes token into consideration when creating plans.
117 /// If this is set to `true` AND token, keyspace and table are available,
118 /// then policy prefers replicas and puts them earlier in the query plan.
119 is_token_aware: bool,
120
121 /// Whether to permit remote nodes (those not located in the preferred DC) in plans.
122 /// If no preferred DC is set, this has no effect.
123 permit_dc_failover: bool,
124
125 /// A predicate that a target (node + shard) must satisfy in order to be picked.
126 /// This was introduced to make latency awareness cleaner.
127 /// - if latency awareness is disabled, then `pick_predicate` is just `Self::is_alive()`;
128 /// - if latency awareness is enabled, then it is `Self::is_alive() && latency_predicate()`,
129 /// which checks that the target is not penalised due to high latencies.
130 pick_predicate: Box<dyn Fn(NodeRef<'_>, Option<Shard>) -> bool + Send + Sync>,
131
132 /// Additional layer that penalises targets that are too slow compared to others
133 /// in terms of latency. It works in the following way:
134 /// - for `pick`, it uses `latency_predicate` to filter out penalised nodes,
135 /// so that a penalised node will never be `pick`ed;
136 /// - for `fallback`, it wraps the returned iterator, moving all penalised nodes
137 /// to the end, in a stable way.
138 ///
139 /// Penalisation is done based on collected and updated latencies.
140 latency_awareness: Option<LatencyAwareness>,
141
142 /// The policy chooses (in `pick`) and shuffles (in `fallback`) replicas and nodes
143 /// based on random number generator. For sake of deterministic testing,
144 /// a fixed seed can be used.
145 fixed_seed: Option<u64>,
146}
147
148impl fmt::Debug for DefaultPolicy {
149 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
150 f.debug_struct("DefaultPolicy")
151 .field("preferences", &self.preferences)
152 .field("is_token_aware", &self.is_token_aware)
153 .field("permit_dc_failover", &self.permit_dc_failover)
154 .field("latency_awareness", &self.latency_awareness)
155 .field("fixed_seed", &self.fixed_seed)
156 .finish_non_exhaustive()
157 }
158}
159
160impl LoadBalancingPolicy for DefaultPolicy {
161 fn pick<'a>(
162 &'a self,
163 query: &'a RoutingInfo,
164 cluster: &'a ClusterState,
165 ) -> Option<(NodeRef<'a>, Option<Shard>)> {
166 /* For prepared statements, token-aware logic is available: we know the replicas
167 * for the statement, so we can pick one of them. */
168 let routing_info = self.routing_info(query, cluster);
169
170 if let Some(ref token_with_strategy) = routing_info.token_with_strategy {
171 if self.preferences.datacenter().is_some()
172 && !self.permit_dc_failover
173 && matches!(
174 token_with_strategy.strategy,
175 Strategy::SimpleStrategy { .. }
176 )
177 {
178 warn!("\
179Combining SimpleStrategy with preferred_datacenter set to Some and disabled datacenter failover may lead to empty query plans for some tokens. \
180It is better to give up using one of them: either operate in a keyspace with NetworkTopologyStrategy, which explicitly states \
181how many replicas there are in each datacenter (you probably want at least 1 to avoid empty plans while preferring that datacenter), \
182or refrain from preferring datacenters (which may ban all other datacenters, if datacenter failover happens to be not possible)."
183 );
184 }
185 }
186
187 /* LWT statements need to be routed differently: always to the same replica, to avoid Paxos contention. */
188 let statement_type = if query.is_confirmed_lwt {
189 StatementType::Lwt
190 } else {
191 StatementType::NonLwt
192 };
193
194 /* Token-aware logic - if routing info is available, we know the replicas
195 * for the statement. Try to pick one of them. */
196 if let (Some(ts), Some(table_spec)) = (&routing_info.token_with_strategy, query.table) {
197 if let NodeLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences {
198 // Try to pick some alive local rack random replica.
199 let local_rack_picked = self.pick_replica(
200 ts,
201 NodeLocationCriteria::DatacenterAndRack(dc, rack),
202 |node, shard| (self.pick_predicate)(node, Some(shard)),
203 cluster,
204 statement_type,
205 table_spec,
206 );
207
208 if let Some(picked) = local_rack_picked {
209 return match picked {
210 PickedReplica::Computed((alive_local_rack_replica, shard)) => {
211 Some((alive_local_rack_replica, Some(shard)))
212 }
213 // Let the call to fallback() compute the replica, because it requires allocation.
214 PickedReplica::ToBeComputedInFallback => None,
215 };
216 }
217 }
218
219 if let NodeLocationPreference::DatacenterAndRack(dc, _)
220 | NodeLocationPreference::Datacenter(dc) = &self.preferences
221 {
222 // Try to pick some alive local random replica.
223 let picked = self.pick_replica(
224 ts,
225 NodeLocationCriteria::Datacenter(dc),
226 |node, shard| (self.pick_predicate)(node, Some(shard)),
227 cluster,
228 statement_type,
229 table_spec,
230 );
231
232 if let Some(picked) = picked {
233 return match picked {
234 PickedReplica::Computed((alive_local_replica, shard)) => {
235 Some((alive_local_replica, Some(shard)))
236 }
237 // Let the call to fallback() compute the replica, because it requires allocation.
238 PickedReplica::ToBeComputedInFallback => None,
239 };
240 }
241 }
242
243 // If preferred datacenter is not specified, or if datacenter failover is possible, loosen restriction about locality.
244 if self.preferences.datacenter().is_none() || self.is_datacenter_failover_possible() {
245 // Try to pick some alive random replica.
246 let picked = self.pick_replica(
247 ts,
248 NodeLocationCriteria::Any,
249 |node, shard| (self.pick_predicate)(node, Some(shard)),
250 cluster,
251 statement_type,
252 table_spec,
253 );
254 if let Some(picked) = picked {
255 return match picked {
256 PickedReplica::Computed((alive_remote_replica, shard)) => {
257 Some((alive_remote_replica, Some(shard)))
258 }
259 // Let the call to fallback() compute the replica, because it requires allocation.
260 PickedReplica::ToBeComputedInFallback => None,
261 };
262 }
263 }
264 };
265
266 /* Token-unaware logic - if routing info is not available (e.g. for unprepared statements),
267 * or no replica was suitable as a target (e.g. disabled or down), try to choose
268 * a random node, not necessarily a replica. */
269
270 /* We start having not alive nodes filtered out. This is done by `pick_predicate`,
271 * which always contains `Self::is_alive()`. */
272
273 // Let's start with local nodes, i.e. those in the preferred datacenter.
274 // If there was no preferred datacenter specified, all nodes are treated as local.
275 let local_nodes = self.preferred_node_set(cluster);
276
277 if let NodeLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences {
278 // Try to pick some alive random local rack node.
279 let rack_predicate = Self::make_rack_predicate(
280 |node| (self.pick_predicate)(node, None),
281 NodeLocationCriteria::DatacenterAndRack(dc, rack),
282 );
283 let local_rack_node_picked = self.pick_node(local_nodes, rack_predicate);
284
285 if let Some(alive_local_rack_node) = local_rack_node_picked {
286 return Some((alive_local_rack_node, None));
287 }
288 }
289
290 // Try to pick some alive random local node.
291 let local_node_picked =
292 self.pick_node(local_nodes, |node| (self.pick_predicate)(node, None));
293 if let Some(alive_local_node) = local_node_picked {
294 return Some((alive_local_node, None));
295 }
296
297 let all_nodes = cluster.replica_locator().unique_nodes_in_global_ring();
298 // If a datacenter failover is possible, loosen restriction about locality.
299 if self.is_datacenter_failover_possible() {
300 let maybe_remote_node_picked =
301 self.pick_node(all_nodes, |node| (self.pick_predicate)(node, None));
302 if let Some(alive_maybe_remote_node) = maybe_remote_node_picked {
303 return Some((alive_maybe_remote_node, None));
304 }
305 }
306
307 /* As we are here, we failed to pick any alive node. Now let's consider even down nodes. */
308
309 // Previous checks imply that every node we could have selected is down.
310 // Let's try to return a down node that wasn't disabled.
311 let maybe_down_local_node_picked = self.pick_node(local_nodes, |node| node.is_enabled());
312 if let Some(down_but_enabled_local_node) = maybe_down_local_node_picked {
313 return Some((down_but_enabled_local_node, None));
314 }
315
316 // If a datacenter failover is possible, loosen restriction about locality.
317 if self.is_datacenter_failover_possible() {
318 let maybe_down_maybe_remote_node_picked =
319 self.pick_node(all_nodes, |node| node.is_enabled());
320 if let Some(down_but_enabled_maybe_remote_node) = maybe_down_maybe_remote_node_picked {
321 return Some((down_but_enabled_maybe_remote_node, None));
322 }
323 }
324
325 // Every node is disabled. This could be due to a bad host filter - configuration error.
326 // It makes no sense to return disabled nodes (there are no open connections to them anyway),
327 // so let's return None. `fallback()` will return empty iterator, and so the whole plan
328 // will be empty.
329 None
330 }
331
332 fn fallback<'a>(
333 &'a self,
334 query: &'a RoutingInfo,
335 cluster: &'a ClusterState,
336 ) -> FallbackPlan<'a> {
337 /* For prepared statements, token-aware logic is available: we know the replicas
338 * for the statement, so we can pick one of them. */
339 let routing_info = self.routing_info(query, cluster);
340
341 /* LWT statements need to be routed differently: always to the same replica, to avoid Paxos contention. */
342 let statement_type = if query.is_confirmed_lwt {
343 StatementType::Lwt
344 } else {
345 StatementType::NonLwt
346 };
347
348 /* Token-aware logic - if routing info is available, we know the replicas for the statement.
349 * Get a list of alive replicas:
350 * - shuffled list in case of non-LWTs,
351 * - deterministically ordered in case of LWTs. */
352 let maybe_replicas = if let (Some(ts), Some(table_spec)) =
353 (&routing_info.token_with_strategy, query.table)
354 {
355 // Iterator over alive local rack replicas (shuffled or deterministically ordered,
356 // depending on the statement being LWT or not).
357 let maybe_local_rack_replicas =
358 if let NodeLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences {
359 let local_rack_replicas = self.maybe_shuffled_replicas(
360 ts,
361 NodeLocationCriteria::DatacenterAndRack(dc, rack),
362 |node, shard| Self::is_alive(node, Some(shard)),
363 cluster,
364 statement_type,
365 table_spec,
366 );
367 Either::Left(local_rack_replicas)
368 } else {
369 Either::Right(std::iter::empty())
370 };
371
372 // Iterator over alive local datacenter replicas (shuffled or deterministically ordered,
373 // depending on the statement being LWT or not).
374 let maybe_local_replicas = if let NodeLocationPreference::DatacenterAndRack(dc, _)
375 | NodeLocationPreference::Datacenter(dc) =
376 &self.preferences
377 {
378 let local_replicas = self.maybe_shuffled_replicas(
379 ts,
380 NodeLocationCriteria::Datacenter(dc),
381 |node, shard| Self::is_alive(node, Some(shard)),
382 cluster,
383 statement_type,
384 table_spec,
385 );
386 Either::Left(local_replicas)
387 } else {
388 Either::Right(std::iter::empty())
389 };
390
391 // If no datacenter is preferred, or datacenter failover is possible, loosen restriction about locality.
392 let maybe_remote_replicas = if self.preferences.datacenter().is_none()
393 || self.is_datacenter_failover_possible()
394 {
395 // Iterator over alive replicas (shuffled or deterministically ordered,
396 // depending on the statement being LWT or not).
397 let remote_replicas = self.maybe_shuffled_replicas(
398 ts,
399 NodeLocationCriteria::Any,
400 |node, shard| Self::is_alive(node, Some(shard)),
401 cluster,
402 statement_type,
403 table_spec,
404 );
405 Either::Left(remote_replicas)
406 } else {
407 Either::Right(std::iter::empty())
408 };
409
410 // Produce an iterator, prioritizing local replicas.
411 // If preferred datacenter is not specified, every replica is treated as a remote one.
412 Either::Left(
413 maybe_local_rack_replicas
414 .chain(maybe_local_replicas)
415 .chain(maybe_remote_replicas)
416 .map(|(node, shard)| (node, Some(shard))),
417 )
418 } else {
419 Either::Right(std::iter::empty::<(NodeRef<'a>, Option<Shard>)>())
420 };
421
422 /* Token-unaware logic - if routing info is not available (e.g. for unprepared statements),
423 * or no replica is suitable as a target (e.g. disabled or down), try targeting nodes
424 * that are not necessarily replicas. */
425
426 /* We start having not alive nodes filtered out. */
427
428 // All nodes in the local datacenter (if one is given).
429 let local_nodes = self.preferred_node_set(cluster);
430
431 let robinned_local_rack_nodes =
432 if let NodeLocationPreference::DatacenterAndRack(dc, rack) = &self.preferences {
433 let rack_predicate = Self::make_rack_predicate(
434 |node| Self::is_alive(node, None),
435 NodeLocationCriteria::DatacenterAndRack(dc, rack),
436 );
437 Either::Left(
438 self.round_robin_nodes(local_nodes, rack_predicate)
439 .map(|node| (node, None)),
440 )
441 } else {
442 Either::Right(std::iter::empty::<(NodeRef<'a>, Option<Shard>)>())
443 };
444
445 let robinned_local_nodes = self
446 .round_robin_nodes(local_nodes, |node| Self::is_alive(node, None))
447 .map(|node| (node, None));
448
449 // All nodes in the cluster.
450 let all_nodes = cluster.replica_locator().unique_nodes_in_global_ring();
451
452 // If a datacenter failover is possible, loosen restriction about locality.
453 let maybe_remote_nodes = if self.is_datacenter_failover_possible() {
454 let robinned_all_nodes =
455 self.round_robin_nodes(all_nodes, |node| Self::is_alive(node, None));
456
457 Either::Left(robinned_all_nodes.map(|node| (node, None)))
458 } else {
459 Either::Right(std::iter::empty::<(NodeRef<'a>, Option<Shard>)>())
460 };
461
462 // Even if we consider some enabled nodes to be down, we should try contacting them in the last resort.
463 let maybe_down_local_nodes = local_nodes
464 .iter()
465 .filter(|node| node.is_enabled())
466 .map(|node| (node, None));
467
468 // If a datacenter failover is possible, loosen restriction about locality.
469 let maybe_down_nodes = if self.is_datacenter_failover_possible() {
470 Either::Left(
471 all_nodes
472 .iter()
473 .filter(|node| node.is_enabled())
474 .map(|node| (node, None)),
475 )
476 } else {
477 Either::Right(std::iter::empty())
478 };
479
480 /// *Plan* should return unique elements. However, it is not obvious what that means,
481 /// because some nodes in the plan may have shards and some may not.
482 ///
483 /// This helper structure defines equality of plan elements.
484 /// How the comparison works:
485 /// - If at least one of elements is shard-less, then compare just nodes.
486 /// - If both elements have shards, then compare both nodes and shards.
487 ///
488 /// Why is it implemented this way?
489 /// Driver should not attempt to send a request to the same destination twice.
490 /// If a plan element doesn't have shard specified, then a random shard will be
491 /// chosen by the driver. If the plan also contains the same node but with
492 /// a shard present, and we randomly choose the same shard for the shard-less element,
493 /// then we have duplication.
494 ///
495 /// Example: plan is `[(Node1, Some(1)), (Node1, None)]` - if the driver uses
496 /// the second element and randomly chooses shard 1, then we have duplication.
497 ///
498 /// On the other hand, if a plan has a duplicate node, but with different shards,
499 /// then we want to use both elements - so we can't just make the list unique by node,
500 /// and so this struct was created.
501 struct DefaultPolicyTargetComparator {
502 host_id: Uuid,
503 shard: Option<Shard>,
504 }
505
506 impl PartialEq for DefaultPolicyTargetComparator {
507 fn eq(&self, other: &Self) -> bool {
508 match (self.shard, other.shard) {
509 (_, None) | (None, _) => self.host_id.eq(&other.host_id),
510 (Some(shard_left), Some(shard_right)) => {
511 self.host_id.eq(&other.host_id) && shard_left.eq(&shard_right)
512 }
513 }
514 }
515 }
516
517 impl Eq for DefaultPolicyTargetComparator {}
518
519 impl Hash for DefaultPolicyTargetComparator {
520 fn hash<H: Hasher>(&self, state: &mut H) {
521 self.host_id.hash(state);
522 }
523 }
524
525 // Construct a fallback plan as a composition of:
526 // - local rack alive replicas,
527 // - local datacenter alive replicas (or all alive replicas if no DC is preferred),
528 // - remote alive replicas (if DC failover is enabled),
529 // - local rack alive nodes,
530 // - local datacenter alive nodes (or all alive nodes if no DC is preferred),
531 // - remote alive nodes (if DC failover is enabled),
532 // - local datacenter nodes,
533 // - remote nodes (if DC failover is enabled).
534 let plan = maybe_replicas
535 .chain(robinned_local_rack_nodes)
536 .chain(robinned_local_nodes)
537 .chain(maybe_remote_nodes)
538 .chain(maybe_down_local_nodes)
539 .chain(maybe_down_nodes)
540 .unique_by(|(node, shard)| DefaultPolicyTargetComparator {
541 host_id: node.host_id,
542 shard: *shard,
543 });
544
545 // If latency awareness is enabled, wrap the plan by applying latency penalisation:
546 // all penalised nodes are moved behind non-penalised nodes, in a stable fashion.
547 if let Some(latency_awareness) = self.latency_awareness.as_ref() {
548 Box::new(latency_awareness.wrap(plan))
549 } else {
550 Box::new(plan)
551 }
552 }
553
554 fn name(&self) -> String {
555 "DefaultPolicy".to_string()
556 }
557
558 fn on_request_success(
559 &self,
560 _routing_info: &RoutingInfo,
561 latency: Duration,
562 node: NodeRef<'_>,
563 ) {
564 if let Some(latency_awareness) = self.latency_awareness.as_ref() {
565 latency_awareness.report_request(node, latency);
566 }
567 }
568
569 fn on_request_failure(
570 &self,
571 _routing_info: &RoutingInfo,
572 latency: Duration,
573 node: NodeRef<'_>,
574 error: &RequestAttemptError,
575 ) {
576 if let Some(latency_awareness) = self.latency_awareness.as_ref() {
577 if LatencyAwareness::reliable_latency_measure(error) {
578 latency_awareness.report_request(node, latency);
579 }
580 }
581 }
582}
583
584impl DefaultPolicy {
585 /// Creates a builder used to customise configuration of a new DefaultPolicy.
586 pub fn builder() -> DefaultPolicyBuilder {
587 DefaultPolicyBuilder::new()
588 }
589
590 /// Returns the given routing info processed based on given cluster state.
591 fn routing_info<'a>(
592 &'a self,
593 query: &'a RoutingInfo,
594 cluster: &'a ClusterState,
595 ) -> ProcessedRoutingInfo<'a> {
596 let mut routing_info = ProcessedRoutingInfo::new(query, cluster);
597
598 if !self.is_token_aware {
599 routing_info.token_with_strategy = None;
600 }
601
602 routing_info
603 }
604
605 /// Returns all nodes in the local datacenter if one is given,
606 /// or else all nodes in the cluster.
607 fn preferred_node_set<'a>(&'a self, cluster: &'a ClusterState) -> &'a [Arc<Node>] {
608 if let Some(preferred_datacenter) = self.preferences.datacenter() {
609 if let Some(nodes) = cluster
610 .replica_locator()
611 .unique_nodes_in_datacenter_ring(preferred_datacenter)
612 {
613 nodes
614 } else {
615 tracing::warn!(
616 "Datacenter specified as the preferred one ({}) does not exist!",
617 preferred_datacenter
618 );
619 // We won't guess any DC, as it could lead to possible violation of dc failover ban.
620 &[]
621 }
622 } else {
623 cluster.replica_locator().unique_nodes_in_global_ring()
624 }
625 }
626
627 /// Returns a full replica set for given datacenter (if given, else for all DCs),
628 /// cluster state and table spec.
629 fn nonfiltered_replica_set<'a>(
630 &'a self,
631 ts: &TokenWithStrategy<'a>,
632 replica_location: NodeLocationCriteria<'a>,
633 cluster: &'a ClusterState,
634 table_spec: &TableSpec,
635 ) -> ReplicaSet<'a> {
636 let datacenter = replica_location.datacenter();
637
638 cluster
639 .replica_locator()
640 .replicas_for_token(ts.token, ts.strategy, datacenter, table_spec)
641 }
642
643 /// Wraps the provided predicate, adding the requirement for rack to match.
644 fn make_rack_predicate<'a>(
645 predicate: impl Fn(NodeRef<'a>) -> bool + 'a,
646 replica_location: NodeLocationCriteria<'a>,
647 ) -> impl Fn(NodeRef<'a>) -> bool {
648 move |node| match replica_location {
649 NodeLocationCriteria::Any | NodeLocationCriteria::Datacenter(_) => predicate(node),
650 NodeLocationCriteria::DatacenterAndRack(_, rack) => {
651 predicate(node) && node.rack.as_deref() == Some(rack)
652 }
653 }
654 }
655
656 /// Wraps the provided predicate, adding the requirement for rack to match.
657 fn make_sharded_rack_predicate<'a>(
658 predicate: impl Fn(NodeRef<'a>, Shard) -> bool + 'a,
659 replica_location: NodeLocationCriteria<'a>,
660 ) -> impl Fn(NodeRef<'a>, Shard) -> bool {
661 move |node, shard| match replica_location {
662 NodeLocationCriteria::Any | NodeLocationCriteria::Datacenter(_) => {
663 predicate(node, shard)
664 }
665 NodeLocationCriteria::DatacenterAndRack(_, rack) => {
666 predicate(node, shard) && node.rack.as_deref() == Some(rack)
667 }
668 }
669 }
670
671 /// Returns iterator over replicas for given token and table spec, filtered
672 /// by provided location criteria and predicate.
673 /// Respects requested replica order, i.e. if requested, returns replicas ordered
674 /// deterministically (i.e. by token ring order or by tablet definition order),
675 /// else returns replicas in arbitrary order.
676 fn filtered_replicas<'a>(
677 &'a self,
678 ts: &TokenWithStrategy<'a>,
679 replica_location: NodeLocationCriteria<'a>,
680 predicate: impl Fn(NodeRef<'a>, Shard) -> bool + 'a,
681 cluster: &'a ClusterState,
682 order: ReplicaOrder,
683 table_spec: &TableSpec,
684 ) -> impl Iterator<Item = (NodeRef<'a>, Shard)> {
685 let predicate = Self::make_sharded_rack_predicate(predicate, replica_location);
686
687 let replica_iter = match order {
688 ReplicaOrder::Arbitrary => Either::Left(
689 self.nonfiltered_replica_set(ts, replica_location, cluster, table_spec)
690 .into_iter(),
691 ),
692 ReplicaOrder::Deterministic => Either::Right(
693 self.nonfiltered_replica_set(ts, replica_location, cluster, table_spec)
694 .into_replicas_ordered()
695 .into_iter(),
696 ),
697 };
698 replica_iter.filter(move |(node, shard): &(NodeRef<'a>, Shard)| predicate(node, *shard))
699 }
700
701 /// Picks a replica for given token and table spec which meets the provided location criteria
702 /// and the predicate.
703 /// The replica is chosen randomly over all candidates that meet the criteria
704 /// unless the query is LWT; if so, the first replica meeting the criteria is chosen
705 /// to avoid Paxos contention.
706 fn pick_replica<'a>(
707 &'a self,
708 ts: &TokenWithStrategy<'a>,
709 replica_location: NodeLocationCriteria<'a>,
710 predicate: impl Fn(NodeRef<'a>, Shard) -> bool + 'a,
711 cluster: &'a ClusterState,
712 statement_type: StatementType,
713 table_spec: &TableSpec,
714 ) -> Option<PickedReplica<'a>> {
715 match statement_type {
716 StatementType::Lwt => {
717 self.pick_first_replica(ts, replica_location, predicate, cluster, table_spec)
718 }
719 StatementType::NonLwt => self
720 .pick_random_replica(ts, replica_location, predicate, cluster, table_spec)
721 .map(PickedReplica::Computed),
722 }
723 }
724
725 /// Picks the first (wrt the deterministic order imposed on the keyspace, see comment below)
726 /// replica for given token and table spec which meets the provided location criteria
727 /// and the predicate.
728 // This is to be used for LWT optimisation: in order to reduce contention
729 // caused by Paxos conflicts, we always try to query replicas in the same,
730 // deterministic order:
731 // - ring order for token ring keyspaces,
732 // - tablet definition order for tablet keyspaces.
733 //
734 // If preferred rack and DC are set, then the first (encountered on the ring) replica
735 // that resides in that rack in that DC **and** satisfies the `predicate` is returned.
736 //
737 // If preferred DC is set, then the first (encountered on the ring) replica
738 // that resides in that DC **and** satisfies the `predicate` is returned.
739 //
740 // If no DC/rack preferences are set, then the only possible replica to be returned
741 // (computing the others is expensive, and we avoid expensive computation in `pick()`)
742 // is the primary replica. If it exists, Some is returned: Computed(primary_replica)
743 // if it satisfies the predicate, or ToBeComputedInFallback otherwise.
744 fn pick_first_replica<'a>(
745 &'a self,
746 ts: &TokenWithStrategy<'a>,
747 replica_location: NodeLocationCriteria<'a>,
748 predicate: impl Fn(NodeRef<'a>, Shard) -> bool + 'a,
749 cluster: &'a ClusterState,
750 table_spec: &TableSpec,
751 ) -> Option<PickedReplica<'a>> {
752 match replica_location {
753 NodeLocationCriteria::Any => {
754 // ReplicaSet returned by ReplicaLocator for this case:
755 // 1) can be precomputed and later used cheaply,
756 // 2) returns replicas in **non-ring order** (this is because ReplicaSet chains
757 // ring-ordered replica sequences from different DCs, thus not preserving
758 // the global ring order).
759 // Because of 2), we can't use a precomputed ReplicaSet, but instead we need ReplicasOrdered.
760 // As ReplicasOrdered can compute cheaply only the primary global replica
761 // (computation of the remaining ones is expensive), in case that the primary replica
762 // does not satisfy the `predicate`, ToBeComputedInFallback is returned.
763 // All expensive computation is to be done only when `fallback()` is called.
764 self.nonfiltered_replica_set(ts, replica_location, cluster, table_spec)
765 .into_replicas_ordered()
766 .into_iter()
767 .next()
768 .map(|(primary_replica, shard)| {
769 if predicate(primary_replica, shard) {
770 PickedReplica::Computed((primary_replica, shard))
771 } else {
772 PickedReplica::ToBeComputedInFallback
773 }
774 })
775 }
776 NodeLocationCriteria::Datacenter(_) | NodeLocationCriteria::DatacenterAndRack(_, _) => {
777 // ReplicaSet returned by ReplicaLocator for this case:
778 // 1) can be precomputed and later used cheaply,
779 // 2) returns replicas in the ring order (this is not true for the case
780 // when multiple DCs are allowed, because ReplicaSet chains replica sequences
781 // from different DCs, thus not preserving the global ring order)
782 self.filtered_replicas(
783 ts,
784 replica_location,
785 predicate,
786 cluster,
787 ReplicaOrder::Deterministic,
788 table_spec,
789 )
790 .next()
791 .map(PickedReplica::Computed)
792 }
793 }
794 }
795
796 /// Picks a random replica for given token and table spec which meets the provided
797 /// location criteria and the predicate.
798 fn pick_random_replica<'a>(
799 &'a self,
800 ts: &TokenWithStrategy<'a>,
801 replica_location: NodeLocationCriteria<'a>,
802 predicate: impl Fn(NodeRef<'a>, Shard) -> bool + 'a,
803 cluster: &'a ClusterState,
804 table_spec: &TableSpec,
805 ) -> Option<(NodeRef<'a>, Shard)> {
806 let predicate = Self::make_sharded_rack_predicate(predicate, replica_location);
807
808 let replica_set = self.nonfiltered_replica_set(ts, replica_location, cluster, table_spec);
809
810 if let Some(fixed) = self.fixed_seed {
811 let mut gen = Pcg32::new(fixed, 0);
812 replica_set.choose_filtered(&mut gen, |(node, shard)| predicate(node, *shard))
813 } else {
814 replica_set.choose_filtered(&mut rng(), |(node, shard)| predicate(node, *shard))
815 }
816 }
817
818 /// Returns iterator over replicas for given token and table spec, filtered
819 /// by provided location criteria and predicate.
820 /// By default, the replicas are shuffled.
821 /// For LWTs, though, the replicas are instead returned in a deterministic order.
822 fn maybe_shuffled_replicas<'a>(
823 &'a self,
824 ts: &TokenWithStrategy<'a>,
825 replica_location: NodeLocationCriteria<'a>,
826 predicate: impl Fn(NodeRef<'a>, Shard) -> bool + 'a,
827 cluster: &'a ClusterState,
828 statement_type: StatementType,
829 table_spec: &TableSpec,
830 ) -> impl Iterator<Item = (NodeRef<'a>, Shard)> {
831 let order = match statement_type {
832 StatementType::Lwt => ReplicaOrder::Deterministic,
833 StatementType::NonLwt => ReplicaOrder::Arbitrary,
834 };
835
836 let replicas =
837 self.filtered_replicas(ts, replica_location, predicate, cluster, order, table_spec);
838
839 match statement_type {
840 // As an LWT optimisation: in order to reduce contention caused by Paxos conflicts,
841 // we always try to query replicas in the same order.
842 StatementType::Lwt => Either::Left(replicas),
843 StatementType::NonLwt => Either::Right(self.shuffle(replicas)),
844 }
845 }
846
847 /// Returns an iterator over the given slice of nodes, rotated by a random shift.
848 fn randomly_rotated_nodes(nodes: &[Arc<Node>]) -> impl Iterator<Item = NodeRef<'_>> {
849 // Create a randomly rotated slice view
850 let nodes_len = nodes.len();
851 if nodes_len > 0 {
852 let index = rng().random_range(0..nodes_len); // random_range() panics when the range is empty!
853 Either::Left(
854 nodes[index..]
855 .iter()
856 .chain(nodes[..index].iter())
857 .take(nodes.len()),
858 )
859 } else {
860 Either::Right(std::iter::empty())
861 }
862 }
863
864 /// Picks a random node from the slice of nodes. The node must satisfy the given predicate.
865 fn pick_node<'a>(
866 &'a self,
867 nodes: &'a [Arc<Node>],
868 predicate: impl Fn(NodeRef<'a>) -> bool,
869 ) -> Option<NodeRef<'a>> {
870 // Select the first node that matches the predicate
871 Self::randomly_rotated_nodes(nodes).find(|&node| predicate(node))
872 }
873
874 /// Returns an iterator over the given slice of nodes, rotated by a random shift
875 /// and filtered by given predicate.
876 fn round_robin_nodes<'a>(
877 &'a self,
878 nodes: &'a [Arc<Node>],
879 predicate: impl Fn(NodeRef<'a>) -> bool,
880 ) -> impl Iterator<Item = NodeRef<'a>> {
881 Self::randomly_rotated_nodes(nodes).filter(move |node| predicate(node))
882 }
883
884 /// Wraps a given iterator by shuffling its contents.
885 fn shuffle<'a>(
886 &self,
887 iter: impl Iterator<Item = (NodeRef<'a>, Shard)>,
888 ) -> impl Iterator<Item = (NodeRef<'a>, Shard)> {
889 let mut vec: Vec<(NodeRef<'_>, Shard)> = iter.collect();
890
891 if let Some(fixed) = self.fixed_seed {
892 let mut gen = Pcg32::new(fixed, 0);
893 vec.shuffle(&mut gen);
894 } else {
895 vec.shuffle(&mut rng());
896 }
897
898 vec.into_iter()
899 }
900
901 /// Returns true iff the node should be considered to be alive.
902 fn is_alive(node: NodeRef, _shard: Option<Shard>) -> bool {
903 // For now, we leave this as stub, until we have time to improve node events.
904 // node.is_enabled() && !node.is_down()
905 node.is_enabled()
906 }
907
908 /// Returns true iff the datacenter failover is permitted for the statement being executed.
909 fn is_datacenter_failover_possible(&self) -> bool {
910 self.preferences.datacenter().is_some() && self.permit_dc_failover
911 }
912}
913
914impl Default for DefaultPolicy {
915 fn default() -> Self {
916 Self {
917 preferences: NodeLocationPreference::Any,
918 is_token_aware: true,
919 permit_dc_failover: false,
920 pick_predicate: Box::new(Self::is_alive),
921 latency_awareness: None,
922 fixed_seed: None,
923 }
924 }
925}
926
927/// The intended way to instantiate the DefaultPolicy.
928///
929/// # Example
930/// ```
931/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
932/// use scylla::policies::load_balancing::DefaultPolicy;
933///
934/// let default_policy = DefaultPolicy::builder()
935/// .prefer_datacenter("dc1".to_string())
936/// .token_aware(true)
937/// .permit_dc_failover(true)
938/// .build();
939/// # Ok(())
940/// # }
/// ```
941#[derive(Clone, Debug)]
942pub struct DefaultPolicyBuilder {
943 preferences: NodeLocationPreference,
944 is_token_aware: bool,
945 permit_dc_failover: bool,
946 latency_awareness: Option<LatencyAwarenessBuilder>,
947 enable_replica_shuffle: bool,
948}
949
950impl DefaultPolicyBuilder {
951 /// Creates a builder used to customise configuration of a new DefaultPolicy.
952 pub fn new() -> Self {
953 Self {
954 preferences: NodeLocationPreference::Any,
955 is_token_aware: true,
956 permit_dc_failover: false,
957 latency_awareness: None,
958 enable_replica_shuffle: true,
959 }
960 }
961
962 /// Builds a new DefaultPolicy with the previously set configuration.
963 pub fn build(self) -> Arc<dyn LoadBalancingPolicy> {
964 let latency_awareness = self.latency_awareness.map(|builder| builder.build());
965 let pick_predicate = if let Some(ref latency_awareness) = latency_awareness {
966 let latency_predicate = latency_awareness.generate_predicate();
967 Box::new(move |node: NodeRef<'_>, shard| {
968 DefaultPolicy::is_alive(node, shard) && latency_predicate(node)
969 })
970 as Box<dyn Fn(NodeRef<'_>, Option<Shard>) -> bool + Send + Sync + 'static>
971 } else {
972 Box::new(DefaultPolicy::is_alive)
973 };
974
975 Arc::new(DefaultPolicy {
976 preferences: self.preferences,
977 is_token_aware: self.is_token_aware,
978 permit_dc_failover: self.permit_dc_failover,
979 pick_predicate,
980 latency_awareness,
981 fixed_seed: (!self.enable_replica_shuffle).then(|| {
982 let seed = rand::random();
983 debug!("DefaultPolicy: setting fixed seed to {}", seed);
984 seed
985 }),
986 })
987 }
988
989 /// Sets the datacenter to be preferred by this policy.
990 ///
991 /// Allows the load balancing policy to prioritize nodes based on their location.
992 /// When a preferred datacenter is set, the policy will treat nodes in that
993 /// datacenter as "local" nodes, and nodes in other datacenters as "remote" nodes.
994 /// This affects the order in which nodes are returned by the policy when
995 /// selecting replicas for read or write operations. If no preferred datacenter
996 /// is specified, the policy will treat all nodes as local nodes.
997 ///
998 /// When datacenter failover is disabled (`permit_dc_failover` is set to false),
999 /// the default policy will only include local nodes in load balancing plans.
1000 /// Remote nodes will be excluded, even if they are alive and available
1001 /// to serve requests.
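///
/// # Example
///
/// A minimal sketch (the "dc1" name and the import path follow the builder's
/// example above):
/// ```
/// use scylla::policies::load_balancing::DefaultPolicy;
///
/// // Nodes in "dc1" are treated as local; with failover left disabled
/// // (the default), plans will contain only "dc1" nodes.
/// let policy = DefaultPolicy::builder()
///     .prefer_datacenter("dc1".to_string())
///     .build();
/// ```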
1002 pub fn prefer_datacenter(mut self, datacenter_name: String) -> Self {
1003 self.preferences = NodeLocationPreference::Datacenter(datacenter_name);
1004 self
1005 }
1006
1007 /// Sets the datacenter and rack to be preferred by this policy.
1008 ///
1009 /// Allows the load balancing policy to prioritize nodes based on their location
1010 /// as well as their availability zones in the preferred datacenter.
1011 /// When a preferred datacenter is set, the policy will treat nodes in that
1012 /// datacenter as "local" nodes, and nodes in other datacenters as "remote" nodes.
1013 /// This affects the order in which nodes are returned by the policy when
1014 /// selecting replicas for read or write operations. If no preferred datacenter
1015 /// is specified, the policy will treat all nodes as local nodes.
1016 ///
1017 /// When datacenter failover is disabled (`permit_dc_failover` is set to false),
1018 /// the default policy will only include local nodes in load balancing plans.
1019 /// Remote nodes will be excluded, even if they are alive and available
1020 /// to serve requests.
1021 ///
1022 /// When a preferred rack is set, the policy will first return replicas in the local rack
1023 /// in the preferred datacenter, and then the other replicas in the datacenter.
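///
/// # Example
///
/// A minimal sketch; "dc1" follows the builder's example above, while "rack1"
/// is an illustrative rack name:
/// ```
/// use scylla::policies::load_balancing::DefaultPolicy;
///
/// // Replicas in rack "rack1" of "dc1" are preferred over the rest of "dc1".
/// let policy = DefaultPolicy::builder()
///     .prefer_datacenter_and_rack("dc1".to_string(), "rack1".to_string())
///     .build();
/// ```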
1024 pub fn prefer_datacenter_and_rack(
1025 mut self,
1026 datacenter_name: String,
1027 rack_name: String,
1028 ) -> Self {
1029 self.preferences = NodeLocationPreference::DatacenterAndRack(datacenter_name, rack_name);
1030 self
1031 }
1032
1033 /// Sets whether this policy is token-aware (balances load more consciously) or not.
1034 ///
1035 /// Token awareness refers to a mechanism by which the driver is aware
1036 /// of the token range assigned to each node in the cluster. Tokens
1037 /// are assigned to nodes to partition the data and distribute it
1038 /// across the cluster.
1039 ///
1040 /// When a user wants to read or write data, the driver can use token awareness
1041 /// to route the request to the correct node based on the token range of the data
1042 /// being accessed. This can help to minimize network traffic and improve
1043 /// performance by ensuring that the data is accessed locally as much as possible.
1044 ///
1045 /// In the case of `DefaultPolicy`, token awareness is enabled by default,
1046 /// meaning that the policy will prefer to return alive local replicas
1047 /// if the token is available. This means that if the client is requesting data
1048 /// that falls within the token range of a particular node, the policy will try
1049 /// to route the request to that node first, assuming it is alive and responsive.
1050 ///
1051 /// Token awareness can significantly improve the performance and scalability
1052 /// of applications built on Scylla. By using token awareness, users can ensure
1053 /// that data is accessed locally as much as possible, reducing network overhead
1054 /// and improving throughput.
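///
/// # Example
///
/// A minimal sketch showing how to opt out of token awareness (it is enabled
/// by default):
/// ```
/// use scylla::policies::load_balancing::DefaultPolicy;
///
/// // With token awareness disabled, replicas are not preferred in query plans.
/// let policy = DefaultPolicy::builder()
///     .token_aware(false)
///     .build();
/// ```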
1055 pub fn token_aware(mut self, is_token_aware: bool) -> Self {
1056 self.is_token_aware = is_token_aware;
1057 self
1058 }
1059
1060 /// Sets whether this policy permits datacenter failover, i.e. ever attempts
1061 /// to send requests to nodes from a non-preferred datacenter.
1062 ///
1063 /// In the event of a datacenter outage or network failure, the nodes
1064 /// in that datacenter may become unavailable, and clients may no longer
1065 /// be able to access data stored on those nodes. To address this,
1066 /// the `DefaultPolicy` supports datacenter failover, which allows routing
1067 /// requests to nodes in other datacenters if the local nodes are unavailable.
1068 ///
1069 /// Datacenter failover can be enabled in `DefaultPolicy` by setting this flag.
1070 /// When it is set, the policy will prefer to return alive remote replicas
1071 /// if datacenter failover is permitted and possible due to consistency
1072 /// constraints.
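///
/// # Example
///
/// A minimal sketch (the "dc1" name follows the builder's example above). Note
/// that permitting failover only matters when a preferred datacenter is set:
/// ```
/// use scylla::policies::load_balancing::DefaultPolicy;
///
/// // "dc1" nodes are preferred, but remote nodes may be used as a fallback.
/// let policy = DefaultPolicy::builder()
///     .prefer_datacenter("dc1".to_string())
///     .permit_dc_failover(true)
///     .build();
/// ```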
1073 pub fn permit_dc_failover(mut self, permit: bool) -> Self {
1074 self.permit_dc_failover = permit;
1075 self
1076 }
1077
1078 /// Latency awareness is a mechanism that penalises nodes whose measured
1079 /// recent average latency classifies them as falling behind the others.
1080 ///
1081 /// Every `update_rate` the global minimum average latency is computed,
1082 /// and all nodes whose average latency is worse than `exclusion_threshold`
1083 /// times the global minimum average latency become penalised for
1084 /// `retry_period`. Penalisation involves putting those nodes at the very end
1085 /// of the query plan. As it is often not truly beneficial to prefer
1086 /// a faster non-replica over replicas that lag behind it,
1087 /// this mechanism may actually worsen latencies and/or throughput.
1088 ///
1089 /// ATTENTION: using latency awareness is NOT recommended, unless prior
1090 /// benchmarks prove its beneficial impact on the specific workload's
1091 /// performance. Use with caution.
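///
/// # Example
///
/// A minimal sketch. It assumes that `LatencyAwarenessBuilder` is re-exported next
/// to `DefaultPolicy` (as the `pub use` at the top of this module suggests) and that
/// `LatencyAwarenessBuilder::new()` yields a builder with default settings:
/// ```
/// use scylla::policies::load_balancing::{DefaultPolicy, LatencyAwarenessBuilder};
///
/// // Not recommended in general; see the warning above.
/// let policy = DefaultPolicy::builder()
///     .latency_awareness(LatencyAwarenessBuilder::new())
///     .build();
/// ```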
1092 pub fn latency_awareness(mut self, latency_awareness_builder: LatencyAwarenessBuilder) -> Self {
1093 self.latency_awareness = Some(latency_awareness_builder);
1094 self
1095 }
1096
1097 /// Sets whether this policy should shuffle replicas when token-awareness
1098 /// is enabled. Shuffling can help distribute the load over replicas, but
1099 /// can reduce the effectiveness of caching on the database side (e.g.
1100 /// for reads).
1101 ///
1102 /// This option is enabled by default. If disabled, replicas will be chosen
1103 /// in some random order that is chosen when the load balancing policy
1104 /// is created and will not change over its lifetime.
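///
/// # Example
///
/// A minimal sketch showing how to disable shuffling (it is enabled by default):
/// ```
/// use scylla::policies::load_balancing::DefaultPolicy;
///
/// // Replicas will be returned in a fixed (randomly seeded) order
/// // instead of being shuffled for every plan.
/// let policy = DefaultPolicy::builder()
///     .enable_shuffling_replicas(false)
///     .build();
/// ```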
1105 pub fn enable_shuffling_replicas(mut self, enable: bool) -> Self {
1106 self.enable_replica_shuffle = enable;
1107 self
1108 }
1109}
1110
1111impl Default for DefaultPolicyBuilder {
1112 fn default() -> Self {
1113 Self::new()
1114 }
1115}
1116
1117struct ProcessedRoutingInfo<'a> {
1118 token_with_strategy: Option<TokenWithStrategy<'a>>,
1119}
1120
1121impl<'a> ProcessedRoutingInfo<'a> {
1122 fn new(query: &'a RoutingInfo, cluster: &'a ClusterState) -> ProcessedRoutingInfo<'a> {
1123 Self {
1124 token_with_strategy: TokenWithStrategy::new(query, cluster),
1125 }
1126 }
1127}
1128
1129struct TokenWithStrategy<'a> {
1130 strategy: &'a Strategy,
1131 token: Token,
1132}
1133
1134impl<'a> TokenWithStrategy<'a> {
1135 fn new(query: &'a RoutingInfo, cluster: &'a ClusterState) -> Option<TokenWithStrategy<'a>> {
1136 let token = query.token?;
1137 let keyspace_name = query.table?.ks_name();
1138 let keyspace = cluster.get_keyspace(keyspace_name)?;
1139 let strategy = &keyspace.strategy;
1140 Some(TokenWithStrategy { strategy, token })
1141 }
1142}
1143
1144#[cfg(test)]
1145mod tests {
1146 use std::collections::HashMap;
1147
1148 use scylla_cql::{frame::types::SerialConsistency, Consistency};
1149 use tracing::info;
1150
1151 use self::framework::{
1152 get_plan_and_collect_node_identifiers, mock_cluster_state_for_token_unaware_tests,
1153 ExpectedGroups, ExpectedGroupsBuilder,
1154 };
1155 use crate::policies::host_filter::HostFilter;
1156 use crate::routing::locator::tablets::TabletsInfo;
1157 use crate::routing::locator::test::{
1158 id_to_invalid_addr, mock_metadata_for_token_aware_tests, TABLE_NTS_RF_2, TABLE_NTS_RF_3,
1159 TABLE_SS_RF_2,
1160 };
1161 use crate::{
1162 cluster::ClusterState,
1163 policies::load_balancing::{
1164 default::tests::framework::mock_cluster_state_for_token_aware_tests, Plan, RoutingInfo,
1165 },
1166 routing::Token,
1167 test_utils::setup_tracing,
1168 };
1169
1170 use super::{DefaultPolicy, NodeLocationPreference};
1171
1172 pub(crate) mod framework {
1173 use crate::routing::locator::test::{
1174 id_to_invalid_addr, mock_metadata_for_token_aware_tests,
1175 };
1176 use std::collections::{HashMap, HashSet};
1177
1178 use uuid::Uuid;
1179
1180 use crate::{
1181 cluster::{
1182 metadata::{Metadata, Peer},
1183 ClusterState,
1184 },
1185 policies::load_balancing::{LoadBalancingPolicy, Plan, RoutingInfo},
1186 routing::Token,
1187 test_utils::setup_tracing,
1188 };
1189
1190 use super::TabletsInfo;
1191
1192 #[derive(Debug)]
1193 enum ExpectedGroup {
1194 NonDeterministic(HashSet<u16>),
1195 Deterministic(HashSet<u16>),
1196 Ordered(Vec<u16>),
1197 }
1198
1199 impl ExpectedGroup {
1200 fn len(&self) -> usize {
1201 match self {
1202 Self::NonDeterministic(s) => s.len(),
1203 Self::Deterministic(s) => s.len(),
1204 Self::Ordered(v) => v.len(),
1205 }
1206 }
1207 }
1208
1209 pub(crate) struct ExpectedGroupsBuilder {
1210 groups: Vec<ExpectedGroup>,
1211 }
1212
1213 impl ExpectedGroupsBuilder {
1214 pub(crate) fn new() -> Self {
1215 Self { groups: Vec::new() }
1216 }
1217 /// Expects that the next group in the plan will have a set of nodes
1218 /// that is equal to the provided one. The groups are assumed to be
1219 /// non-deterministic, i.e. the policy is expected to shuffle
1220 /// the nodes within that group.
1221 pub(crate) fn group(mut self, group: impl IntoIterator<Item = u16>) -> Self {
1222 self.groups
1223 .push(ExpectedGroup::NonDeterministic(group.into_iter().collect()));
1224 self
1225 }
1226 /// Expects that the next group in the plan will have a set of nodes
1227 /// that is equal to the provided one, but the order of nodes in
1228 /// that group must be stable over multiple plans.
1229 pub(crate) fn deterministic(mut self, group: impl IntoIterator<Item = u16>) -> Self {
1230 self.groups
1231 .push(ExpectedGroup::Deterministic(group.into_iter().collect()));
1232 self
1233 }
1234 /// Expects that the next group in the plan will have a sequence of nodes
1235 /// that is equal to the provided one, including order.
1236 pub(crate) fn ordered(mut self, group: impl IntoIterator<Item = u16>) -> Self {
1237 self.groups
1238 .push(ExpectedGroup::Ordered(group.into_iter().collect()));
1239 self
1240 }
1241 pub(crate) fn build(self) -> ExpectedGroups {
1242 ExpectedGroups {
1243 groups: self.groups,
1244 }
1245 }
1246 }
1247
1248 #[derive(Debug)]
1249 pub(crate) struct ExpectedGroups {
1250 groups: Vec<ExpectedGroup>,
1251 }
1252
1253 impl ExpectedGroups {
1254 pub(crate) fn assert_proper_grouping_in_plans(&self, gots: &[Vec<u16>]) {
1255 // For simplicity, assume that there is at least one plan
1256 // in `gots`
1257 assert!(!gots.is_empty());
1258
1259 // Each plan is assumed to have the same number of groups.
1260 // For group index `i`, the code below will go over all plans
1261 // and will collect their `i`-th groups and put them under
1262 // index `i` in `sets_of_groups`.
1263 let mut sets_of_groups: Vec<HashSet<Vec<u16>>> =
1264 vec![HashSet::new(); self.groups.len()];
1265
1266 for got in gots {
1267 // First, make sure that `got` has the right number of items,
1268 // equal to the sum of sizes of all expected groups
1269 let combined_groups_len: usize = self.groups.iter().map(|s| s.len()).sum();
1270 assert_eq!(
1271 got.len(),
1272 combined_groups_len,
1273 "Plan length different than expected. Got plan {:?}, expected groups {:?}",
1274 got,
1275 self.groups,
1276 );
1277
1278 // Now, split `got` into groups of expected sizes
1279 // and just `assert_eq` them
1280 let mut got = got.iter();
1281 for (group_id, expected) in self.groups.iter().enumerate() {
1282 // Collect the nodes that constitute the group
1283 // in the actual plan
1284 let got_group: Vec<_> = (&mut got).take(expected.len()).copied().collect();
1285
1286 match expected {
1287 ExpectedGroup::NonDeterministic(expected_set)
1288 | ExpectedGroup::Deterministic(expected_set) => {
1289 // Verify that the group has the same nodes as the
1290 // expected one
1291 let got_set: HashSet<_> = got_group.iter().copied().collect();
1292 assert_eq!(&got_set, expected_set, "Unordered group mismatch");
1293 }
1294 ExpectedGroup::Ordered(sequence) => {
1295 assert_eq!(&got_group, sequence, "Ordered group mismatch");
1296 }
1297 }
1298
1299 // Put the group into sets_of_groups
1300 sets_of_groups[group_id].insert(got_group);
1301 }
1302 }
1303
1304 // Verify that the groups are either deterministic
1305 // or non-deterministic
1306 for (sets, expected) in sets_of_groups.iter().zip(self.groups.iter()) {
1307 match expected {
1308 ExpectedGroup::NonDeterministic(s) => {
1309 // The group is supposed to have non-deterministic
1310 // ordering. If the group size is larger than one,
1311 // then expect there to be more than one group
1312 // in the set.
1313 if gots.len() > 1 && s.len() > 1 {
1314 assert!(
1315 sets.len() > 1,
1316 "Group {:?} is expected to be nondeterministic, but it appears to be deterministic",
1317 expected
1318 );
1319 }
1320 }
1321 ExpectedGroup::Deterministic(_) | ExpectedGroup::Ordered(_) => {
1322 // The group is supposed to be deterministic,
1323 // i.e. a given instance of the default policy
1324 // must always return the nodes within it using
1325 // the same order.
1326 // There will only be one, unique ordering shared
1327 // by all plans - check this
1328 assert_eq!(
1329 sets.len(),
1330 1,
1331 "Group {:?} is expected to be deterministic, but it appears to be nondeterministic",
1332 expected
1333 );
1334 }
1335 }
1336 }
1337 }
1338 }
1339
1340 #[test]
1341 fn test_assert_proper_grouping_in_plan_good() {
1342 setup_tracing();
1343 let got = vec![1u16, 2, 3, 4, 5];
1344 let expected_groups = ExpectedGroupsBuilder::new()
1345 .group([1])
1346 .group([3, 2, 4])
1347 .group([5])
1348 .build();
1349
1350 expected_groups.assert_proper_grouping_in_plans(&[got]);
1351 }
1352
1353 #[test]
1354 #[should_panic]
1355 fn test_assert_proper_grouping_in_plan_too_many_nodes_in_the_end() {
1356 setup_tracing();
1357 let got = vec![1u16, 2, 3, 4, 5, 6];
1358 let expected_groups = ExpectedGroupsBuilder::new()
1359 .group([1])
1360 .group([3, 2, 4])
1361 .group([5])
1362 .build();
1363
1364 expected_groups.assert_proper_grouping_in_plans(&[got]);
1365 }
1366
1367 #[test]
1368 #[should_panic]
1369 fn test_assert_proper_grouping_in_plan_too_many_nodes_in_the_middle() {
1370 setup_tracing();
1371 let got = vec![1u16, 2, 6, 3, 4, 5];
1372 let expected_groups = ExpectedGroupsBuilder::new()
1373 .group([1])
1374 .group([3, 2, 4])
1375 .group([5])
1376 .build();
1377
1378 expected_groups.assert_proper_grouping_in_plans(&[got]);
1379 }
1380
1381 #[test]
1382 #[should_panic]
1383 fn test_assert_proper_grouping_in_plan_missing_node() {
1384 setup_tracing();
1385 let got = vec![1u16, 2, 3, 4];
1386 let expected_groups = ExpectedGroupsBuilder::new()
1387 .group([1])
1388 .group([3, 2, 4])
1389 .group([5])
1390 .build();
1391
1392 expected_groups.assert_proper_grouping_in_plans(&[got]);
1393 }
1394
1395 // based on locator mock cluster
1396 pub(crate) async fn mock_cluster_state_for_token_aware_tests() -> ClusterState {
1397 let metadata = mock_metadata_for_token_aware_tests();
1398 ClusterState::new(
1399 metadata,
1400 &Default::default(),
1401 &HashMap::new(),
1402 &None,
1403 None,
1404 TabletsInfo::new(),
1405 &HashMap::new(),
1406 #[cfg(feature = "metrics")]
1407 &Default::default(),
1408 )
1409 .await
1410 }
1411
1412 // creates ClusterState with info about 5 nodes living in 2 different datacenters
1413 // ring field is minimal, not intended to influence the tests
1414 pub(crate) async fn mock_cluster_state_for_token_unaware_tests() -> ClusterState {
1415 let peers = [("eu", 1), ("eu", 2), ("eu", 3), ("us", 4), ("us", 5)]
1416 .iter()
1417 .map(|(dc, id)| Peer {
1418 datacenter: Some(dc.to_string()),
1419 rack: None,
1420 address: id_to_invalid_addr(*id),
1421 tokens: vec![Token::new(*id as i64 * 100)],
1422 host_id: Uuid::new_v4(),
1423 })
1424 .collect::<Vec<_>>();
1425
1426 let info = Metadata {
1427 peers,
1428 keyspaces: HashMap::new(),
1429 };
1430
1431 ClusterState::new(
1432 info,
1433 &Default::default(),
1434 &HashMap::new(),
1435 &None,
1436 None,
1437 TabletsInfo::new(),
1438 &HashMap::new(),
1439 #[cfg(feature = "metrics")]
1440 &Default::default(),
1441 )
1442 .await
1443 }
1444
1445 pub(crate) fn get_plan_and_collect_node_identifiers(
1446 policy: &impl LoadBalancingPolicy,
1447 query_info: &RoutingInfo,
1448 cluster: &ClusterState,
1449 ) -> Vec<u16> {
1450 let plan = Plan::new(policy, query_info, cluster);
1451 plan.map(|(node, _shard)| node.address.port())
1452 .collect::<Vec<_>>()
1453 }
1454 }
1455
1456 pub(crate) const EMPTY_ROUTING_INFO: RoutingInfo = RoutingInfo {
1457 token: None,
1458 table: None,
1459 is_confirmed_lwt: false,
1460 consistency: Consistency::Quorum,
1461 serial_consistency: Some(SerialConsistency::Serial),
1462 };
1463
1464 pub(super) async fn test_default_policy_with_given_cluster_and_routing_info(
1465 policy: &DefaultPolicy,
1466 cluster: &ClusterState,
1467 routing_info: &RoutingInfo<'_>,
1468 expected_groups: &ExpectedGroups,
1469 ) {
1470 let mut plans = Vec::new();
1471 for _ in 0..256 {
1472 let plan = get_plan_and_collect_node_identifiers(policy, routing_info, cluster);
1473 plans.push(plan);
1474 }
1475 let example_plan = Plan::new(policy, routing_info, cluster);
1476 info!("Example plan from policy:",);
1477 for (node, shard) in example_plan {
1478 info!(
1479 "Node port: {}, shard: {}, dc: {:?}, rack: {:?}, down: {:?}",
1480 node.address.port(),
1481 shard,
1482 node.datacenter,
1483 node.rack,
1484 node.is_down()
1485 );
1486 }
1487
1488 expected_groups.assert_proper_grouping_in_plans(&plans);
1489 }
1490
1491 async fn test_given_default_policy_with_token_unaware_statements(
1492 policy: DefaultPolicy,
1493 expected_groups: &ExpectedGroups,
1494 ) {
1495 let cluster = mock_cluster_state_for_token_unaware_tests().await;
1496
1497 test_default_policy_with_given_cluster_and_routing_info(
1498 &policy,
1499 &cluster,
1500 &EMPTY_ROUTING_INFO,
1501 expected_groups,
1502 )
1503 .await;
1504 }
1505
1506 #[tokio::test]
1507 async fn test_default_policy_with_token_unaware_statements() {
1508 setup_tracing();
1509 let local_dc = "eu".to_string();
1510 let policy_with_disabled_dc_failover = DefaultPolicy {
1511 preferences: NodeLocationPreference::Datacenter(local_dc.clone()),
1512 permit_dc_failover: false,
1513 ..Default::default()
1514 };
1515 let expected_groups = ExpectedGroupsBuilder::new()
1516 .group([1, 2, 3]) // pick + fallback local nodes
1517 .build();
1518 test_given_default_policy_with_token_unaware_statements(
1519 policy_with_disabled_dc_failover,
1520 &expected_groups,
1521 )
1522 .await;
1523
1524 let policy_with_enabled_dc_failover = DefaultPolicy {
1525 preferences: NodeLocationPreference::Datacenter(local_dc.clone()),
1526 permit_dc_failover: true,
1527 ..Default::default()
1528 };
1529 let expected_groups = ExpectedGroupsBuilder::new()
1530 .group([1, 2, 3]) // pick + fallback local nodes
1531 .group([4, 5]) // fallback remote nodes
1532 .build();
1533 test_given_default_policy_with_token_unaware_statements(
1534 policy_with_enabled_dc_failover,
1535 &expected_groups,
1536 )
1537 .await;
1538 }
1539
1540 #[tokio::test]
1541 async fn test_default_policy_with_token_aware_statements() {
1542 setup_tracing();
1543
1544 use crate::routing::locator::test::{A, B, C, D, E, F, G};
1545 let cluster = mock_cluster_state_for_token_aware_tests().await;
1546
1547 #[derive(Debug)]
1548 struct Test<'a> {
1549 policy: DefaultPolicy,
1550 routing_info: RoutingInfo<'a>,
1551 expected_groups: ExpectedGroups,
1552 }
1553
1554 let tests = [
1555 // Keyspace NTS with RF=2 with enabled DC failover
1556 Test {
1557 policy: DefaultPolicy {
1558 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1559 is_token_aware: true,
1560 permit_dc_failover: true,
1561 ..Default::default()
1562 },
1563 routing_info: RoutingInfo {
1564 token: Some(Token::new(160)),
1565 table: Some(TABLE_NTS_RF_2),
1566 consistency: Consistency::Two,
1567 ..Default::default()
1568 },
1569 // going through the ring, we get order: F , A , C , D , G , B , E
1570 // us eu eu us eu eu us
1571 // r2 r1 r1 r1 r2 r1 r1
1572 expected_groups: ExpectedGroupsBuilder::new()
1573 .group([A, G]) // pick + fallback local replicas
1574 .group([F, D]) // remote replicas
1575 .group([C, B]) // local nodes
1576 .group([E]) // remote nodes
1577 .build(),
1578 },
1579 // Keyspace NTS with RF=2 with enabled DC failover, shuffling replicas disabled
1580 Test {
1581 policy: DefaultPolicy {
1582 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1583 is_token_aware: true,
1584 permit_dc_failover: true,
1585 fixed_seed: Some(123),
1586 ..Default::default()
1587 },
1588 routing_info: RoutingInfo {
1589 token: Some(Token::new(160)),
1590 table: Some(TABLE_NTS_RF_2),
1591 consistency: Consistency::Two,
1592 ..Default::default()
1593 },
1594 // going through the ring, we get order: F , A , C , D , G , B , E
1595 // us eu eu us eu eu us
1596 // r2 r1 r1 r1 r2 r1 r1
1597 expected_groups: ExpectedGroupsBuilder::new()
1598 .deterministic([A, G]) // pick + fallback local replicas
1599 .deterministic([F, D]) // remote replicas
1600 .group([C, B]) // local nodes
1601 .group([E]) // remote nodes
1602 .build(),
1603 },
1604 // Keyspace NTS with RF=2 with DC with local Consistency. DC failover should still work.
1605 Test {
1606 policy: DefaultPolicy {
1607 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1608 is_token_aware: true,
1609 permit_dc_failover: true,
1610 fixed_seed: Some(123),
1611 ..Default::default()
1612 },
1613 routing_info: RoutingInfo {
1614 token: Some(Token::new(160)),
1615 table: Some(TABLE_NTS_RF_2),
1616 consistency: Consistency::LocalOne,
1617 ..Default::default()
1618 },
1619 // going through the ring, we get order: F , A , C , D , G , B , E
1620 // us eu eu us eu eu us
1621 // r2 r1 r1 r1 r2 r1 r1
1622 expected_groups: ExpectedGroupsBuilder::new()
1623 .deterministic([A, G]) // pick + fallback local replicas
1624 .deterministic([F, D]) // remote replicas
1625 .group([C, B]) // local nodes
1626 .group([E]) // remote nodes
1627 .build(),
1628 },
1629 // Keyspace NTS with RF=2 with explicitly disabled DC failover
1630 Test {
1631 policy: DefaultPolicy {
1632 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1633 is_token_aware: true,
1634 permit_dc_failover: false,
1635 ..Default::default()
1636 },
1637 routing_info: RoutingInfo {
1638 token: Some(Token::new(160)),
1639 table: Some(TABLE_NTS_RF_2),
1640 consistency: Consistency::One,
1641 ..Default::default()
1642 },
1643 // going through the ring, we get order: F , A , C , D , G , B , E
1644 // us eu eu us eu eu us
1645 // r2 r1 r1 r1 r2 r1 r1
1646 expected_groups: ExpectedGroupsBuilder::new()
1647 .group([A, G]) // pick + fallback local replicas
1648 .group([C, B]) // local nodes
1649 .build(), // failover is explicitly forbidden
1650 },
1651 // Keyspace NTS with RF=3 with enabled DC failover
1652 Test {
1653 policy: DefaultPolicy {
1654 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1655 is_token_aware: true,
1656 permit_dc_failover: true,
1657 ..Default::default()
1658 },
1659 routing_info: RoutingInfo {
1660 token: Some(Token::new(160)),
1661 table: Some(TABLE_NTS_RF_3),
1662 consistency: Consistency::Quorum,
1663 ..Default::default()
1664 },
1665 // going through the ring, we get order: F , A , C , D , G , B , E
1666 // us eu eu us eu eu us
1667 // r2 r1 r1 r1 r2 r1 r1
1668 expected_groups: ExpectedGroupsBuilder::new()
1669 .group([A, C, G]) // pick + fallback local replicas
1670 .group([F, D, E]) // remote replicas
1671 .group([B]) // local nodes
1672 .group([]) // remote nodes
1673 .build(),
1674 },
1675 // Keyspace NTS with RF=3 with enabled DC failover, shuffling replicas disabled
1676 Test {
1677 policy: DefaultPolicy {
1678 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1679 is_token_aware: true,
1680 permit_dc_failover: true,
1681 fixed_seed: Some(123),
1682 ..Default::default()
1683 },
1684 routing_info: RoutingInfo {
1685 token: Some(Token::new(160)),
1686 table: Some(TABLE_NTS_RF_3),
1687 consistency: Consistency::Quorum,
1688 ..Default::default()
1689 },
1690 // going through the ring, we get order: F , A , C , D , G , B , E
1691 // us eu eu us eu eu us
1692 // r2 r1 r1 r1 r2 r1 r1
1693 expected_groups: ExpectedGroupsBuilder::new()
1694 .deterministic([A, C, G]) // pick + fallback local replicas
1695 .deterministic([F, D, E]) // remote replicas
1696 .group([B]) // local nodes
1697 .group([]) // remote nodes
1698 .build(),
1699 },
1700 // Keyspace NTS with RF=3 with disabled DC failover
1701 Test {
1702 policy: DefaultPolicy {
1703 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1704 is_token_aware: true,
1705 permit_dc_failover: false,
1706 ..Default::default()
1707 },
1708 routing_info: RoutingInfo {
1709 token: Some(Token::new(160)),
1710 table: Some(TABLE_NTS_RF_3),
1711 consistency: Consistency::Quorum,
1712 ..Default::default()
1713 },
1714 // going through the ring, we get order: F , A , C , D , G , B , E
1715 // us eu eu us eu eu us
1716 // r2 r1 r1 r1 r2 r1 r1
1717 expected_groups: ExpectedGroupsBuilder::new()
1718 .group([A, C, G]) // pick + fallback local replicas
1719 .group([B]) // local nodes
1720 .build(), // failover explicitly forbidden
1721 },
1722 // Keyspace SS with RF=2 with enabled DC failover
1723 Test {
1724 policy: DefaultPolicy {
1725 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1726 is_token_aware: true,
1727 permit_dc_failover: true,
1728 ..Default::default()
1729 },
1730 routing_info: RoutingInfo {
1731 token: Some(Token::new(160)),
1732 table: Some(TABLE_SS_RF_2),
1733 consistency: Consistency::Two,
1734 ..Default::default()
1735 },
1736 // going through the ring, we get order: F , A , C , D , G , B , E
1737 // us eu eu us eu eu us
1738 // r2 r1 r1 r1 r2 r1 r1
1739 expected_groups: ExpectedGroupsBuilder::new()
1740 .group([A]) // pick + fallback local replicas
1741 .group([F]) // remote replicas
1742 .group([C, G, B]) // local nodes
1743 .group([D, E]) // remote nodes
1744 .build(),
1745 },
1746 // Keyspace SS with RF=2 with DC failover and local Consistency. DC failover should still work.
1747 Test {
1748 policy: DefaultPolicy {
1749 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1750 is_token_aware: true,
1751 permit_dc_failover: true,
1752 ..Default::default()
1753 },
1754 routing_info: RoutingInfo {
1755 token: Some(Token::new(160)),
1756 table: Some(TABLE_SS_RF_2),
1757 consistency: Consistency::LocalOne,
1758 ..Default::default()
1759 },
1760 // going through the ring, we get order: F , A , C , D , G , B , E
1761 // us eu eu us eu eu us
1762 // r2 r1 r1 r1 r2 r1 r1
1766 expected_groups: ExpectedGroupsBuilder::new()
1767 .group([A]) // pick + fallback local replicas
1768 .group([F]) // remote replicas
1769 .group([C, G, B]) // local nodes
1770 .group([D, E]) // remote nodes
1771 .build(),
1772 },
1773 // No token implies no token awareness
1774 Test {
1775 policy: DefaultPolicy {
1776 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1777 is_token_aware: true,
1778 permit_dc_failover: true,
1779 ..Default::default()
1780 },
1781 routing_info: RoutingInfo {
1782 token: None, // no token
1783 table: Some(TABLE_NTS_RF_3),
1784 consistency: Consistency::Quorum,
1785 ..Default::default()
1786 },
1787 expected_groups: ExpectedGroupsBuilder::new()
1788 .group([A, B, C, G]) // local nodes
1789 .group([D, E, F]) // remote nodes
1790 .build(),
1791 },
1792 // No keyspace implies no token awareness
1793 Test {
1794 policy: DefaultPolicy {
1795 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
1796 is_token_aware: true,
1797 permit_dc_failover: true,
1798 ..Default::default()
1799 },
1800 routing_info: RoutingInfo {
1801 token: Some(Token::new(160)),
1802 table: None, // no keyspace
1803 consistency: Consistency::Quorum,
1804 ..Default::default()
1805 },
1806 expected_groups: ExpectedGroupsBuilder::new()
1807 .group([A, B, C, G]) // local nodes
1808 .group([D, E, F]) // remote nodes
1809 .build(),
1810 },
1811 // Unknown preferred DC, failover permitted
1812 Test {
1813 policy: DefaultPolicy {
1814 preferences: NodeLocationPreference::Datacenter("au".to_owned()),
1815 is_token_aware: true,
1816 permit_dc_failover: true,
1817 ..Default::default()
1818 },
1819 routing_info: RoutingInfo {
1820 token: Some(Token::new(160)),
1821 table: Some(TABLE_NTS_RF_2),
1822 consistency: Consistency::Quorum,
1823 ..Default::default()
1824 },
1825 // going through the ring, we get order: F , A , C , D , G , B , E
1826 // us eu eu us eu eu us
1827 // r2 r1 r1 r1 r2 r1 r1
1828 expected_groups: ExpectedGroupsBuilder::new()
1829 .group([A, D, F, G]) // remote replicas
1830 .group([B, C, E]) // remote nodes
1831 .build(),
1832 },
1833 // Unknown preferred DC, failover forbidden
1834 Test {
1835 policy: DefaultPolicy {
1836 preferences: NodeLocationPreference::Datacenter("au".to_owned()),
1837 is_token_aware: true,
1838 permit_dc_failover: false,
1839 ..Default::default()
1840 },
1841 routing_info: RoutingInfo {
1842 token: Some(Token::new(160)),
1843 table: Some(TABLE_NTS_RF_2),
1844 consistency: Consistency::Quorum,
1845 ..Default::default()
1846 },
1847 // going through the ring, we get order: F , A , C , D , G , B , E
1848 // us eu eu us eu eu us
1849 // r2 r1 r1 r1 r2 r1 r1
1850 expected_groups: ExpectedGroupsBuilder::new().build(), // empty plan, because all nodes are remote and failover is forbidden
1851 },
1852 // No preferred DC, failover permitted
1853 Test {
1854 policy: DefaultPolicy {
1855 preferences: NodeLocationPreference::Any,
1856 is_token_aware: true,
1857 permit_dc_failover: true,
1858 ..Default::default()
1859 },
1860 routing_info: RoutingInfo {
1861 token: Some(Token::new(160)),
1862 table: Some(TABLE_NTS_RF_2),
1863 consistency: Consistency::Quorum,
1864 ..Default::default()
1865 },
1866 // going through the ring, we get order: F , A , C , D , G , B , E
1867 // us eu eu us eu eu us
1868 // r2 r1 r1 r1 r2 r1 r1
1869 expected_groups: ExpectedGroupsBuilder::new()
1870 .group([A, D, F, G]) // remote replicas
1871 .group([B, C, E]) // remote nodes
1872 .build(),
1873 },
1874 // No preferred DC, failover forbidden
1875 Test {
1876 policy: DefaultPolicy {
1877 preferences: NodeLocationPreference::Any,
1878 is_token_aware: true,
1879 permit_dc_failover: false,
1880 ..Default::default()
1881 },
1882 routing_info: RoutingInfo {
1883 token: Some(Token::new(160)),
1884 table: Some(TABLE_NTS_RF_2),
1885 consistency: Consistency::Quorum,
1886 ..Default::default()
1887 },
1888 // going through the ring, we get order: F , A , C , D , G , B , E
1889 // us eu eu us eu eu us
1890 // r2 r1 r1 r1 r2 r1 r1
1891 expected_groups: ExpectedGroupsBuilder::new()
1892 .group([A, D, F, G]) // remote replicas
1893 .group([B, C, E]) // remote nodes
1894 .build(),
1895 },
1896 // Keyspace NTS with RF=3 with enabled DC failover and rack-awareness
1897 Test {
1898 policy: DefaultPolicy {
1899 preferences: NodeLocationPreference::DatacenterAndRack(
1900 "eu".to_owned(),
1901 "r1".to_owned(),
1902 ),
1903 is_token_aware: true,
1904 permit_dc_failover: true,
1905 ..Default::default()
1906 },
1907 routing_info: RoutingInfo {
1908 token: Some(Token::new(160)),
1909 table: Some(TABLE_NTS_RF_3),
1910 consistency: Consistency::One,
1911 ..Default::default()
1912 },
1913 // going through the ring, we get order: F , A , C , D , G , B , E
1914 // us eu eu us eu eu us
1915 // r2 r1 r1 r1 r2 r1 r1
1916 expected_groups: ExpectedGroupsBuilder::new()
1917 .group([A, C]) // pick local rack replicas
1918 .group([G]) // local DC replicas
1919 .group([F, D, E]) // remote replicas
1920 .group([B]) // local nodes
1921 .build(),
1922 },
1923 // Keyspace SS with RF=2 with enabled rack-awareness, shuffling replicas disabled
1924 Test {
1925 policy: DefaultPolicy {
1926 preferences: NodeLocationPreference::DatacenterAndRack(
1927 "eu".to_owned(),
1928 "r1".to_owned(),
1929 ),
1930 is_token_aware: true,
1931 permit_dc_failover: false,
1932 fixed_seed: Some(123),
1933 ..Default::default()
1934 },
1935 routing_info: RoutingInfo {
1936 token: Some(Token::new(560)),
1937 table: Some(TABLE_SS_RF_2),
1938 consistency: Consistency::Two,
1939 ..Default::default()
1940 },
1941 // going through the ring, we get order: B , C , E , G , A , F , D
1942 // eu eu us eu eu us us
1943 // r1 r1 r1 r2 r1 r2 r1
1944 expected_groups: ExpectedGroupsBuilder::new()
1945 .deterministic([B]) // pick local rack replicas
1946 .deterministic([C]) // fallback replicas
1947 .group([A]) // local rack nodes
1948 .group([G]) // local DC nodes
1949 .build(),
1950 },
1951 // Keyspace SS with RF=2 with enabled rack-awareness and no local-rack replica
1952 Test {
1953 policy: DefaultPolicy {
1954 preferences: NodeLocationPreference::DatacenterAndRack(
1955 "eu".to_owned(),
1956 "r2".to_owned(),
1957 ),
1958 is_token_aware: true,
1959 permit_dc_failover: false,
1960 ..Default::default()
1961 },
1962 routing_info: RoutingInfo {
1963 token: Some(Token::new(160)),
1964 table: Some(TABLE_SS_RF_2),
1965 consistency: Consistency::One,
1966 ..Default::default()
1967 },
1968 // going through the ring, we get order: F , A , C , D , G , B , E
1969 // us eu eu us eu eu us
1970 // r2 r1 r1 r1 r2 r1 r1
1971 expected_groups: ExpectedGroupsBuilder::new()
1972 .group([A]) // pick local DC
1973 .group([G]) // local rack nodes
1974 .group([C, B]) // local DC nodes
1975 .build(),
1976 },
1977 // Keyspace NTS with RF=3 with enabled DC failover and rack-awareness, no token provided
1978 Test {
1979 policy: DefaultPolicy {
1980 preferences: NodeLocationPreference::DatacenterAndRack(
1981 "eu".to_owned(),
1982 "r1".to_owned(),
1983 ),
1984 is_token_aware: true,
1985 permit_dc_failover: true,
1986 ..Default::default()
1987 },
1988 routing_info: RoutingInfo {
1989 token: None,
1990 table: Some(TABLE_NTS_RF_3),
1991 consistency: Consistency::One,
1992 ..Default::default()
1993 },
1994 // going through the ring, we get order: F , A , C , D , G , B , E
1995 // us eu eu us eu eu us
1996 // r2 r1 r1 r1 r2 r1 r1
1997 expected_groups: ExpectedGroupsBuilder::new()
1998 .group([A, C, B]) // local rack nodes
1999 .group([G]) // local DC nodes
2000 .group([F, D, E]) // remote nodes
2001 .build(),
2002 },
2003 ];
2004
2005 for test in tests {
2006 info!("Test: {:?}", test);
2007 let Test {
2008 policy,
2009 routing_info,
2010 expected_groups,
2011 } = test;
2012 test_default_policy_with_given_cluster_and_routing_info(
2013 &policy,
2014 &cluster,
2015 &routing_info,
2016 &expected_groups,
2017 )
2018 .await;
2019 }
2020 }
2021
2022 #[tokio::test]
2023 async fn test_default_policy_with_lwt_statements() {
2024 setup_tracing();
2025 use crate::routing::locator::test::{A, B, C, D, E, F, G};
2026
2027 let cluster = mock_cluster_state_for_token_aware_tests().await;
2028 struct Test<'a> {
2029 policy: DefaultPolicy,
2030 routing_info: RoutingInfo<'a>,
2031 expected_groups: ExpectedGroups,
2032 }
2033
2034 let tests = [
2035 // Keyspace NTS with RF=2 with enabled DC failover
2036 Test {
2037 policy: DefaultPolicy {
2038 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2039 is_token_aware: true,
2040 permit_dc_failover: true,
2041 ..Default::default()
2042 },
2043 routing_info: RoutingInfo {
2044 token: Some(Token::new(160)),
2045 table: Some(TABLE_NTS_RF_2),
2046 consistency: Consistency::Two,
2047 is_confirmed_lwt: true,
2048 ..Default::default()
2049 },
2050 // going through the ring, we get order: F , A , C , D , G , B , E
2051 // us eu eu us eu eu us
2052 // r2 r1 r1 r1 r2 r1 r1
2053 expected_groups: ExpectedGroupsBuilder::new()
2054 .ordered([A, G]) // pick + fallback local replicas
2055 .ordered([F, D]) // remote replicas
2056 .group([C, B]) // local nodes
2057 .group([E]) // remote nodes
2058 .build(),
2059 },
2060 // Keyspace NTS with RF=2 with enabled DC failover, shuffling replicas disabled
2061 Test {
2062 policy: DefaultPolicy {
2063 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2064 is_token_aware: true,
2065 permit_dc_failover: true,
2066 fixed_seed: Some(123),
2067 ..Default::default()
2068 },
2069 routing_info: RoutingInfo {
2070 token: Some(Token::new(160)),
2071 table: Some(TABLE_NTS_RF_2),
2072 consistency: Consistency::Two,
2073 is_confirmed_lwt: true,
2074 ..Default::default()
2075 },
2076 // going through the ring, we get order: F , A , C , D , G , B , E
2077 // us eu eu us eu eu us
2078 // r2 r1 r1 r1 r2 r1 r1
2079 expected_groups: ExpectedGroupsBuilder::new()
2080 .ordered([A, G]) // pick + fallback local replicas
2081 .ordered([F, D]) // remote replicas
2082 .group([C, B]) // local nodes
2083 .group([E]) // remote nodes
2084 .build(),
2085 },
2086 // Keyspace NTS with RF=2 with DC failover and local Consistency. DC failover should still work.
2087 Test {
2088 policy: DefaultPolicy {
2089 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2090 is_token_aware: true,
2091 permit_dc_failover: true,
2092 ..Default::default()
2093 },
2094 routing_info: RoutingInfo {
2095 token: Some(Token::new(160)),
2096 table: Some(TABLE_NTS_RF_2),
2097 consistency: Consistency::LocalOne,
2098 is_confirmed_lwt: true,
2099 ..Default::default()
2100 },
2101 // going through the ring, we get order: F , A , C , D , G , B , E
2102 // us eu eu us eu eu us
2103 // r2 r1 r1 r1 r2 r1 r1
2104 expected_groups: ExpectedGroupsBuilder::new()
2105 .ordered([A, G]) // pick + fallback local replicas
2106 .ordered([F, D]) // remote replicas
2107 .group([C, B]) // local nodes
2108 .group([E]) // remote nodes
2109 .build(),
2110 },
2111 // Keyspace NTS with RF=2 with explicitly disabled DC failover
2112 Test {
2113 policy: DefaultPolicy {
2114 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2115 is_token_aware: true,
2116 permit_dc_failover: false,
2117 ..Default::default()
2118 },
2119 routing_info: RoutingInfo {
2120 token: Some(Token::new(160)),
2121 table: Some(TABLE_NTS_RF_2),
2122 consistency: Consistency::One,
2123 is_confirmed_lwt: true,
2124 ..Default::default()
2125 },
2126 // going through the ring, we get order: F , A , C , D , G , B , E
2127 // us eu eu us eu eu us
2128 // r2 r1 r1 r1 r2 r1 r1
2129 expected_groups: ExpectedGroupsBuilder::new()
2130 .ordered([A, G]) // pick + fallback local replicas
2131 .group([C, B]) // local nodes
2132 .build(), // failover is explicitly forbidden
2133 },
2134 // Keyspace NTS with RF=3 with enabled DC failover
2135 Test {
2136 policy: DefaultPolicy {
2137 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2138 is_token_aware: true,
2139 permit_dc_failover: true,
2140 ..Default::default()
2141 },
2142 routing_info: RoutingInfo {
2143 token: Some(Token::new(160)),
2144 table: Some(TABLE_NTS_RF_3),
2145 consistency: Consistency::Quorum,
2146 is_confirmed_lwt: true,
2147 ..Default::default()
2148 },
2149 // going through the ring, we get order: F , A , C , D , G , B , E
2150 // us eu eu us eu eu us
2151 // r2 r1 r1 r1 r2 r1 r1
2152 expected_groups: ExpectedGroupsBuilder::new()
2153 .ordered([A, C, G]) // pick + fallback local replicas
2154 .ordered([F, D, E]) // remote replicas
2155 .group([B]) // local nodes
2156 .group([]) // remote nodes
2157 .build(),
2158 },
2159 // Keyspace NTS with RF=3 with enabled DC failover, shuffling replicas disabled
2160 Test {
2161 policy: DefaultPolicy {
2162 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2163 is_token_aware: true,
2164 permit_dc_failover: true,
2165 fixed_seed: Some(123),
2166 ..Default::default()
2167 },
2168 routing_info: RoutingInfo {
2169 token: Some(Token::new(160)),
2170 table: Some(TABLE_NTS_RF_3),
2171 consistency: Consistency::Quorum,
2172 is_confirmed_lwt: true,
2173 ..Default::default()
2174 },
2175 // going through the ring, we get order: F , A , C , D , G , B , E
2176 // us eu eu us eu eu us
2177 // r2 r1 r1 r1 r2 r1 r1
2178 expected_groups: ExpectedGroupsBuilder::new()
2179 .ordered([A, C, G]) // pick + fallback local replicas
2180 .ordered([F, D, E]) // remote replicas
2181 .group([B]) // local nodes
2182 .group([]) // remote nodes
2183 .build(),
2184 },
2185 // Keyspace NTS with RF=3 with disabled DC failover
2186 Test {
2187 policy: DefaultPolicy {
2188 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2189 is_token_aware: true,
2190 permit_dc_failover: false,
2191 ..Default::default()
2192 },
2193 routing_info: RoutingInfo {
2194 token: Some(Token::new(160)),
2195 table: Some(TABLE_NTS_RF_3),
2196 consistency: Consistency::Quorum,
2197 is_confirmed_lwt: true,
2198 ..Default::default()
2199 },
2200 // going through the ring, we get order: F , A , C , D , G , B , E
2201 // us eu eu us eu eu us
2202 // r2 r1 r1 r1 r2 r1 r1
2203 expected_groups: ExpectedGroupsBuilder::new()
2204 .ordered([A, C, G]) // pick + fallback local replicas
2205 .group([B]) // local nodes
2206 .build(), // failover explicitly forbidden
2207 },
2208 // Keyspace SS with RF=2 with enabled DC failover
2209 Test {
2210 policy: DefaultPolicy {
2211 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2212 is_token_aware: true,
2213 permit_dc_failover: true,
2214 ..Default::default()
2215 },
2216 routing_info: RoutingInfo {
2217 token: Some(Token::new(160)),
2218 table: Some(TABLE_SS_RF_2),
2219 consistency: Consistency::Two,
2220 is_confirmed_lwt: true,
2221 ..Default::default()
2222 },
2223 // going through the ring, we get order: F , A , C , D , G , B , E
2224 // us eu eu us eu eu us
2225 // r2 r1 r1 r1 r2 r1 r1
2226 expected_groups: ExpectedGroupsBuilder::new()
2227 .ordered([A]) // pick + fallback local replicas
2228 .ordered([F]) // remote replicas
2229 .group([C, G, B]) // local nodes
2230 .group([D, E]) // remote nodes
2231 .build(),
2232 },
2233 // Keyspace SS with RF=2 with DC failover and local Consistency. DC failover should still work.
2234 Test {
2235 policy: DefaultPolicy {
2236 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2237 is_token_aware: true,
2238 permit_dc_failover: true,
2239 ..Default::default()
2240 },
2241 routing_info: RoutingInfo {
2242 token: Some(Token::new(160)),
2243 table: Some(TABLE_SS_RF_2),
2244 consistency: Consistency::LocalOne,
2245 is_confirmed_lwt: true,
2246 ..Default::default()
2247 },
2248 // going through the ring, we get order: F , A , C , D , G , B , E
2249 // us eu eu us eu eu us
2250 // r2 r1 r1 r1 r2 r1 r1
2251 expected_groups: ExpectedGroupsBuilder::new()
2252 .ordered([A]) // pick + fallback local replicas
2253 .ordered([F]) // remote replicas
2254 .group([C, G, B]) // local nodes
2255 .group([D, E]) // remote nodes
2256 .build(),
2257 },
2258 // No token implies no token awareness
2259 Test {
2260 policy: DefaultPolicy {
2261 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2262 is_token_aware: true,
2263 permit_dc_failover: true,
2264 ..Default::default()
2265 },
2266 routing_info: RoutingInfo {
2267 token: None, // no token
2268 table: Some(TABLE_NTS_RF_3),
2269 consistency: Consistency::Quorum,
2270 is_confirmed_lwt: true,
2271 ..Default::default()
2272 },
2273 expected_groups: ExpectedGroupsBuilder::new()
2274 .group([A, B, C, G]) // local nodes
2275 .group([D, E, F]) // remote nodes
2276 .build(),
2277 },
2278 // No keyspace implies no token awareness
2279 Test {
2280 policy: DefaultPolicy {
2281 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
2282 is_token_aware: true,
2283 permit_dc_failover: true,
2284 ..Default::default()
2285 },
2286 routing_info: RoutingInfo {
2287 token: Some(Token::new(160)),
2288 table: None, // no keyspace
2289 consistency: Consistency::Quorum,
2290 is_confirmed_lwt: true,
2291 ..Default::default()
2292 },
2293 expected_groups: ExpectedGroupsBuilder::new()
2294 .group([A, B, C, G]) // local nodes
2295 .group([D, E, F]) // remote nodes
2296 .build(),
2297 },
2298 // Unknown preferred DC, failover permitted
2299 Test {
2300 policy: DefaultPolicy {
2301 preferences: NodeLocationPreference::Datacenter("au".to_owned()),
2302 is_token_aware: true,
2303 permit_dc_failover: true,
2304 ..Default::default()
2305 },
2306 routing_info: RoutingInfo {
2307 token: Some(Token::new(160)),
2308 table: Some(TABLE_NTS_RF_2),
2309 consistency: Consistency::Quorum,
2310 is_confirmed_lwt: true,
2311 ..Default::default()
2312 },
2313 // going through the ring, we get order: F , A , C , D , G , B , E
2314 // us eu eu us eu eu us
2315 // r2 r1 r1 r1 r2 r1 r1
2316 expected_groups: ExpectedGroupsBuilder::new()
2317 .ordered([F, A, D, G]) // remote replicas
2318 .group([B, C, E]) // remote nodes
2319 .build(),
2320 },
2321 // Unknown preferred DC, failover forbidden
2322 Test {
2323 policy: DefaultPolicy {
2324 preferences: NodeLocationPreference::Datacenter("au".to_owned()),
2325 is_token_aware: true,
2326 permit_dc_failover: false,
2327 ..Default::default()
2328 },
2329 routing_info: RoutingInfo {
2330 token: Some(Token::new(160)),
2331 table: Some(TABLE_NTS_RF_2),
2332 consistency: Consistency::Quorum,
2333 is_confirmed_lwt: true,
2334 ..Default::default()
2335 },
2336 // going through the ring, we get order: F , A , C , D , G , B , E
2337 // us eu eu us eu eu us
2338 // r2 r1 r1 r1 r2 r1 r1
2339 expected_groups: ExpectedGroupsBuilder::new().build(), // empty plan, because all nodes are remote and failover is forbidden
2340 },
2341 // No preferred DC, failover permitted
2342 Test {
2343 policy: DefaultPolicy {
2344 preferences: NodeLocationPreference::Any,
2345 is_token_aware: true,
2346 permit_dc_failover: true,
2347 ..Default::default()
2348 },
2349 routing_info: RoutingInfo {
2350 token: Some(Token::new(160)),
2351 table: Some(TABLE_NTS_RF_2),
2352 consistency: Consistency::Quorum,
2353 is_confirmed_lwt: true,
2354 ..Default::default()
2355 },
2356 // going through the ring, we get order: F , A , C , D , G , B , E
2357 // us eu eu us eu eu us
2358 // r2 r1 r1 r1 r2 r1 r1
2359 expected_groups: ExpectedGroupsBuilder::new()
2360 .ordered([F, A, D, G]) // remote replicas
2361 .group([B, C, E]) // remote nodes
2362 .build(),
2363 },
2364 // No preferred DC, failover forbidden
2365 Test {
2366 policy: DefaultPolicy {
2367 preferences: NodeLocationPreference::Any,
2368 is_token_aware: true,
2369 permit_dc_failover: false,
2370 ..Default::default()
2371 },
2372 routing_info: RoutingInfo {
2373 token: Some(Token::new(160)),
2374 table: Some(TABLE_NTS_RF_2),
2375 consistency: Consistency::Quorum,
2376 is_confirmed_lwt: true,
2377 ..Default::default()
2378 },
2379 // going through the ring, we get order: F , A , C , D , G , B , E
2380 // us eu eu us eu eu us
2381 // r2 r1 r1 r1 r2 r1 r1
2382 expected_groups: ExpectedGroupsBuilder::new()
2383 .ordered([F, A, D, G]) // remote replicas
2384 .group([B, C, E]) // remote nodes
2385 .build(),
2386 },
2387 // Keyspace NTS with RF=3 with enabled DC failover and rack-awareness
2388 Test {
2389 policy: DefaultPolicy {
2390 preferences: NodeLocationPreference::DatacenterAndRack(
2391 "eu".to_owned(),
2392 "r1".to_owned(),
2393 ),
2394 is_token_aware: true,
2395 permit_dc_failover: true,
2396 ..Default::default()
2397 },
2398 routing_info: RoutingInfo {
2399 token: Some(Token::new(160)),
2400 table: Some(TABLE_NTS_RF_3),
2401 consistency: Consistency::One,
2402 is_confirmed_lwt: true,
2403 ..Default::default()
2404 },
2405 // going through the ring, we get order: F , A , C , D , G , B , E
2406 // us eu eu us eu eu us
2407 // r2 r1 r1 r1 r2 r1 r1
2408 expected_groups: ExpectedGroupsBuilder::new()
2409 .ordered([A, C]) // pick local rack replicas
2410 .ordered([G]) // local DC replicas
2411 .ordered([F, D, E]) // remote replicas
2412 .group([B]) // local nodes
2413 .build(),
2414 },
2415 // Keyspace SS with RF=2 with enabled rack-awareness, shuffling replicas disabled
2416 Test {
2417 policy: DefaultPolicy {
2418 preferences: NodeLocationPreference::DatacenterAndRack(
2419 "eu".to_owned(),
2420 "r1".to_owned(),
2421 ),
2422 is_token_aware: true,
2423 permit_dc_failover: false,
2424 fixed_seed: Some(123),
2425 ..Default::default()
2426 },
2427 routing_info: RoutingInfo {
2428 token: Some(Token::new(760)),
2429 table: Some(TABLE_SS_RF_2),
2430 consistency: Consistency::Two,
2431 is_confirmed_lwt: true,
2432 ..Default::default()
2433 },
2434 // going through the ring, we get order: G , B , A , E , F , C , D
2435 // eu eu eu us us eu us
2436 // r2 r1 r1 r1 r2 r1 r1
2437 expected_groups: ExpectedGroupsBuilder::new()
2438 .ordered([B]) // pick local rack replicas
2439 .ordered([G]) // local DC replicas
2440 .group([A, C]) // local nodes
2441 .build(),
2442 },
2443 // Keyspace SS with RF=2 with enabled rack-awareness and no local-rack replica
2444 Test {
2445 policy: DefaultPolicy {
2446 preferences: NodeLocationPreference::DatacenterAndRack(
2447 "eu".to_owned(),
2448 "r2".to_owned(),
2449 ),
2450 is_token_aware: true,
2451 permit_dc_failover: false,
2452 ..Default::default()
2453 },
2454 routing_info: RoutingInfo {
2455 token: Some(Token::new(160)),
2456 table: Some(TABLE_SS_RF_2),
2457 consistency: Consistency::One,
2458 is_confirmed_lwt: true,
2459 ..Default::default()
2460 },
2461 // going through the ring, we get order: F , A , C , D , G , B , E
2462 // us eu eu us eu eu us
2463 // r2 r1 r1 r1 r2 r1 r1
2464 expected_groups: ExpectedGroupsBuilder::new()
2465 .group([A]) // pick local DC
2466 .group([C, G, B]) // local nodes
2467 .build(),
2468 },
2469 ];
2470
2471 for Test {
2472 policy,
2473 routing_info,
2474 expected_groups,
2475 } in tests
2476 {
2477 test_default_policy_with_given_cluster_and_routing_info(
2478 &policy,
2479 &cluster,
2480 &routing_info,
2481 &expected_groups,
2482 )
2483 .await;
2484 }
2485
2486 let cluster_with_disabled_node_f = ClusterState::new(
2487 mock_metadata_for_token_aware_tests(),
2488 &Default::default(),
2489 &HashMap::new(),
2490 &None,
2491 {
2492 struct FHostFilter;
2493 impl HostFilter for FHostFilter {
2494 fn accept(&self, peer: &crate::cluster::metadata::Peer) -> bool {
2495 peer.address != id_to_invalid_addr(F)
2496 }
2497 }
2498
2499 Some(&FHostFilter)
2500 },
2501 TabletsInfo::new(),
2502 &HashMap::new(),
2503 #[cfg(feature = "metrics")]
2504 &Default::default(),
2505 )
2506 .await;
2507
2508 let tests_with_disabled_node_f = [
2509 // Keyspace NTS with RF=3 without preferred DC.
2510 // The primary replica does not satisfy the predicate (being disabled by HostFilter),
2511 // so pick() should return None and fallback should return A first.
2512 //
2513 // This is a regression test after a bug was fixed.
2514 Test {
2515 policy: DefaultPolicy {
2516 preferences: NodeLocationPreference::Any,
2517 is_token_aware: true,
2518 permit_dc_failover: true,
2519 pick_predicate: Box::new(|node, _shard| node.address != id_to_invalid_addr(F)),
2520 ..Default::default()
2521 },
2522 routing_info: RoutingInfo {
2523 token: Some(Token::new(160)),
2524 table: Some(TABLE_NTS_RF_3),
2525 consistency: Consistency::One,
2526 is_confirmed_lwt: true,
2527 ..Default::default()
2528 },
2529 // going through the ring, we get order: F , A , C , D , G , B , E
2530 // us eu eu us eu eu us
2531 // r2 r1 r1 r1 r2 r1 r1
2532 expected_groups: ExpectedGroupsBuilder::new()
2533 // pick is empty, because the primary replica does not satisfy pick predicate,
2534 // and with LWT we cannot compute other replicas for NTS without allocations.
2535 .ordered([A, C, D, G, E]) // replicas
2536 .group([B]) // nodes
2537 .build(),
2538 },
2539 ];
2540
2541 for Test {
2542 policy,
2543 routing_info,
2544 expected_groups,
2545 } in tests_with_disabled_node_f
2546 {
2547 test_default_policy_with_given_cluster_and_routing_info(
2548 &policy,
2549 &cluster_with_disabled_node_f,
2550 &routing_info,
2551 &expected_groups,
2552 )
2553 .await;
2554 }
2555 }
2556}
2557
2558mod latency_awareness {
2559 use futures::{future::RemoteHandle, FutureExt};
2560 use itertools::Either;
2561 use tokio::time::{Duration, Instant};
2562 use tracing::{trace, warn};
2563 use uuid::Uuid;
2564
2565 use crate::cluster::node::Node;
2566 use crate::errors::{DbError, RequestAttemptError};
2567 use crate::policies::load_balancing::NodeRef;
2568 use crate::routing::Shard;
2569 use std::{
2570 collections::HashMap,
2571 ops::Deref,
2572 sync::{
2573 atomic::{AtomicU64, Ordering},
2574 Arc, RwLock,
2575 },
2576 };
2577
2578 #[derive(Debug)]
2579 struct AtomicDuration(AtomicU64);
2580
2581 impl AtomicDuration {
2582 fn new() -> Self {
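            // u64::MAX acts as a sentinel meaning "no duration stored yet"; `load` maps it to `None`.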
2583 Self(AtomicU64::new(u64::MAX))
2584 }
2585
2586 fn store(&self, duration: Duration) {
2587 self.0.store(duration.as_micros() as u64, Ordering::Relaxed)
2588 }
2589
2590 fn load(&self) -> Option<Duration> {
2591 let micros = self.0.load(Ordering::Relaxed);
2592 if micros == u64::MAX {
2593 None
2594 } else {
2595 Some(Duration::from_micros(micros))
2596 }
2597 }
2598 }
2599
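    /// A node's average latency, together with the time it was last updated
    /// and the number of measurements that contributed to it.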
2600 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
2601 pub(super) struct TimestampedAverage {
2602 pub(super) timestamp: Instant,
2603 pub(super) average: Duration,
2604 pub(super) num_measures: usize,
2605 }
2606
2607 impl TimestampedAverage {
2608 pub(crate) fn compute_next(
2609 previous: Option<Self>,
2610 last_latency: Duration,
2611 scale_secs: f64,
2612 ) -> Option<Self> {
2613 let now = Instant::now();
2614 match previous {
2615 prev if last_latency.is_zero() => prev,
2616 None => Some(Self {
2617 num_measures: 1,
2618 average: last_latency,
2619 timestamp: now,
2620 }),
2621 Some(prev_avg) => Some({
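                    // Time-weighted average: the weight given to the previous average
                    // decreases with the time elapsed since it was computed
                    // (see `LatencyAwarenessBuilder::scale` for the exact formula).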
2622 let delay = now
2623 .saturating_duration_since(prev_avg.timestamp)
2624 .as_secs_f64();
2625 let scaled_delay = delay / scale_secs;
2626 let prev_weight = if scaled_delay <= 0. {
2627 1.
2628 } else {
2629 (scaled_delay + 1.).ln() / scaled_delay
2630 };
2631
2632 let last_latency_secs = last_latency.as_secs_f64();
2633 let prev_avg_secs = prev_avg.average.as_secs_f64();
2634 let average = match Duration::try_from_secs_f64(
2635 (1. - prev_weight) * last_latency_secs + prev_weight * prev_avg_secs,
2636 ) {
2637 Ok(ts) => ts,
2638 Err(e) => {
2639 warn!(
2640 "Error while calculating average: {e}. \
2641 prev_avg_secs: {prev_avg_secs}, \
2642 last_latency_secs: {last_latency_secs}, \
2643 prev_weight: {prev_weight}, \
2644 scaled_delay: {scaled_delay}, \
2645 delay: {delay}, \
2646 prev_avg.timestamp: {:?}, \
2647 now: {now:?}",
2648 prev_avg.timestamp
2649 );
2650
2651                            // It's unclear when this branch could be entered,
2652                            // so it's hard to tell what a sensible value to return would be;
2653                            // keeping the previous average does not seem like a bad choice.
2654 prev_avg.average
2655 }
2656 };
2657 Self {
2658 num_measures: prev_avg.num_measures + 1,
2659 timestamp: now,
2660 average,
2661 }
2662 }),
2663 }
2664 }
2665 }
2666
2667 /// A latency-aware load balancing policy module, which enables penalising nodes that are too slow.
2668 #[derive(Debug)]
2669 pub(super) struct LatencyAwareness {
2670 pub(super) exclusion_threshold: f64,
2671 pub(super) retry_period: Duration,
2672 pub(super) _update_rate: Duration,
2673 pub(super) minimum_measurements: usize,
2674 pub(super) scale_secs: f64,
2675
2676 /// Last minimum average latency that was noted among the nodes. It is updated every
2677 /// [update_rate](Self::_update_rate).
2678 last_min_latency: Arc<AtomicDuration>,
2679
2680 node_avgs: Arc<RwLock<HashMap<Uuid, RwLock<Option<TimestampedAverage>>>>>,
2681
2682        // This is Some iff there is an associated updater running on a separate Tokio task.
2683        // For some tests, in order not to rely on timing, this is None; the updater is then ticked
2684        // explicitly from outside this struct.
2685 _updater_handle: Option<RemoteHandle<()>>,
2686 }
2687
2688 impl LatencyAwareness {
2689 pub(super) fn builder() -> LatencyAwarenessBuilder {
2690 LatencyAwarenessBuilder::new()
2691 }
2692
2693 fn new_for_test(
2694 exclusion_threshold: f64,
2695 retry_period: Duration,
2696 update_rate: Duration,
2697 minimum_measurements: usize,
2698 scale: Duration,
2699 ) -> (Self, MinAvgUpdater) {
2700 let min_latency = Arc::new(AtomicDuration::new());
2701
2702 let min_latency_clone = min_latency.clone();
2703 let node_avgs = Arc::new(RwLock::new(HashMap::new()));
2704 let node_avgs_clone = node_avgs.clone();
2705
2706 let updater = MinAvgUpdater {
2707 node_avgs,
2708 min_latency,
2709 minimum_measurements,
2710 };
2711
2712 (
2713 Self {
2714 exclusion_threshold,
2715 retry_period,
2716 _update_rate: update_rate,
2717 minimum_measurements,
2718 scale_secs: scale.as_secs_f64(),
2719 last_min_latency: min_latency_clone,
2720 node_avgs: node_avgs_clone,
2721 _updater_handle: None,
2722 },
2723 updater,
2724 )
2725 }
2726
2727 fn new(
2728 exclusion_threshold: f64,
2729 retry_period: Duration,
2730 update_rate: Duration,
2731 minimum_measurements: usize,
2732 scale: Duration,
2733 ) -> Self {
2734 let (self_, updater) = Self::new_for_test(
2735 exclusion_threshold,
2736 retry_period,
2737 update_rate,
2738 minimum_measurements,
2739 scale,
2740 );
2741
2742 let (updater_fut, updater_handle) = async move {
2743 let mut update_scheduler = tokio::time::interval(update_rate);
2744 loop {
2745 update_scheduler.tick().await;
2746 updater.tick().await;
2747 }
2748 }
2749 .remote_handle();
2750 tokio::task::spawn(updater_fut);
2751
2752 Self {
2753 _updater_handle: Some(updater_handle),
2754 ..self_
2755 }
2756 }
2757
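        /// Returns a predicate that rejects nodes currently penalised by latency awareness.
        /// Until any latency data is available, the predicate accepts every node.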
2758 pub(super) fn generate_predicate(&self) -> impl Fn(&Node) -> bool {
2759 let last_min_latency = self.last_min_latency.clone();
2760 let node_avgs = self.node_avgs.clone();
2761 let exclusion_threshold = self.exclusion_threshold;
2762 let minimum_measurements = self.minimum_measurements;
2763 let retry_period = self.retry_period;
2764
2765 move |node| {
2766                last_min_latency.load().map(|min_avg| match fast_enough(
                    &node_avgs.read().unwrap(),
                    node.host_id,
                    exclusion_threshold,
                    retry_period,
                    minimum_measurements,
                    min_avg,
                ) {
2767 FastEnough::Yes => true,
2768 FastEnough::No { average } => {
2769 trace!("Latency awareness: Penalising node {{address={}, datacenter={:?}, rack={:?}}} for being on average at least {} times slower (latency: {}ms) than the fastest ({}ms).",
2770 node.address, node.datacenter, node.rack, exclusion_threshold, average.as_millis(), min_avg.as_millis());
2771 false
2772 }
2773 }).unwrap_or(true)
2774 }
2775 }
2776
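        /// Wraps the fallback iterator so that targets penalised by latency awareness are yielded
        /// only after all non-penalised ones, preserving the relative order within each group.
        /// If no latency data has been collected yet, the iterator is returned unchanged.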
2777 pub(super) fn wrap<'a>(
2778 &self,
2779 fallback: impl Iterator<Item = (NodeRef<'a>, Option<Shard>)>,
2780 ) -> impl Iterator<Item = (NodeRef<'a>, Option<Shard>)> {
2781 let min_avg_latency = match self.last_min_latency.load() {
2782 Some(min_avg) => min_avg,
2783 None => return Either::Left(fallback), // noop, as no latency data has been collected yet
2784 };
2785
2786 let average_latencies = self.node_avgs.read().unwrap();
2787 let targets = fallback;
2788
2789 let mut fast_targets = vec![];
2790 let mut penalised_targets = vec![];
2791
2792 for node_and_shard @ (node, _shard) in targets {
2793 match fast_enough(
2794 average_latencies.deref(),
2795 node.host_id,
2796 self.exclusion_threshold,
2797 self.retry_period,
2798 self.minimum_measurements,
2799 min_avg_latency,
2800 ) {
2801 FastEnough::Yes => fast_targets.push(node_and_shard),
2802 FastEnough::No { average } => {
2803 trace!("Latency awareness: Penalising node {{address={}, datacenter={:?}, rack={:?}}} for being on average at least {} times slower (latency: {}ms) than the fastest ({}ms).",
2804 node.address, node.datacenter, node.rack, self.exclusion_threshold, average.as_millis(), min_avg_latency.as_millis());
2805 penalised_targets.push(node_and_shard);
2806 }
2807 }
2808 }
2809
2810 let mut fast_targets = fast_targets.into_iter();
2811 let mut penalised_targets = penalised_targets.into_iter();
2812
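            // Yield all fast (non-penalised) targets first, then the penalised ones.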
2813 let skipping_penalised_targets_iterator = std::iter::from_fn(move || {
2814 fast_targets.next().or_else(|| penalised_targets.next())
2815 });
2816
2817 Either::Right(skipping_penalised_targets_iterator)
2818 }
2819
2820 pub(super) fn report_request(&self, node: &Node, latency: Duration) {
2821 let node_avgs_guard = self.node_avgs.read().unwrap();
2822 if let Some(previous_node_avg) = node_avgs_guard.get(&node.host_id) {
2823 // The usual path, the node has been already noticed.
2824 let mut node_avg_guard = previous_node_avg.write().unwrap();
2825 let previous_node_avg = *node_avg_guard;
2826 *node_avg_guard =
2827 TimestampedAverage::compute_next(previous_node_avg, latency, self.scale_secs);
2828 } else {
2829 // We drop the read lock not to deadlock while taking write lock.
2830 std::mem::drop(node_avgs_guard);
2831 let mut node_avgs_guard = self.node_avgs.write().unwrap();
2832
2833 // We have to read this again, as other threads may race with us.
2834 let previous_node_avg = node_avgs_guard
2835 .get(&node.host_id)
2836 .and_then(|rwlock| *rwlock.read().unwrap());
2837
2838 // We most probably need to add the node to the map.
2839 // (this will be Some only in an unlikely case that another thread raced with us and won)
2840 node_avgs_guard.insert(
2841 node.host_id,
2842 RwLock::new(TimestampedAverage::compute_next(
2843 previous_node_avg,
2844 latency,
2845 self.scale_secs,
2846 )),
2847 );
2848 }
2849 }
2850
2851 pub(crate) fn reliable_latency_measure(error: &RequestAttemptError) -> bool {
2852 match error {
2853 // "fast" errors, i.e. ones that are returned quickly after the query begins
2854 RequestAttemptError::CqlRequestSerialization(_)
2855 | RequestAttemptError::BrokenConnectionError(_)
2856 | RequestAttemptError::UnableToAllocStreamId
2857 | RequestAttemptError::DbError(DbError::IsBootstrapping, _)
2858 | RequestAttemptError::DbError(DbError::Unavailable { .. }, _)
2859 | RequestAttemptError::DbError(DbError::Unprepared { .. }, _)
2860 | RequestAttemptError::DbError(DbError::Overloaded, _)
2861 | RequestAttemptError::DbError(DbError::RateLimitReached { .. }, _)
2862 | RequestAttemptError::SerializationError(_) => false,
2863
2864 // "slow" errors, i.e. ones that are returned after considerable time of query being run
2865 RequestAttemptError::DbError(_, _)
2866 | RequestAttemptError::CqlResultParseError(_)
2867 | RequestAttemptError::CqlErrorParseError(_)
2868 | RequestAttemptError::BodyExtensionsParseError(_)
2869 | RequestAttemptError::RepreparedIdChanged { .. }
2870 | RequestAttemptError::RepreparedIdMissingInBatch
2871 | RequestAttemptError::UnexpectedResponse(_)
2872 | RequestAttemptError::NonfinishedPagingState => true,
2873 }
2874 }
2875 }
2876
2877 impl Default for LatencyAwareness {
2878 fn default() -> Self {
2879 Self::builder().build()
2880 }
2881 }
2882
2883    /// Updates the minimum average latency upon each call to `tick()`.
2884    /// The said average is a crucial criterion for penalising "too slow" nodes.
2885 struct MinAvgUpdater {
2886 node_avgs: Arc<RwLock<HashMap<Uuid, RwLock<Option<TimestampedAverage>>>>>,
2887 min_latency: Arc<AtomicDuration>,
2888 minimum_measurements: usize,
2889 }
2890
2891 impl MinAvgUpdater {
2892 async fn tick(&self) {
2893 let averages: &HashMap<Uuid, RwLock<Option<TimestampedAverage>>> =
2894 &self.node_avgs.read().unwrap();
2895 if averages.is_empty() {
2896                return; // No queries to any node have been registered by LAP yet.
2897 }
2898
2899 let min_avg = averages
2900 .values()
2901 .filter_map(|avg| {
2902 avg.read().unwrap().and_then(|timestamped_average| {
2903 (timestamped_average.num_measures >= self.minimum_measurements)
2904 .then_some(timestamped_average.average)
2905 })
2906 })
2907 .min();
2908 if let Some(min_avg) = min_avg {
2909 self.min_latency.store(min_avg);
2910 trace!(
2911 "Latency awareness: updated min average latency to {} ms",
2912 min_avg.as_secs_f64() * 1000.
2913 );
2914 }
2915 }
2916 }
2917
2918    /// The builder of the LatencyAwareness module of DefaultPolicy.
2919 ///
2920 /// (For more information about latency awareness, see [DefaultPolicyBuilder::latency_awareness()](super::DefaultPolicyBuilder::latency_awareness)).
2921 /// It is intended to be created and configured by the user and then
2922 /// passed to DefaultPolicyBuilder, like this:
2923 ///
2924 /// # Example
2925 /// ```
2926 /// # fn example() {
2927 /// use scylla::policies::load_balancing::{
2928 /// LatencyAwarenessBuilder, DefaultPolicy
2929 /// };
2930 ///
2931 /// let latency_awareness_builder = LatencyAwarenessBuilder::new()
2932 /// .exclusion_threshold(3.)
2933 /// .minimum_measurements(200);
2934 ///
2935 /// let policy = DefaultPolicy::builder()
2936 /// .latency_awareness(latency_awareness_builder)
2937 /// .build();
2938    /// # }
    /// ```
2939 #[derive(Debug, Clone)]
2940 pub struct LatencyAwarenessBuilder {
2941 exclusion_threshold: f64,
2942 retry_period: Duration,
2943 update_rate: Duration,
2944 minimum_measurements: usize,
2945 scale: Duration,
2946 }
2947
2948 impl LatencyAwarenessBuilder {
2949 /// Creates a builder of LatencyAwareness module of DefaultPolicy.
2950 pub fn new() -> Self {
2951 Self {
2952 exclusion_threshold: 2_f64,
2953 retry_period: Duration::from_secs(10),
2954 update_rate: Duration::from_millis(100),
2955 minimum_measurements: 50,
2956 scale: Duration::from_millis(100),
2957 }
2958 }
2959
2960 /// Sets minimum measurements for latency awareness (if there have been fewer measurements taken for a node,
2961 /// the node will not be penalised).
2962 ///
2963        /// Penalising nodes is based on an average of their recently measured latencies.
2964        /// This average is only meaningful if a minimum number of measurements has been collected.
2965 /// This is what this option controls. If fewer than [minimum_measurements](Self::minimum_measurements)
2966 /// data points have been collected for a given host, the policy will never penalise that host.
2967 /// Note that the number of collected measurements for a given host is reset if the node
2968 /// is restarted.
2969 /// The default for this option is **50**.
2970 pub fn minimum_measurements(self, minimum_measurements: usize) -> Self {
2971 Self {
2972 minimum_measurements,
2973 ..self
2974 }
2975 }
2976
2977 /// Sets retry period for latency awareness (max time that a node is being penalised).
2978 ///
2979 /// The retry period defines how long a node may be penalised by the policy before it is given
2980        /// a second chance. More precisely, a node is excluded from query plans if both its calculated
2981        /// average latency is [exclusion_threshold](Self::exclusion_threshold) times slower than
2982        /// the fastest node's average latency (at the time the query plan is computed) **and** its
2983        /// calculated average latency has been updated within the last [retry_period](Self::retry_period).
2984 /// Since penalised nodes will likely not see their latency updated, this is basically how long
2985 /// the policy will exclude a node.
2986 pub fn retry_period(self, retry_period: Duration) -> Self {
2987 Self {
2988 retry_period,
2989 ..self
2990 }
2991 }
2992
2993 /// Sets exclusion threshold for latency awareness (a threshold for a node to be penalised).
2994 ///
2995 /// The exclusion threshold controls how much worse the average latency of a node must be
2996 /// compared to the fastest performing node for it to be penalised by the policy.
2997        /// For example, if set to 2, the resulting policy excludes nodes that are more than twice
2998        /// as slow as the fastest node.
2999 pub fn exclusion_threshold(self, exclusion_threshold: f64) -> Self {
3000 Self {
3001 exclusion_threshold,
3002 ..self
3003 }
3004 }
3005
3006        /// Sets the update rate for latency awareness (how often the global minimal average latency is updated).
3007 ///
3008 /// The update rate defines how often the minimum average latency is recomputed. While the
3009 /// average latency score of each node is computed iteratively (updated each time a new latency
3010 /// is collected), the minimum score needs to be recomputed from scratch every time, which is
3011 /// slightly more costly. For this reason, the minimum is only re-calculated at the given fixed
3012 /// rate and cached between re-calculation.
3013        /// rate and cached between re-calculations.
3014        /// The default update rate is **100 milliseconds**, which should be appropriate for most
3015        /// applications. In particular, note that while we want to avoid recomputing the minimum for
3016        /// every query, that computation is not particularly intensive either and there is no reason to
3017        /// use a very slow rate (more than a second is probably unnecessarily slow, for instance).
3018 Self {
3019 update_rate,
3020 ..self
3021 }
3022 }
3023
3024 /// Sets the scale to use for the resulting latency aware policy.
3025 ///
3026        /// The `scale` provides control over how the weight given to older latencies decreases
3027        /// over time. For a given host, if a new latency `l` is received at time `t`, and
3028        /// the previously calculated average is `prev`, calculated at time `t'`, then the
3029        /// newly calculated average `avg` for that host is computed as follows:
3030 ///
3031 /// ```text
3032 /// d = (t - t') / scale
3033 /// alpha = 1 - (ln(d+1) / d)
3034 /// avg = alpha * l + (1 - alpha) * prev
3035 /// ```
3036 ///
3037        /// Typically, with a `scale` of 100 milliseconds (the default), if a new latency is
3038        /// measured and the previous measure is 10 milliseconds old (so `d=0.1`), then `alpha`
3039        /// will be around `0.05`. In other words, the new latency will account for 5% of the
3040        /// updated average. A bigger scale will give less weight to new measurements (compared to
3041        /// previous ones), while a smaller one will give them more weight.
3042 ///
3043 /// The default scale (if this method is not used) is of **100 milliseconds**. If unsure,
3044 /// try this default scale first and experiment only if it doesn't provide acceptable results
3045 /// (hosts are excluded too quickly or not fast enough and tuning the exclusion threshold doesn't
3046 /// help).
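        ///
        /// # Example
        /// A minimal sketch of tuning the scale:
        /// ```
        /// # fn example() {
        /// use scylla::policies::load_balancing::LatencyAwarenessBuilder;
        /// use std::time::Duration;
        ///
        /// // Halving the default scale gives more weight to new measurements.
        /// let latency_awareness_builder = LatencyAwarenessBuilder::new()
        ///     .scale(Duration::from_millis(50));
        /// # }
        /// ```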
3047 pub fn scale(self, scale: Duration) -> Self {
3048 Self { scale, ..self }
3049 }
3050
3051 pub(super) fn build(self) -> LatencyAwareness {
3052 let Self {
3053 exclusion_threshold,
3054 retry_period,
3055 update_rate,
3056 minimum_measurements,
3057 scale,
3058 } = self;
3059 LatencyAwareness::new(
3060 exclusion_threshold,
3061 retry_period,
3062 update_rate,
3063 minimum_measurements,
3064 scale,
3065 )
3066 }
3067
3068 #[cfg(test)]
3069 fn build_for_test(self) -> (LatencyAwareness, MinAvgUpdater) {
3070 let Self {
3071 exclusion_threshold,
3072 retry_period,
3073 update_rate,
3074 minimum_measurements,
3075 scale,
3076 } = self;
3077 LatencyAwareness::new_for_test(
3078 exclusion_threshold,
3079 retry_period,
3080 update_rate,
3081 minimum_measurements,
3082 scale,
3083 )
3084 }
3085 }
3086
3087 impl Default for LatencyAwarenessBuilder {
3088 fn default() -> Self {
3089 Self::new()
3090 }
3091 }
3092
3093 pub(super) enum FastEnough {
3094 Yes,
3095 No { average: Duration },
3096 }
3097
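    /// Decides whether a node is fast enough not to be penalised: a node is penalised only if it has
    /// at least `minimum_measurements` measurements, its average was updated less than `retry_period`
    /// ago, and that average exceeds `exclusion_threshold` times the minimum average `min_avg`.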
3098 pub(super) fn fast_enough(
3099 average_latencies: &HashMap<Uuid, RwLock<Option<TimestampedAverage>>>,
3100 node: Uuid,
3101 exclusion_threshold: f64,
3102 retry_period: Duration,
3103 minimum_measurements: usize,
3104 min_avg: Duration,
3105 ) -> FastEnough {
3106 let avg = match average_latencies
3107 .get(&node)
3108 .and_then(|avgs| *avgs.read().unwrap())
3109 {
3110 Some(avg) => avg,
3111 None => return FastEnough::Yes,
3112 };
3113 if avg.num_measures >= minimum_measurements
3114 && avg.timestamp.elapsed() < retry_period
3115 && avg.average.as_micros() as f64 > exclusion_threshold * min_avg.as_micros() as f64
3116 {
3117 FastEnough::No {
3118 average: avg.average,
3119 }
3120 } else {
3121 FastEnough::Yes
3122 }
3123 }
3124
3125 #[cfg(test)]
3126 mod tests {
3127 use scylla_cql::Consistency;
3128
3129 use super::{
3130 super::tests::{framework::*, EMPTY_ROUTING_INFO},
3131 super::DefaultPolicy,
3132 *,
3133 };
3134
3135 use crate::{
3136 cluster::ClusterState,
3137 cluster::NodeAddr,
3138 policies::load_balancing::default::NodeLocationPreference,
3139 policies::load_balancing::{
3140 default::tests::test_default_policy_with_given_cluster_and_routing_info,
3141 RoutingInfo,
3142 },
3143 routing::locator::test::{id_to_invalid_addr, A, B, C, D, E, F, G},
3144 routing::locator::test::{TABLE_INVALID, TABLE_NTS_RF_2, TABLE_NTS_RF_3},
3145 routing::Shard,
3146 routing::Token,
3147 test_utils::setup_tracing,
3148 };
3149 use tokio::time::Instant;
3150
3151 trait DefaultPolicyTestExt {
3152 fn set_nodes_latency_stats(
3153 &self,
3154 cluster: &ClusterState,
3155 averages: &[(u16, Option<TimestampedAverage>)],
3156 );
3157 }
3158
3159 impl DefaultPolicyTestExt for DefaultPolicy {
3160 fn set_nodes_latency_stats(
3161 &self,
3162 cluster: &ClusterState,
3163 averages: &[(u16, Option<TimestampedAverage>)],
3164 ) {
3165 let addr_to_host_id: HashMap<NodeAddr, Uuid> = cluster
3166 .known_peers
3167 .values()
3168 .map(|node| (node.address, node.host_id))
3169 .collect();
3170
3171 for (id, average) in averages.iter().copied() {
3172 let host_id = *addr_to_host_id.get(&id_to_invalid_addr(id)).unwrap();
3173 let mut node_latencies = self
3174 .latency_awareness
3175 .as_ref()
3176 .unwrap()
3177 .node_avgs
3178 .write()
3179 .unwrap();
3180 let mut node_latency =
3181 node_latencies.entry(host_id).or_default().write().unwrap();
3182 println!("Set latency: node {}, latency {:?}.", id, average);
3183 *node_latency = average;
3184 }
3185 println!("Set node latency stats.")
3186 }
3187 }
3188
3189 fn default_policy_with_given_latency_awareness(
3190 latency_awareness: LatencyAwareness,
3191 ) -> DefaultPolicy {
3192 let pick_predicate = {
3193 let latency_predicate = latency_awareness.generate_predicate();
3194 Box::new(move |node: NodeRef<'_>, shard| {
3195 DefaultPolicy::is_alive(node, shard) && latency_predicate(node)
3196 })
3197 as Box<dyn Fn(NodeRef<'_>, Option<Shard>) -> bool + Send + Sync + 'static>
3198 };
3199
3200 DefaultPolicy {
3201 preferences: NodeLocationPreference::Datacenter("eu".to_owned()),
3202 permit_dc_failover: true,
3203 is_token_aware: true,
3204 pick_predicate,
3205 latency_awareness: Some(latency_awareness),
3206 fixed_seed: None,
3207 }
3208 }
3209
3210 fn latency_aware_default_policy_customised(
3211 configurer: impl FnOnce(LatencyAwarenessBuilder) -> LatencyAwarenessBuilder,
3212 ) -> DefaultPolicy {
3213 let latency_awareness = configurer(LatencyAwareness::builder()).build();
3214 default_policy_with_given_latency_awareness(latency_awareness)
3215 }
3216
3217 fn latency_aware_default_policy() -> DefaultPolicy {
3218 latency_aware_default_policy_customised(|b| b)
3219 }
3220
3221 fn latency_aware_policy_with_explicit_updater_customised(
3222 configurer: impl FnOnce(LatencyAwarenessBuilder) -> LatencyAwarenessBuilder,
3223 ) -> (DefaultPolicy, MinAvgUpdater) {
3224 let (latency_awareness, updater) =
3225 configurer(LatencyAwareness::builder()).build_for_test();
3226 (
3227 default_policy_with_given_latency_awareness(latency_awareness),
3228 updater,
3229 )
3230 }
3231
3232 fn latency_aware_default_policy_with_explicit_updater() -> (DefaultPolicy, MinAvgUpdater) {
3233 latency_aware_policy_with_explicit_updater_customised(|b| b)
3234 }
3235
3236 #[tokio::test]
3237 async fn latency_aware_default_policy_does_not_penalise_if_no_latency_info_available_yet() {
3238 setup_tracing();
3239 let policy = latency_aware_default_policy();
3240 let cluster = tests::mock_cluster_state_for_token_unaware_tests().await;
3241
3242 let expected_groups = ExpectedGroupsBuilder::new()
3243 .group([1, 2, 3]) // pick + fallback local nodes
3244 .group([4, 5]) // fallback remote nodes
3245 .build();
3246
3247 test_default_policy_with_given_cluster_and_routing_info(
3248 &policy,
3249 &cluster,
3250 &EMPTY_ROUTING_INFO,
3251 &expected_groups,
3252 )
3253 .await;
3254 }
3255
        #[tokio::test]
        async fn latency_aware_default_policy_does_not_penalise_if_not_enough_measurements() {
            setup_tracing();
            let policy = latency_aware_default_policy();
            let cluster = tests::mock_cluster_state_for_token_unaware_tests().await;

            let min_avg = Duration::from_millis(10);

            policy.set_nodes_latency_stats(
                &cluster,
                &[
                    (
                        1,
                        Some(TimestampedAverage {
                            timestamp: Instant::now(),
                            average: Duration::from_secs_f64(
                                policy
                                    .latency_awareness
                                    .as_ref()
                                    .unwrap()
                                    .exclusion_threshold
                                    * 1.5
                                    * min_avg.as_secs_f64(),
                            ),
                            num_measures: policy
                                .latency_awareness
                                .as_ref()
                                .unwrap()
                                .minimum_measurements
                                - 1,
                        }),
                    ),
                    (
                        3,
                        Some(TimestampedAverage {
                            timestamp: Instant::now(),
                            average: min_avg,
                            num_measures: policy
                                .latency_awareness
                                .as_ref()
                                .unwrap()
                                .minimum_measurements,
                        }),
                    ),
                ],
            );

            let expected_groups = ExpectedGroupsBuilder::new()
                .group([1, 2, 3]) // pick + fallback local nodes
                .group([4, 5]) // fallback remote nodes
                .build();

            test_default_policy_with_given_cluster_and_routing_info(
                &policy,
                &cluster,
                &EMPTY_ROUTING_INFO,
                &expected_groups,
            )
            .await;
        }

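        // Node 1 is slower than the fastest node, but stays below the exclusion threshold
        // (0.95 of it), so it must not be penalised.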
        #[tokio::test]
        async fn latency_aware_default_policy_does_not_penalise_if_exclusion_threshold_not_crossed()
        {
            setup_tracing();
            let policy = latency_aware_default_policy();
            let cluster = tests::mock_cluster_state_for_token_unaware_tests().await;

            let min_avg = Duration::from_millis(10);

            policy.set_nodes_latency_stats(
                &cluster,
                &[
                    (
                        1,
                        Some(TimestampedAverage {
                            timestamp: Instant::now(),
                            average: Duration::from_secs_f64(
                                policy
                                    .latency_awareness
                                    .as_ref()
                                    .unwrap()
                                    .exclusion_threshold
                                    * 0.95
                                    * min_avg.as_secs_f64(),
                            ),
                            num_measures: policy
                                .latency_awareness
                                .as_ref()
                                .unwrap()
                                .minimum_measurements,
                        }),
                    ),
                    (
                        3,
                        Some(TimestampedAverage {
                            timestamp: Instant::now(),
                            average: min_avg,
                            num_measures: policy
                                .latency_awareness
                                .as_ref()
                                .unwrap()
                                .minimum_measurements,
                        }),
                    ),
                ],
            );

            let expected_groups = ExpectedGroupsBuilder::new()
                .group([1, 2, 3]) // pick + fallback local nodes
                .group([4, 5]) // fallback remote nodes
                .build();

            test_default_policy_with_given_cluster_and_routing_info(
                &policy,
                &cluster,
                &EMPTY_ROUTING_INFO,
                &expected_groups,
            )
            .await;
        }

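        // Node 1 crosses the exclusion threshold, but its measurement becomes older than
        // the (shortened) retry period before the plan is computed, so the penalty no
        // longer applies.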
        #[tokio::test]
        async fn latency_aware_default_policy_does_not_penalise_if_retry_period_expired() {
            setup_tracing();
            let policy = latency_aware_default_policy_customised(|b| {
                b.retry_period(Duration::from_millis(10))
            });

            let cluster = tests::mock_cluster_state_for_token_unaware_tests().await;

            let min_avg = Duration::from_millis(10);

            policy.set_nodes_latency_stats(
                &cluster,
                &[
                    (
                        1,
                        Some(TimestampedAverage {
                            timestamp: Instant::now(),
                            average: Duration::from_secs_f64(
                                policy
                                    .latency_awareness
                                    .as_ref()
                                    .unwrap()
                                    .exclusion_threshold
                                    * 1.5
                                    * min_avg.as_secs_f64(),
                            ),
                            num_measures: policy
                                .latency_awareness
                                .as_ref()
                                .unwrap()
                                .minimum_measurements,
                        }),
                    ),
                    (
                        3,
                        Some(TimestampedAverage {
                            timestamp: Instant::now(),
                            average: min_avg,
                            num_measures: policy
                                .latency_awareness
                                .as_ref()
                                .unwrap()
                                .minimum_measurements,
                        }),
                    ),
                ],
            );

            tokio::time::sleep(2 * policy.latency_awareness.as_ref().unwrap().retry_period).await;

            let expected_groups = ExpectedGroupsBuilder::new()
                .group([1, 2, 3]) // pick + fallback local nodes
                .group([4, 5]) // fallback remote nodes
                .build();

            test_default_policy_with_given_cluster_and_routing_info(
                &policy,
                &cluster,
                &EMPTY_ROUTING_INFO,
                &expected_groups,
            )
            .await;
        }

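        // Node 3 provides a fast minimum average, and both node 1 (local) and node 4
        // (remote) exceed the exclusion threshold with enough measurements, so both get
        // pushed to the end of the plan.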
        #[tokio::test]
        async fn latency_aware_default_policy_penalises_if_conditions_met() {
            setup_tracing();
            let (policy, updater) = latency_aware_default_policy_with_explicit_updater();
            let cluster = tests::mock_cluster_state_for_token_unaware_tests().await;

            let min_avg = Duration::from_millis(10);

            policy.set_nodes_latency_stats(
                &cluster,
                &[
                    // 3 is fast enough to make 1 and 4 penalised.
                    (
                        1,
                        Some(TimestampedAverage {
                            timestamp: Instant::now(),
                            average: Duration::from_secs_f64(
                                policy
                                    .latency_awareness
                                    .as_ref()
                                    .unwrap()
                                    .exclusion_threshold
                                    * 1.05
                                    * min_avg.as_secs_f64(),
                            ),
                            num_measures: policy
                                .latency_awareness
                                .as_ref()
                                .unwrap()
                                .minimum_measurements,
                        }),
                    ),
                    (
                        3,
                        Some(TimestampedAverage {
                            timestamp: Instant::now(),
                            average: min_avg,
                            num_measures: policy
                                .latency_awareness
                                .as_ref()
                                .unwrap()
                                .minimum_measurements,
                        }),
                    ),
                    (
                        4,
                        Some(TimestampedAverage {
                            timestamp: Instant::now(),
                            average: Duration::from_secs_f64(
                                policy
                                    .latency_awareness
                                    .as_ref()
                                    .unwrap()
                                    .exclusion_threshold
                                    * 1.05
                                    * min_avg.as_secs_f64(),
                            ),
                            num_measures: policy
                                .latency_awareness
                                .as_ref()
                                .unwrap()
                                .minimum_measurements,
                        }),
                    ),
                ],
            );

            // Await last min average updater.
            updater.tick().await;

            let expected_groups = ExpectedGroupsBuilder::new()
                .group([2, 3]) // pick + fallback local nodes
                .group([5]) // fallback remote nodes
                .group([1]) // local node that was penalised due to high latency
                .group([4]) // remote node that was penalised due to high latency
                .build();

            test_default_policy_with_given_cluster_and_routing_info(
                &policy,
                &cluster,
                &EMPTY_ROUTING_INFO,
                &expected_groups,
            )
            .await;
        }

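        // The penalty is computed against the cached min average: after node 3 slows down
        // (raising the true minimum), node 1 stays penalised until the updater ticks again
        // and the cached minimum is refreshed.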
        #[tokio::test]
        async fn latency_aware_default_policy_stops_penalising_after_min_average_increases_enough_only_after_update_rate_elapses(
        ) {
            setup_tracing();

            let (policy, updater) = latency_aware_default_policy_with_explicit_updater();

            let cluster = tests::mock_cluster_state_for_token_unaware_tests().await;

            let min_avg = Duration::from_millis(10);

            policy.set_nodes_latency_stats(
                &cluster,
                &[
                    (
                        1,
                        Some(TimestampedAverage {
                            timestamp: Instant::now(),
                            average: Duration::from_secs_f64(
                                policy
                                    .latency_awareness
                                    .as_ref()
                                    .unwrap()
                                    .exclusion_threshold
                                    * 1.05
                                    * min_avg.as_secs_f64(),
                            ),
                            num_measures: policy
                                .latency_awareness
                                .as_ref()
                                .unwrap()
                                .minimum_measurements,
                        }),
                    ),
                    (
                        3,
                        Some(TimestampedAverage {
                            timestamp: Instant::now(),
                            average: min_avg,
                            num_measures: policy
                                .latency_awareness
                                .as_ref()
                                .unwrap()
                                .minimum_measurements,
                        }),
                    ),
                ],
            );

            // Await last min average updater.
            updater.tick().await;
            {
                // min_avg is low enough to penalise node 1
                let expected_groups = ExpectedGroupsBuilder::new()
                    .group([2, 3]) // pick + fallback local nodes
                    .group([4, 5]) // fallback remote nodes
                    .group([1]) // local node that was penalised due to high latency
                    .build();

                test_default_policy_with_given_cluster_and_routing_info(
                    &policy,
                    &cluster,
                    &EMPTY_ROUTING_INFO,
                    &expected_groups,
                )
                .await;
            }

            // node 3 becomes as slow as node 1
            policy.set_nodes_latency_stats(
                &cluster,
                &[(
                    3,
                    Some(TimestampedAverage {
                        timestamp: Instant::now(),
                        average: Duration::from_secs_f64(
                            policy
                                .latency_awareness
                                .as_ref()
                                .unwrap()
                                .exclusion_threshold
                                * min_avg.as_secs_f64(),
                        ),
                        num_measures: policy
                            .latency_awareness
                            .as_ref()
                            .unwrap()
                            .minimum_measurements,
                    }),
                )],
            );
            {
                // min_avg has not yet been updated and so node 1 is still being penalised
                let expected_groups = ExpectedGroupsBuilder::new()
                    .group([2, 3]) // pick + fallback local nodes
                    .group([4, 5]) // fallback remote nodes
                    .group([1]) // local node that was penalised due to high latency
                    .build();

                test_default_policy_with_given_cluster_and_routing_info(
                    &policy,
                    &cluster,
                    &EMPTY_ROUTING_INFO,
                    &expected_groups,
                )
                .await;
            }

            updater.tick().await;
            {
                // min_avg has been updated and is already high enough to stop penalising node 1
                let expected_groups = ExpectedGroupsBuilder::new()
                    .group([1, 2, 3]) // pick + fallback local nodes
                    .group([4, 5]) // fallback remote nodes
                    .build();

                test_default_policy_with_given_cluster_and_routing_info(
                    &policy,
                    &cluster,
                    &EMPTY_ROUTING_INFO,
                    &expected_groups,
                )
                .await;
            }
        }

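        // Latency awareness composed with token awareness: penalised replicas are moved to
        // the end of the plan, while the replica/non-replica and local/remote grouping of
        // the remaining targets is preserved.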
        #[tokio::test]
        async fn latency_aware_default_policy_is_correctly_token_aware() {
            setup_tracing();

            struct Test<'a, 'b> {
                // If Some, then the provided value is set as the min_avg.
                // Otherwise, the min_avg is computed from the values passed to
                // set_nodes_latency_stats().
                preset_min_avg: Option<Duration>,
                latency_stats: &'b [(u16, Option<TimestampedAverage>)],
                routing_info: RoutingInfo<'a>,
                expected_groups: ExpectedGroups,
            }

            let cluster = tests::mock_cluster_state_for_token_aware_tests().await;
            let latency_awareness_defaults =
                latency_aware_default_policy().latency_awareness.unwrap();
            let min_avg = Duration::from_millis(10);

            let fast_leader = || {
                Some(TimestampedAverage {
                    timestamp: Instant::now(),
                    average: min_avg,
                    num_measures: latency_awareness_defaults.minimum_measurements,
                })
            };

            let fast_enough = || {
                Some(TimestampedAverage {
                    timestamp: Instant::now(),
                    average: Duration::from_secs_f64(
                        latency_awareness_defaults.exclusion_threshold
                            * 0.95
                            * min_avg.as_secs_f64(),
                    ),
                    num_measures: latency_awareness_defaults.minimum_measurements,
                })
            };

            let slow_penalised = || {
                Some(TimestampedAverage {
                    timestamp: Instant::now(),
                    average: Duration::from_secs_f64(
                        latency_awareness_defaults.exclusion_threshold
                            * 1.05
                            * min_avg.as_secs_f64(),
                    ),
                    num_measures: latency_awareness_defaults.minimum_measurements,
                })
            };

            let too_few_measurements_slow = || {
                Some(TimestampedAverage {
                    timestamp: Instant::now(),
                    average: Duration::from_secs_f64(
                        latency_awareness_defaults.exclusion_threshold
                            * 1.05
                            * min_avg.as_secs_f64(),
                    ),
                    num_measures: latency_awareness_defaults.minimum_measurements - 1,
                })
            };

            let too_few_measurements_fast_leader = || {
                Some(TimestampedAverage {
                    timestamp: Instant::now(),
                    average: min_avg,
                    num_measures: 1,
                })
            };

            let tests = [
                Test {
                    // Latency-awareness penalisation fires up and moves C and D to the end.
                    preset_min_avg: None,
                    latency_stats: &[
                        (A, fast_leader()),
                        (C, slow_penalised()),
                        (D, slow_penalised()),
                        (E, too_few_measurements_slow()),
                    ],
                    routing_info: RoutingInfo {
                        token: Some(Token::new(160)),
                        table: Some(TABLE_NTS_RF_3),
                        consistency: Consistency::Quorum,
                        ..Default::default()
                    },
                    // going through the ring, we get order: F , A , C , D , G , B , E
                    //                                      us  eu  eu  us  eu  eu  us
                    //                                      r2  r1  r1  r1  r2  r1  r1
                    expected_groups: ExpectedGroupsBuilder::new()
                        .group([A, G]) // fast enough local replicas
                        .group([F, E]) // fast enough remote replicas
                        .group([B]) // fast enough local nodes
                        .group([C]) // penalised local replica
                        .group([D]) // penalised remote replica
                        .build(),
                },
                Test {
                    // Latency-awareness has old minimum average cached, so does not fire.
                    preset_min_avg: Some(100 * min_avg),
                    routing_info: RoutingInfo {
                        token: Some(Token::new(160)),
                        table: Some(TABLE_NTS_RF_3),
                        consistency: Consistency::Quorum,
                        ..Default::default()
                    },
                    latency_stats: &[
                        (A, fast_leader()),
                        (B, fast_enough()),
                        (C, slow_penalised()),
                        (D, slow_penalised()),
                    ],
                    // going through the ring, we get order: F , A , C , D , G , B , E
                    //                                      us  eu  eu  us  eu  eu  us
                    //                                      r2  r1  r1  r1  r2  r1  r1
                    expected_groups: ExpectedGroupsBuilder::new()
                        .group([A, C, G]) // fast enough local replicas
                        .group([F, D, E]) // fast enough remote replicas
                        .group([B]) // fast enough local nodes
                        .build(),
                },
                Test {
                    // Both A and B are slower than C, but only B has enough measurements collected.
                    preset_min_avg: None,
                    latency_stats: &[
                        (A, slow_penalised()), // not really penalised, because no fast leader here
                        (B, slow_penalised()), // ditto
                        (C, too_few_measurements_fast_leader()),
                    ],
                    routing_info: RoutingInfo {
                        token: Some(Token::new(160)),
                        table: Some(TABLE_NTS_RF_2),
                        consistency: Consistency::Quorum,
                        ..Default::default()
                    },
                    // going through the ring, we get order: F , A , C , D , G , B , E
                    //                                      us  eu  eu  us  eu  eu  us
                    //                                      r2  r1  r1  r1  r2  r1  r1
                    expected_groups: ExpectedGroupsBuilder::new()
                        .group([A, G]) // pick + fallback local replicas
                        .group([F, D]) // remote replicas
                        .group([C, B]) // local nodes
                        .group([E]) // remote nodes
                        .build(),
                },
                Test {
                    // No latency stats, so latency-awareness is a no-op.
                    preset_min_avg: None,
                    routing_info: RoutingInfo {
                        token: Some(Token::new(160)),
                        table: Some(TABLE_INVALID),
                        consistency: Consistency::Quorum,
                        ..Default::default()
                    },
                    latency_stats: &[],
                    expected_groups: ExpectedGroupsBuilder::new()
                        .group([A, B, C, G]) // local nodes
                        .group([D, E, F]) // remote nodes
                        .build(),
                },
            ];

            for test in &tests {
                let (policy, updater) = latency_aware_default_policy_with_explicit_updater();

                if let Some(preset_min_avg) = test.preset_min_avg {
                    policy.set_nodes_latency_stats(
                        &cluster,
                        &[(
                            1,
                            Some(TimestampedAverage {
                                timestamp: Instant::now(),
                                average: preset_min_avg,
                                num_measures: latency_awareness_defaults.minimum_measurements,
                            }),
                        )],
                    );
                    // Await last min average updater for update with a forged min_avg.
                    updater.tick().await;
                    policy.set_nodes_latency_stats(&cluster, &[(1, None)]);
                }
                policy.set_nodes_latency_stats(&cluster, test.latency_stats);

                if test.preset_min_avg.is_none() {
                    // Await last min average updater for update with None min_avg.
                    updater.tick().await;
                }

                test_default_policy_with_given_cluster_and_routing_info(
                    &policy,
                    &cluster,
                    &test.routing_info,
                    &test.expected_groups,
                )
                .await;
            }
        }

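        // Run with tokio's clock paused (`start_paused = true`): when no time elapses
        // between measurements, `compute_next` is expected to keep the previous average
        // and only bump `num_measures`.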
        #[tokio::test(start_paused = true)]
        async fn timestamped_average_works_when_clock_stops() {
            setup_tracing();
            let avg = Some(TimestampedAverage {
                timestamp: Instant::now(),
                average: Duration::from_secs(123),
                num_measures: 1,
            });
            let new_avg = TimestampedAverage::compute_next(avg, Duration::from_secs(456), 10.0);
            assert_eq!(
                new_avg,
                Some(TimestampedAverage {
                    timestamp: Instant::now(),
                    average: Duration::from_secs(123),
                    num_measures: 2,
                }),
            );
        }
    }
}