scylla/routing/locator/
mod.rs

1mod precomputed_replicas;
2mod replicas;
3mod replication_info;
4pub(crate) mod tablets;
5#[cfg(test)]
6pub(crate) mod test;
7mod token_ring;
8
9use rand::{seq::IteratorRandom, Rng};
10use scylla_cql::frame::response::result::TableSpec;
11pub use token_ring::TokenRing;
12
13use self::tablets::TabletsInfo;
14
15use crate::cluster::metadata::Strategy;
16use crate::cluster::{Node, NodeRef};
17use crate::routing::{Shard, Token};
18use itertools::Itertools;
19use precomputed_replicas::PrecomputedReplicas;
20use replicas::{ReplicasArray, EMPTY_REPLICAS};
21use replication_info::ReplicationInfo;
22use std::{
23    cmp,
24    collections::{HashMap, HashSet},
25    sync::Arc,
26};
27use tracing::debug;
28
/// `ReplicaLocator` provides a way to find the set of owning nodes for a given (token,
/// replication strategy, table) tuple. It does so by either using the precomputed
/// token ranges, or doing the computation on the fly (precomputation is configurable).
#[derive(Debug, Clone)]
pub struct ReplicaLocator {
    /// The data based on which `ReplicaLocator` computes replica sets.
    replication_data: ReplicationInfo,

    /// Replica lists computed eagerly in `ReplicaLocator::new` for the strategies passed
    /// as `precompute_replica_sets_for`. It is consulted before falling back to
    /// an on-the-fly computation based on `replication_data`.
    precomputed_replicas: PrecomputedReplicas,

    /// Names of the datacenters of the nodes in the global ring, with duplicates removed.
    /// Collected once in `ReplicaLocator::new`.
    datacenters: Vec<String>,

    /// Tablet information. `replicas_for_token` consults it first; for tables found here
    /// replicas are taken from tablets and the replication strategy is ignored.
    pub(crate) tablets: TabletsInfo,
}
43
44impl ReplicaLocator {
45    /// Creates a new `ReplicaLocator` in which the specified replication strategies
46    /// (`precompute_replica_sets_for`) will have its token ranges precomputed. This function can
47    /// potentially be CPU-intensive (if a ring & replication factors in given strategies are big).
48    pub(crate) fn new<'a>(
49        ring_iter: impl Iterator<Item = (Token, Arc<Node>)>,
50        precompute_replica_sets_for: impl Iterator<Item = &'a Strategy>,
51        tablets: TabletsInfo,
52    ) -> Self {
53        let replication_data = ReplicationInfo::new(ring_iter);
54        let precomputed_replicas =
55            PrecomputedReplicas::compute(&replication_data, precompute_replica_sets_for);
56
57        let datacenters = replication_data
58            .get_global_ring()
59            .iter()
60            .filter_map(|(_, node)| node.datacenter.as_deref())
61            .unique()
62            .map(ToOwned::to_owned)
63            .collect();
64
65        Self {
66            replication_data,
67            precomputed_replicas,
68            datacenters,
69            tablets,
70        }
71    }
72
73    /// Returns a set of nodes that are considered to be replicas for a given token, strategy and table.
74    /// If the `datacenter` parameter is set, the returned `ReplicaSet` is limited only to replicas
75    /// from that datacenter. If a specified datacenter name does not correspond to a valid
76    /// datacenter, an empty set will be returned.
77    ///
78    /// Supported replication strategies: `SimpleStrategy`, 'NetworkTopologyStrategy',
79    /// 'LocalStrategy'. If other is specified, it is treated as the `SimpleStrategy` with
80    /// replication factor equal to 1.
81    ///
82    /// If a provided replication strategy did not appear in `precompute_replica_sets_for`
83    /// parameter of `Self::new`, invocation of this function will trigger a computation of the
84    /// desired replica set (the computation might be delegated in time and start upon interaction
85    /// with the returned `ReplicaSet`).
86    ///
87    /// If the requested table uses Tablets, then a separate code path is taken, which ignores
88    /// replication strategies and only uses tablet information stored in ReplicaLocator.
89    /// If we don't have info about the tablet that owns the given token, empty set will be returned.
90    pub fn replicas_for_token<'a>(
91        &'a self,
92        token: Token,
93        strategy: &'a Strategy,
94        datacenter: Option<&'a str>,
95        table_spec: &TableSpec,
96    ) -> ReplicaSet<'a> {
97        if let Some(tablets) = self.tablets.tablets_for_table(table_spec) {
98            let replicas: Option<&[(Arc<Node>, Shard)]> = if let Some(datacenter) = datacenter {
99                tablets.dc_replicas_for_token(token, datacenter)
100            } else {
101                tablets.replicas_for_token(token)
102            };
103            ReplicaSet {
104                inner: ReplicaSetInner::PlainSharded(replicas.unwrap_or(
105                    // The table is a tablet table, but we don't have information for given token.
106                    // Let's just return empty set in this case.
107                    &[],
108                )),
109                token,
110            }
111        } else {
112            match strategy {
113                Strategy::SimpleStrategy { replication_factor } => {
114                    if let Some(datacenter) = datacenter {
115                        let replicas =
116                            self.get_simple_strategy_replicas(token, *replication_factor);
117
118                        return ReplicaSet {
119                            inner: ReplicaSetInner::FilteredSimple {
120                                replicas,
121                                datacenter,
122                            },
123                            token,
124                        };
125                    } else {
126                        return ReplicaSet {
127                            inner: ReplicaSetInner::Plain(
128                                self.get_simple_strategy_replicas(token, *replication_factor),
129                            ),
130                            token,
131                        };
132                    }
133                }
134                Strategy::NetworkTopologyStrategy {
135                    datacenter_repfactors,
136                } => {
137                    if let Some(dc) = datacenter {
138                        if let Some(repfactor) = datacenter_repfactors.get(dc) {
139                            return ReplicaSet {
140                                inner: ReplicaSetInner::Plain(
141                                    self.get_network_strategy_replicas(token, dc, *repfactor),
142                                ),
143                                token,
144                            };
145                        } else {
146                            debug!("Datacenter ({}) does not exist!", dc);
147                            return ReplicaSet {
148                                inner: ReplicaSetInner::Plain(EMPTY_REPLICAS),
149                                token,
150                            };
151                        }
152                    } else {
153                        return ReplicaSet {
154                            inner: ReplicaSetInner::ChainedNTS {
155                                datacenter_repfactors,
156                                locator: self,
157                                token,
158                            },
159                            token,
160                        };
161                    }
162                }
163                Strategy::Other { name, .. } => {
164                    debug!("Unknown strategy ({}), falling back to SimpleStrategy with replication_factor = 1", name)
165                }
166                _ => (),
167            }
168
169            // Fallback to simple strategy with replication factor = 1.
170            self.replicas_for_token(
171                token,
172                &Strategy::SimpleStrategy {
173                    replication_factor: 1,
174                },
175                datacenter,
176                table_spec,
177            )
178        }
179    }
180
181    /// Gives access to the token ring, based on which all token ranges/replica sets are computed.
182    pub fn ring(&self) -> &TokenRing<Arc<Node>> {
183        self.replication_data.get_global_ring()
184    }
185
186    /// Gives a list of all nodes in the token ring.
187    pub fn unique_nodes_in_global_ring(&self) -> &[Arc<Node>] {
188        self.replication_data.unique_nodes_in_global_ring()
189    }
190
191    /// Gives a list of all known datacenters.
192    pub fn datacenter_names(&self) -> &[String] {
193        self.datacenters.as_slice()
194    }
195
196    /// Gives a list of all nodes in a specified datacenter ring (which is created by filtering the
197    /// original ring to only contain nodes living in the specified datacenter).
198    pub fn unique_nodes_in_datacenter_ring<'a>(
199        &'a self,
200        datacenter_name: &str,
201    ) -> Option<&'a [Arc<Node>]> {
202        self.replication_data
203            .unique_nodes_in_datacenter_ring(datacenter_name)
204    }
205
206    fn get_simple_strategy_replicas(
207        &self,
208        token: Token,
209        replication_factor: usize,
210    ) -> ReplicasArray<'_> {
211        if replication_factor == 0 {
212            return EMPTY_REPLICAS;
213        }
214
215        if let Some(precomputed_replicas) = self
216            .precomputed_replicas
217            .get_precomputed_simple_strategy_replicas(token, replication_factor)
218        {
219            precomputed_replicas.into()
220        } else {
221            ReplicasArray::from_iter(
222                self.replication_data
223                    .simple_strategy_replicas(token, replication_factor),
224            )
225        }
226    }
227
228    fn get_network_strategy_replicas<'a>(
229        &'a self,
230        token: Token,
231        datacenter: &str,
232        datacenter_replication_factor: usize,
233    ) -> ReplicasArray<'a> {
234        if datacenter_replication_factor == 0 {
235            return EMPTY_REPLICAS;
236        }
237
238        if let Some(precomputed_replicas) = self
239            .precomputed_replicas
240            .get_precomputed_network_strategy_replicas(
241                token,
242                datacenter,
243                datacenter_replication_factor,
244            )
245        {
246            ReplicasArray::from(precomputed_replicas)
247        } else {
248            ReplicasArray::from_iter(self.replication_data.nts_replicas_in_datacenter(
249                token,
250                datacenter,
251                datacenter_replication_factor,
252            ))
253        }
254    }
255}
256
257fn with_computed_shard(node: NodeRef, token: Token) -> (NodeRef, Shard) {
258    let shard = node
259        .sharder()
260        .map(|sharder| sharder.shard_of(token))
261        .unwrap_or(0);
262    (node, shard)
263}
264
/// Internal representation of a `ReplicaSet`, chosen by `ReplicaLocator::replicas_for_token`.
#[derive(Debug)]
enum ReplicaSetInner<'a> {
    // Replicas that need no further filtering: SimpleStrategy replicas without a datacenter
    // limit, NetworkTopologyStrategy replicas of a single datacenter, or an empty set
    // (returned for an unknown datacenter). Shards are computed when replicas are yielded.
    Plain(ReplicasArray<'a>),

    // Replicas of a tablet table; every node is already paired with its shard.
    PlainSharded(&'a [(Arc<Node>, Shard)]),

    // Represents a set of SimpleStrategy replicas that is limited to a specified datacenter.
    // `replicas` holds replicas from all datacenters; filtering by `datacenter` happens
    // when the set is inspected or iterated.
    FilteredSimple {
        replicas: ReplicasArray<'a>,
        datacenter: &'a str,
    },

    // Represents a set of NetworkTopologyStrategy replicas that is not limited to any specific
    // datacenter. The set is constructed lazily, by invoking
    // `locator.get_network_strategy_replicas()`.
    ChainedNTS {
        // Replication factor requested for each datacenter, keyed by datacenter name.
        datacenter_repfactors: &'a HashMap<String, usize>,
        locator: &'a ReplicaLocator,
        token: Token,
    },
}
286
/// Represents a set of replicas for a given token and strategy.
///
/// This container can only be created by calling `ReplicaLocator::replicas_for_token`, and it
/// can borrow precomputed replica lists living in the locator.
#[derive(Debug)]
pub struct ReplicaSet<'a> {
    /// How the replicas are stored or (lazily) obtained.
    inner: ReplicaSetInner<'a>,
    /// The token this set was requested for; used to compute the shard of yielded replicas.
    token: Token,
}
296
297impl<'a> ReplicaSet<'a> {
298    /// Chooses a random replica that satisfies the given predicate.
299    pub fn choose_filtered<R>(
300        self,
301        rng: &mut R,
302        predicate: impl Fn(&(NodeRef<'a>, Shard)) -> bool,
303    ) -> Option<(NodeRef<'a>, Shard)>
304    where
305        R: Rng + ?Sized,
306    {
307        let happy = self.choose(rng)?;
308        if predicate(&happy) {
309            return Some(happy);
310        }
311
312        self.into_iter().filter(predicate).choose(rng)
313    }
314
315    /// Gets the size of the set.
316    ///
317    /// If the set represents `SimpleStrategy` replicas that were filtered by datacenter, this
318    /// function will have O(R) complexity, where R is the replication factor of that strategy.
319    ///
320    /// If the set represents `NetworkTopologyStrategy` replicas that were not filtered by
321    /// datacenter, this function will have O(D) complexity where D is the number of known
322    /// datacenters.
323    ///
324    /// In all other cases, the complexity is O(1)
325    pub fn len(&self) -> usize {
326        match &self.inner {
327            ReplicaSetInner::Plain(replicas) => replicas.len(),
328            ReplicaSetInner::PlainSharded(replicas) => replicas.len(),
329            ReplicaSetInner::FilteredSimple {
330                replicas,
331                datacenter,
332            } => replicas
333                .iter()
334                .filter(|node| node.datacenter.as_deref() == Some(*datacenter))
335                .count(),
336            ReplicaSetInner::ChainedNTS {
337                datacenter_repfactors,
338                locator,
339                token: _,
340            } => datacenter_repfactors
341                .iter()
342                .map(|(dc, rf)| {
343                    let unique_nodes_in_dc_count = locator
344                        .unique_nodes_in_datacenter_ring(dc)
345                        .map(|nodes| nodes.len())
346                        .unwrap_or(0);
347
348                    cmp::min(*rf, unique_nodes_in_dc_count)
349                })
350                .sum(),
351        }
352    }
353
354    /// Returns `true` if the replica set contains no elements.
355    ///
356    /// Complexity same as of `ReplicaSet::len`.
357    pub fn is_empty(&self) -> bool {
358        self.len() == 0
359    }
360
361    fn choose<R>(&self, rng: &mut R) -> Option<(NodeRef<'a>, Shard)>
362    where
363        R: Rng + ?Sized,
364    {
365        let len = self.len();
366        if len > 0 {
367            let index = rng.random_range(0..len);
368
369            match &self.inner {
370                ReplicaSetInner::Plain(replicas) => replicas
371                    .get(index)
372                    .map(|node| with_computed_shard(node, self.token)),
373                ReplicaSetInner::PlainSharded(replicas) => {
374                    replicas.get(index).map(|(node, shard)| (node, *shard))
375                }
376                ReplicaSetInner::FilteredSimple {
377                    replicas,
378                    datacenter,
379                } => replicas
380                    .iter()
381                    .filter(|node| node.datacenter.as_deref() == Some(*datacenter))
382                    .nth(index)
383                    .map(|node| with_computed_shard(node, self.token)),
384                ReplicaSetInner::ChainedNTS {
385                    datacenter_repfactors,
386                    locator,
387                    token,
388                } => {
389                    let mut nodes_to_skip = index;
390                    for datacenter in locator.datacenters.iter() {
391                        let requested_repfactor =
392                            *datacenter_repfactors.get(datacenter).unwrap_or(&0);
393                        let unique_nodes_in_dc_count = locator
394                            .unique_nodes_in_datacenter_ring(datacenter)
395                            .map(|nodes| nodes.len())
396                            .unwrap_or(0);
397
398                        let repfactor = cmp::min(requested_repfactor, unique_nodes_in_dc_count);
399
400                        if nodes_to_skip < repfactor {
401                            return locator
402                                .get_network_strategy_replicas(*token, datacenter, repfactor)
403                                .get(nodes_to_skip)
404                                .map(|node| with_computed_shard(node, self.token));
405                        }
406
407                        nodes_to_skip -= repfactor;
408                    }
409
410                    None
411                }
412            }
413        } else {
414            None
415        }
416    }
417}
418
419impl<'a> IntoIterator for ReplicaSet<'a> {
420    type Item = (NodeRef<'a>, Shard);
421    type IntoIter = ReplicaSetIterator<'a>;
422
423    /// Converts the replica set into iterator. Order defined by that iterator does not have to
424    /// match the order set by the token ring.
425    ///
426    /// Iterating through `ReplicaSet` using this method is far more efficient than invoking the
427    /// `get` method sequentially.
428    fn into_iter(self) -> Self::IntoIter {
429        let inner = match self.inner {
430            ReplicaSetInner::Plain(replicas) => ReplicaSetIteratorInner::Plain { replicas, idx: 0 },
431            ReplicaSetInner::PlainSharded(replicas) => {
432                ReplicaSetIteratorInner::PlainSharded { replicas, idx: 0 }
433            }
434            ReplicaSetInner::FilteredSimple {
435                replicas,
436                datacenter,
437            } => ReplicaSetIteratorInner::FilteredSimple {
438                replicas,
439                datacenter,
440                idx: 0,
441            },
442            ReplicaSetInner::ChainedNTS {
443                datacenter_repfactors,
444                locator,
445                token,
446            } => {
447                if let Some(datacenter) = locator.datacenters.first() {
448                    let repfactor = *datacenter_repfactors.get(datacenter.as_str()).unwrap_or(&0);
449                    ReplicaSetIteratorInner::ChainedNTS {
450                        replicas: locator
451                            .get_network_strategy_replicas(token, datacenter, repfactor),
452                        replicas_idx: 0,
453
454                        locator,
455                        token,
456                        datacenter_idx: 0,
457                        datacenter_repfactors,
458                    }
459                } else {
460                    ReplicaSetIteratorInner::Plain {
461                        replicas: EMPTY_REPLICAS,
462                        idx: 0,
463                    }
464                }
465            }
466        };
467
468        ReplicaSetIterator {
469            inner,
470            token: self.token,
471        }
472    }
473}
474
/// Iteration state of `ReplicaSetIterator`; each variant corresponds to the
/// `ReplicaSetInner` variant the iterator was created from.
#[derive(Clone)]
enum ReplicaSetIteratorInner<'a> {
    /// Token ring replicas that need no filtering (e.g. SimpleStrategy, any datacenter).
    /// Also used for an empty set.
    Plain {
        replicas: ReplicasArray<'a>,
        /// Index of the next replica to yield.
        idx: usize,
    },
    /// Tablets
    PlainSharded {
        replicas: &'a [(Arc<Node>, Shard)],
        /// Index of the next replica to yield.
        idx: usize,
    },
    /// Token ring with SimpleStrategy, specific datacenter
    FilteredSimple {
        replicas: ReplicasArray<'a>,
        /// Only replicas from this datacenter are yielded.
        datacenter: &'a str,
        /// Index of the next replica to inspect (replicas from other datacenters are skipped).
        idx: usize,
    },
    /// Token ring with NetworkTopologyStrategy, not limited to any datacenter
    ChainedNTS {
        /// Replicas of the datacenter that is currently being iterated.
        replicas: ReplicasArray<'a>,
        /// Index of the next replica to yield from `replicas`.
        replicas_idx: usize,

        datacenter_repfactors: &'a HashMap<String, usize>,
        locator: &'a ReplicaLocator,
        token: Token,
        /// Index (in `locator.datacenters`) of the datacenter that `replicas` belongs to.
        datacenter_idx: usize,
    },
}
504
/// Iterator that returns replicas from some replica set.
///
/// Each item is a node paired with a shard.
#[derive(Clone)]
pub struct ReplicaSetIterator<'a> {
    /// Variant-specific iteration state.
    inner: ReplicaSetIteratorInner<'a>,
    /// Token of the replica set; used to compute shards of yielded token-ring replicas.
    token: Token,
}
511
512impl<'a> Iterator for ReplicaSetIterator<'a> {
513    type Item = (NodeRef<'a>, Shard);
514
515    fn next(&mut self) -> Option<Self::Item> {
516        match &mut self.inner {
517            ReplicaSetIteratorInner::Plain { replicas, idx } => {
518                if let Some(replica) = replicas.get(*idx) {
519                    *idx += 1;
520                    return Some(with_computed_shard(replica, self.token));
521                }
522
523                None
524            }
525            ReplicaSetIteratorInner::PlainSharded { replicas, idx } => {
526                if let Some((replica, shard)) = replicas.get(*idx) {
527                    *idx += 1;
528                    return Some((replica, *shard));
529                }
530
531                None
532            }
533            ReplicaSetIteratorInner::FilteredSimple {
534                replicas,
535                datacenter,
536                idx,
537            } => {
538                while let Some(replica) = replicas.get(*idx) {
539                    *idx += 1;
540                    if replica.datacenter.as_deref() == Some(*datacenter) {
541                        return Some(with_computed_shard(replica, self.token));
542                    }
543                }
544
545                None
546            }
547            ReplicaSetIteratorInner::ChainedNTS {
548                replicas,
549                replicas_idx,
550                locator,
551                token,
552                datacenter_idx,
553                datacenter_repfactors,
554            } => {
555                if let Some(replica) = replicas.get(*replicas_idx) {
556                    *replicas_idx += 1;
557                    Some(with_computed_shard(replica, self.token))
558                } else if *datacenter_idx + 1 < locator.datacenters.len() {
559                    *datacenter_idx += 1;
560                    *replicas_idx = 0;
561
562                    let datacenter = &locator.datacenters[*datacenter_idx];
563                    let repfactor = *datacenter_repfactors.get(datacenter).unwrap_or(&0);
564                    *replicas =
565                        locator.get_network_strategy_replicas(*token, datacenter, repfactor);
566
567                    self.next()
568                } else {
569                    None
570                }
571            }
572        }
573    }
574
575    fn size_hint(&self) -> (usize, Option<usize>) {
576        match &self.inner {
577            ReplicaSetIteratorInner::Plain { replicas, idx } => {
578                let size = replicas.len() - *idx;
579
580                (size, Some(size))
581            }
582            ReplicaSetIteratorInner::PlainSharded { replicas, idx } => {
583                let size = replicas.len() - *idx;
584
585                (size, Some(size))
586            }
587            ReplicaSetIteratorInner::FilteredSimple {
588                replicas,
589                datacenter: _,
590                idx,
591            } => (0, Some(replicas.len() - *idx)),
592            ReplicaSetIteratorInner::ChainedNTS {
593                replicas: _,
594                replicas_idx: _,
595                datacenter_repfactors,
596                locator,
597                token: _,
598                datacenter_idx,
599            } => {
600                let yielded: usize = locator.datacenter_names()[0..*datacenter_idx]
601                    .iter()
602                    .filter_map(|name| datacenter_repfactors.get(name))
603                    .sum();
604
605                (
606                    0,
607                    Some(datacenter_repfactors.values().sum::<usize>() - yielded),
608                )
609            }
610        }
611    }
612
613    fn nth(&mut self, n: usize) -> Option<Self::Item> {
614        match &mut self.inner {
615            ReplicaSetIteratorInner::Plain { replicas: _, idx }
616            | ReplicaSetIteratorInner::PlainSharded { replicas: _, idx } => {
617                *idx += n;
618
619                self.next()
620            }
621            _ => {
622                for _i in 0..n {
623                    self.next()?;
624                }
625
626                self.next()
627            }
628        }
629    }
630}
631
impl<'a> ReplicaSet<'a> {
    /// Converts this set into `ReplicasOrdered`, a sequence whose iterator yields
    /// the replicas according to the ring order (or the order defined by tablet data
    /// for tablet tables).
    pub fn into_replicas_ordered(self) -> ReplicasOrdered<'a> {
        ReplicasOrdered { replica_set: self }
    }
}
637
/// Represents a sequence of replicas for a given token and strategy,
/// ordered according to the ring order (for token-ring tables) or with the
/// order defined by tablet data (for tablet tables).
///
/// This container can only be created by calling `ReplicaSet::into_replicas_ordered()`,
/// and either it can borrow precomputed replica lists living in the locator (in case of SimpleStrategy)
/// or it must compute them on-demand (in case of NetworkTopologyStrategy).
/// The computation is lazy (performed by `ReplicasOrderedIterator` upon call to `next()`).
/// For obtaining the primary replica, no allocations are needed. Therefore, the first call
/// to `next()` is optimised and does not allocate.
/// For the remaining replicas, unfortunately, allocation is inevitable.
pub struct ReplicasOrdered<'a> {
    /// The replica set whose replicas are going to be yielded in order.
    replica_set: ReplicaSet<'a>,
}
652
/// Iterator that returns replicas from some replica sequence, ordered according to the ring order.
///
/// Created by calling `into_iter()` on `ReplicasOrdered`.
pub struct ReplicasOrderedIterator<'a> {
    /// Either a plain `ReplicaSetIterator` or the dedicated NetworkTopologyStrategy iterator.
    inner: ReplicasOrderedIteratorInner<'a>,
}
657
/// The two ways in which `ReplicasOrderedIterator` obtains its items.
enum ReplicasOrderedIteratorInner<'a> {
    AlreadyRingOrdered {
        // In case of Plain, PlainSharded and FilteredSimple variants,
        // ReplicaSetIterator respects ring order.
        replica_set_iter: ReplicaSetIterator<'a>,
    },
    PolyDatacenterNTS {
        // In case of ChainedNTS variant, ReplicaSetIterator does not respect ring order,
        // so specific code is needed to yield replicas according to that order.
        replicas_ordered_iter: ReplicasOrderedNTSIterator<'a>,
    },
}
670
/// Iterator over NetworkTopologyStrategy replicas from all datacenters
/// that yields them according to the order of the global ring.
struct ReplicasOrderedNTSIterator<'a> {
    /// Token used to compute the shard of every yielded replica.
    token: Token,
    /// Current stage of the iteration.
    inner: ReplicasOrderedNTSIteratorInner<'a>,
}
675
/// Stages of `ReplicasOrderedNTSIterator`. The iterator moves through them
/// in the order in which they are declared.
enum ReplicasOrderedNTSIteratorInner<'a> {
    /// Nothing was yielded yet. The next call to `next()` looks for the primary replica
    /// by scanning the global ring starting from `token`.
    FreshForPick {
        datacenter_repfactors: &'a HashMap<String, usize>,
        locator: &'a ReplicaLocator,
        token: Token,
    },
    /// The primary replica (`picked`) was already yielded. The next call to `next()`
    /// computes the remaining replicas and puts them in ring order.
    Picked {
        datacenter_repfactors: &'a HashMap<String, usize>,
        locator: &'a ReplicaLocator,
        token: Token,
        picked: NodeRef<'a>,
    },
    /// The remaining replicas were computed and ordered; they are yielded one by one.
    ComputedFallback {
        replicas: ReplicasArray<'a>,
        /// Index of the next replica to yield.
        idx: usize,
    },
}
693
694impl<'a> Iterator for ReplicasOrderedNTSIterator<'a> {
695    type Item = (NodeRef<'a>, Shard);
696
697    fn next(&mut self) -> Option<Self::Item> {
698        match self.inner {
699            ReplicasOrderedNTSIteratorInner::FreshForPick {
700                datacenter_repfactors,
701                locator,
702                token,
703            } => {
704                // We're going to find the primary replica for the given token.
705                let nodes_on_ring = locator.replication_data.get_global_ring().ring_range(token);
706                for node in nodes_on_ring {
707                    // If this node's DC has some replicas in this NTS...
708                    if let Some(dc) = &node.datacenter {
709                        if datacenter_repfactors.get(dc).is_some() {
710                            // ...then this node must be the primary replica.
711                            self.inner = ReplicasOrderedNTSIteratorInner::Picked {
712                                datacenter_repfactors,
713                                locator,
714                                token,
715                                picked: node,
716                            };
717                            return Some(with_computed_shard(node, self.token));
718                        }
719                    }
720                }
721                None
722            }
723            ReplicasOrderedNTSIteratorInner::Picked {
724                datacenter_repfactors,
725                locator,
726                token,
727                picked,
728            } => {
729                // Clippy can't check that in Eq and Hash impls we don't actually use any field with interior mutability
730                // (in Node only `down_marker` is such, being an AtomicBool).
731                // https://rust-lang.github.io/rust-clippy/master/index.html#mutable_key_type
732                #[allow(clippy::mutable_key_type)]
733                let mut all_replicas: HashSet<&'a Arc<Node>> = HashSet::new();
734                for (datacenter, repfactor) in datacenter_repfactors.iter() {
735                    all_replicas.extend(
736                        locator
737                            .get_network_strategy_replicas(token, datacenter, *repfactor)
738                            .iter(),
739                    );
740                }
741                // It's no use returning a node that was already picked.
742                all_replicas.remove(picked);
743
744                let mut replicas_ordered = vec![];
745                let nodes_on_ring = locator.replication_data.get_global_ring().ring_range(token);
746                for node in nodes_on_ring {
747                    if all_replicas.is_empty() {
748                        // All replicas were put in order.
749                        break;
750                    }
751                    if all_replicas.remove(node) {
752                        replicas_ordered.push(node);
753                    }
754                }
755                assert!(
756                    all_replicas.is_empty(),
757                    "all_replicas somehow contained a node that wasn't present in the global ring!"
758                );
759
760                self.inner = ReplicasOrderedNTSIteratorInner::ComputedFallback {
761                    replicas: ReplicasArray::Owned(replicas_ordered),
762                    idx: 0,
763                };
764                self.next()
765            }
766            ReplicasOrderedNTSIteratorInner::ComputedFallback {
767                ref replicas,
768                ref mut idx,
769            } => {
770                if let Some(replica) = replicas.get(*idx) {
771                    *idx += 1;
772                    Some(with_computed_shard(replica, self.token))
773                } else {
774                    None
775                }
776            }
777        }
778    }
779}
780
781impl<'a> Iterator for ReplicasOrderedIterator<'a> {
782    type Item = (NodeRef<'a>, Shard);
783
784    fn next(&mut self) -> Option<Self::Item> {
785        match &mut self.inner {
786            ReplicasOrderedIteratorInner::AlreadyRingOrdered { replica_set_iter } => {
787                replica_set_iter.next()
788            }
789            ReplicasOrderedIteratorInner::PolyDatacenterNTS {
790                replicas_ordered_iter,
791            } => replicas_ordered_iter.next(),
792        }
793    }
794}
795
796impl<'a> IntoIterator for ReplicasOrdered<'a> {
797    type Item = (NodeRef<'a>, Shard);
798    type IntoIter = ReplicasOrderedIterator<'a>;
799
800    fn into_iter(self) -> Self::IntoIter {
801        let Self { replica_set } = self;
802        Self::IntoIter {
803            inner: match replica_set.inner {
804                ReplicaSetInner::Plain(_) | ReplicaSetInner::FilteredSimple { .. } => {
805                    ReplicasOrderedIteratorInner::AlreadyRingOrdered {
806                        replica_set_iter: replica_set.into_iter(),
807                    }
808                }
809                ReplicaSetInner::PlainSharded(_) => {
810                    ReplicasOrderedIteratorInner::AlreadyRingOrdered {
811                        replica_set_iter: replica_set.into_iter(),
812                    }
813                }
814                ReplicaSetInner::ChainedNTS {
815                    datacenter_repfactors,
816                    locator,
817                    token,
818                } => ReplicasOrderedIteratorInner::PolyDatacenterNTS {
819                    replicas_ordered_iter: ReplicasOrderedNTSIterator {
820                        token: replica_set.token,
821                        inner: ReplicasOrderedNTSIteratorInner::FreshForPick {
822                            datacenter_repfactors,
823                            locator,
824                            token,
825                        },
826                    },
827                },
828            },
829        }
830    }
831}
832
#[cfg(test)]
mod tests {
    use crate::{routing::locator::test::*, routing::Token, test_utils::setup_tracing};

    /// Verifies that `ReplicasOrdered` yields replicas in the expected order,
    /// both with and without limiting the replicas to a single datacenter.
    #[tokio::test]
    async fn test_replicas_ordered() {
        setup_tracing();
        let metadata = mock_metadata_for_token_aware_tests();
        let locator = create_locator(&metadata);

        // Replication strategies of the keyspaces used by the cases below.
        let nts_rf_3 = &metadata
            .keyspaces
            .get(KEYSPACE_NTS_RF_3)
            .unwrap()
            .as_ref()
            .unwrap()
            .strategy;
        let nts_rf_2 = &metadata
            .keyspaces
            .get(KEYSPACE_NTS_RF_2)
            .unwrap()
            .as_ref()
            .unwrap()
            .strategy;
        let ss_rf_2 = &metadata
            .keyspaces
            .get(KEYSPACE_SS_RF_2)
            .unwrap()
            .as_ref()
            .unwrap()
            .strategy;

        // For each case (token, limit_to_dc, strategy), we are checking
        // that ReplicasOrdered yields replicas in the expected order.
        let check = |token, limit_to_dc, strategy, table, expected| {
            let ports: Vec<_> = locator
                .replicas_for_token(Token::new(token), strategy, limit_to_dc, table)
                .into_replicas_ordered()
                .into_iter()
                .map(|(node, _shard)| node.address.port())
                .collect();
            assert_eq!(expected, ports);
        };

        // In all these tests:
        // going through the ring, we get order: F , A , C , D , G , B , E
        //                                       us  eu  eu  us  eu  eu  us
        //                                       r2  r1  r1  r1  r2  r1  r1

        // Replicas from all datacenters.
        check(160, None, nts_rf_3, TABLE_NTS_RF_3, vec![F, A, C, D, G, E]);
        check(160, None, nts_rf_2, TABLE_NTS_RF_2, vec![F, A, D, G]);
        check(160, None, ss_rf_2, TABLE_SS_RF_2, vec![F, A]);

        // Replicas limited to a single datacenter.
        check(160, Some("eu"), nts_rf_3, TABLE_NTS_RF_3, vec![A, C, G]);
        check(160, Some("us"), nts_rf_3, TABLE_NTS_RF_3, vec![F, D, E]);
        check(160, Some("eu"), ss_rf_2, TABLE_SS_RF_2, vec![A]);
    }
}