scylla/routing/
partitioner.rs

//! Partitioners are algorithms that can compute a token for a given partition key,
//! ultimately allowing optimised routing of requests (so that a request is routed
//! to its replicas — the nodes and shards that actually own the data the request concerns).
//! Currently, two partitioners are supported:
//! - Murmur3Partitioner
//!     - the default partitioner,
//!     - modified for compatibility with Cassandra's buggy implementation.
//! - CDCPartitioner
//!     - the partitioner employed when using CDC (_Change Data Capture_).
10
11use bytes::Buf;
12use scylla_cql::frame::types::RawValue;
13use scylla_cql::serialize::row::SerializedValues;
14use std::num::Wrapping;
15
16use crate::routing::Token;
17use crate::statement::prepared::TokenCalculationError;
18
/// Enumerates the partitioners supported by the driver.
///
/// Determines which token-computation algorithm is used
/// (see the [`Partitioner`] impl for this type).
#[allow(clippy::upper_case_acronyms)]
#[derive(Clone, PartialEq, Eq, Debug, Default)]
#[non_exhaustive]
pub enum PartitionerName {
    /// The default partitioner (Cassandra-compatible Murmur3).
    #[default]
    Murmur3,
    /// The partitioner employed when using CDC (_Change Data Capture_).
    CDC,
}
27
28impl PartitionerName {
29    pub(crate) fn from_str(name: &str) -> Option<Self> {
30        if name.ends_with("Murmur3Partitioner") {
31            Some(PartitionerName::Murmur3)
32        } else if name.ends_with("CDCPartitioner") {
33            Some(PartitionerName::CDC)
34        } else {
35            None
36        }
37    }
38}
39
40impl sealed::Sealed for PartitionerName {}
41impl Partitioner for PartitionerName {
42    type Hasher = PartitionerHasherAny;
43
44    fn build_hasher(&self) -> Self::Hasher {
45        match self {
46            PartitionerName::Murmur3 => {
47                PartitionerHasherAny::Murmur3(Murmur3Partitioner.build_hasher())
48            }
49            PartitionerName::CDC => PartitionerHasherAny::CDC(CDCPartitioner.build_hasher()),
50        }
51    }
52}
53
/// A `PartitionerHasher` that can hold the hasher of any supported
/// partitioner, chosen at runtime via `PartitionerName::build_hasher`.
#[allow(clippy::upper_case_acronyms)]
#[non_exhaustive]
pub enum PartitionerHasherAny {
    /// Hashing state of the Murmur3 partitioner.
    Murmur3(Murmur3PartitionerHasher),
    /// Hashing state of the CDC partitioner.
    CDC(CDCPartitionerHasher),
}
60
61impl sealed::Sealed for PartitionerHasherAny {}
62impl PartitionerHasher for PartitionerHasherAny {
63    fn write(&mut self, pk_part: &[u8]) {
64        match self {
65            PartitionerHasherAny::Murmur3(h) => h.write(pk_part),
66            PartitionerHasherAny::CDC(h) => h.write(pk_part),
67        }
68    }
69
70    fn finish(&self) -> Token {
71        match self {
72            PartitionerHasherAny::Murmur3(h) => h.finish(),
73            PartitionerHasherAny::CDC(h) => h.finish(),
74        }
75    }
76}
77
// Prevents downstream crates from implementing `Partitioner` and
// `PartitionerHasher`, keeping the set of partitioners closed.
mod sealed {
    // This is a sealed trait - its whole purpose is to be unnameable.
    // This means we need to disable the check.
    #[allow(unknown_lints)] // Rust 1.70 doesn't know this lint
    #[allow(unnameable_types)]
    pub trait Sealed {}
}
85
/// A trait for creating instances of `PartitionerHasher`, which ultimately compute the token.
///
/// The Partitioners' design is based on std::hash design: `Partitioner`
/// corresponds to `BuildHasher`, and `PartitionerHasher` to `Hasher`.
/// See their documentation for more details.
pub trait Partitioner: sealed::Sealed {
    /// The stateful hasher this partitioner produces.
    type Hasher: PartitionerHasher;

    /// Creates a fresh hasher with empty state.
    fn build_hasher(&self) -> Self::Hasher;

    /// Convenience method: hashes a single byte slice in one go
    /// and returns the resulting token.
    #[allow(unused)] // Currently, no public API uses this.
    fn hash_one(&self, data: &[u8]) -> Token {
        let mut hasher = self.build_hasher();
        hasher.write(data);
        hasher.finish()
    }
}
103
/// A trait for hashing a stream of serialized CQL values.
///
/// Instances of this trait are created by a `Partitioner` and are stateful.
/// At any point, one can call `finish()` and a `Token` will be computed
/// based on the values that have been fed so far.
pub trait PartitionerHasher: sealed::Sealed {
    /// Feeds the next chunk of the serialized partition key into the hasher.
    fn write(&mut self, pk_part: &[u8]);
    /// Computes the token from everything written so far.
    /// Takes `&self`, so it does not mutate the hasher state
    /// and may be called at any point.
    fn finish(&self) -> Token;
}
113
114pub struct Murmur3Partitioner;
115
116impl sealed::Sealed for Murmur3Partitioner {}
117impl Partitioner for Murmur3Partitioner {
118    type Hasher = Murmur3PartitionerHasher;
119
120    fn build_hasher(&self) -> Self::Hasher {
121        Self::Hasher {
122            total_len: 0,
123            buf: Default::default(),
124            h1: Wrapping(0),
125            h2: Wrapping(0),
126        }
127    }
128}
129
/// Streaming state of the Cassandra-compatible Murmur3 hash
/// (the 128-bit-state variant, judging by the constants and chunk size).
pub struct Murmur3PartitionerHasher {
    // Total number of bytes fed so far. Its remainder modulo BUF_CAPACITY
    // is the number of valid bytes currently held in `buf`.
    total_len: usize,
    // Holds bytes that do not yet form a full 16-byte chunk.
    buf: [u8; Self::BUF_CAPACITY],
    // The two 64-bit hash accumulators.
    h1: Wrapping<i64>,
    h2: Wrapping<i64>,
}
136
impl Murmur3PartitionerHasher {
    // Murmur3 processes input in 16-byte chunks, hence the buffer capacity.
    const BUF_CAPACITY: usize = 16;

    // Murmur3 multiplication constants.
    const C1: Wrapping<i64> = Wrapping(0x87c3_7b91_1142_53d5_u64 as i64);
    const C2: Wrapping<i64> = Wrapping(0x4cf5_ad43_2745_937f_u64 as i64);

    // Mixes one full 16-byte chunk (already parsed into two i64s)
    // into the `h1`/`h2` accumulators - the main body round of Murmur3.
    fn hash_16_bytes(&mut self, mut k1: Wrapping<i64>, mut k2: Wrapping<i64>) {
        k1 *= Self::C1;
        k1 = Self::rotl64(k1, 31);
        k1 *= Self::C2;
        self.h1 ^= k1;

        self.h1 = Self::rotl64(self.h1, 27);
        self.h1 += self.h2;
        self.h1 = self.h1 * Wrapping(5) + Wrapping(0x52dce729);

        k2 *= Self::C2;
        k2 = Self::rotl64(k2, 33);
        k2 *= Self::C1;
        self.h2 ^= k2;

        self.h2 = Self::rotl64(self.h2, 31);
        self.h2 += self.h1;
        self.h2 = self.h2 * Wrapping(5) + Wrapping(0x38495ab5);
    }

    // Reads two little-endian i64s from the front of `buf`,
    // advancing the slice by 16 bytes.
    fn fetch_16_bytes_from_buf(buf: &mut &[u8]) -> (Wrapping<i64>, Wrapping<i64>) {
        let k1 = Wrapping(buf.get_i64_le());
        let k2 = Wrapping(buf.get_i64_le());
        (k1, k2)
    }

    // Bitwise rotate-left of a 64-bit value. The right shift is performed
    // on u64 so that it is logical, not arithmetic.
    // NOTE(review): n == 0 or n >= 64 would be a shift overflow; all call
    // sites pass constants in the 27..=33 range, so this cannot happen.
    #[inline]
    fn rotl64(v: Wrapping<i64>, n: u32) -> Wrapping<i64> {
        Wrapping((v.0 << n) | (v.0 as u64 >> (64 - n)) as i64)
    }

    // Murmur3's finalization mix ("fmix64"): forces all bits
    // of the hash state to avalanche.
    #[inline]
    fn fmix(mut k: Wrapping<i64>) -> Wrapping<i64> {
        k ^= Wrapping((k.0 as u64 >> 33) as i64);
        k *= Wrapping(0xff51afd7ed558ccd_u64 as i64);
        k ^= Wrapping((k.0 as u64 >> 33) as i64);
        k *= Wrapping(0xc4ceb9fe1a85ec53_u64 as i64);
        k ^= Wrapping((k.0 as u64 >> 33) as i64);

        k
    }
}
185
impl sealed::Sealed for Murmur3PartitionerHasher {}

// The implemented Murmur3 algorithm is roughly as follows:
// 1. while there are at least 16 bytes given:
//      consume 16 bytes by parsing them into i64s, then
//      include them in h1, h2, k1, k2;
// 2. do some magic with remaining n < 16 bytes,
//      include them in h1, h2, k1, k2;
// 3. compute the token based on h1, h2, k1, k2.
//
// Therefore, the buffer of capacity 16 is used. As soon as it gets full,
// point 1. is executed. Points 2. and 3. are exclusively done in `finish()`,
// so they don't mutate the state.
impl PartitionerHasher for Murmur3PartitionerHasher {
    fn write(&mut self, mut pk_part: &[u8]) {
        // Number of bytes currently buffered is recoverable from total_len,
        // because full 16-byte chunks never linger in the buffer.
        let mut buf_len = self.total_len % Self::BUF_CAPACITY;
        self.total_len += pk_part.len();

        // If the buffer is nonempty and can be filled completely, so that we can fetch two i64s from it,
        // fill it and hash its contents, then make it empty.
        if buf_len > 0 && Self::BUF_CAPACITY - buf_len <= pk_part.len() {
            // First phase: populate buffer until full, then consume two i64s.
            let to_write = Ord::min(Self::BUF_CAPACITY - buf_len, pk_part.len());
            self.buf[buf_len..buf_len + to_write].copy_from_slice(&pk_part[..to_write]);
            pk_part.advance(to_write);
            buf_len += to_write;

            debug_assert_eq!(buf_len, Self::BUF_CAPACITY);
            // consume 16 bytes from internal buf
            let mut buf_ptr = &self.buf[..];
            let (k1, k2) = Self::fetch_16_bytes_from_buf(&mut buf_ptr);
            debug_assert!(buf_ptr.is_empty());
            self.hash_16_bytes(k1, k2);
            buf_len = 0;
        }

        // If there were enough data, now we have an empty buffer. Further data, if enough, can be hence
        // hashed directly from the external buffer.
        if buf_len == 0 {
            // Second phase: fast path for big values.
            while pk_part.len() >= Self::BUF_CAPACITY {
                let (k1, k2) = Self::fetch_16_bytes_from_buf(&mut pk_part);
                self.hash_16_bytes(k1, k2);
            }
        }

        // Third phase: move remaining bytes to the buffer.
        // (At most 15 bytes can remain here, by the loop/condition above.)
        debug_assert!(pk_part.len() < Self::BUF_CAPACITY - buf_len);
        let to_write = pk_part.len();
        self.buf[buf_len..buf_len + to_write].copy_from_slice(&pk_part[..to_write]);
        pk_part.advance(to_write);
        buf_len += to_write;
        debug_assert!(pk_part.is_empty());

        debug_assert!(buf_len < Self::BUF_CAPACITY);
    }

    fn finish(&self) -> Token {
        // Work on copies, so that `finish()` does not mutate the state
        // and can be called more than once.
        let mut h1 = self.h1;
        let mut h2 = self.h2;

        let mut k1 = Wrapping(0_i64);
        let mut k2 = Wrapping(0_i64);

        // Number of leftover bytes held in `buf` (the partial last chunk).
        let buf_len = self.total_len % Self::BUF_CAPACITY;

        // Tail bytes 8..buf_len are folded into k2 (note the sign-extending
        // `as i8 as i64` cast - part of Cassandra compatibility).
        if buf_len > 8 {
            for i in (8..buf_len).rev() {
                k2 ^= Wrapping(self.buf[i] as i8 as i64) << ((i - 8) * 8);
            }

            k2 *= Self::C2;
            k2 = Self::rotl64(k2, 33);
            k2 *= Self::C1;
            h2 ^= k2;
        }

        // Tail bytes 0..min(8, buf_len) are folded into k1.
        if buf_len > 0 {
            for i in (0..std::cmp::min(8, buf_len)).rev() {
                k1 ^= Wrapping(self.buf[i] as i8 as i64) << (i * 8);
            }

            k1 *= Self::C1;
            k1 = Self::rotl64(k1, 31);
            k1 *= Self::C2;
            h1 ^= k1;
        }

        // Mix in the total input length and run the final avalanche.
        h1 ^= Wrapping(self.total_len as i64);
        h2 ^= Wrapping(self.total_len as i64);

        h1 += h2;
        h2 += h1;

        h1 = Self::fmix(h1);
        h2 = Self::fmix(h2);

        h1 += h2;
        h2 += h1;

        // The cast to i64 truncates the 128-bit value to its low half,
        // so the token is effectively h1 (h2 only influenced h1 above).
        Token::new((((h2.0 as i128) << 64) | h1.0 as i128) as i64)
    }
}
289
/// State machine of the CDC hasher: input is consumed until 8 bytes have
/// been seen, at which point the token is fixed once and for all.
enum CDCPartitionerHasherState {
    /// Still gathering the first 8 bytes of input.
    Feeding {
        // Number of valid bytes currently stored in `buf`.
        len: usize,
        buf: [u8; CDCPartitionerHasher::BUF_CAPACITY],
    },
    /// The token has been computed; further input is ignored.
    Computed(Token),
}

/// The partitioner employed when using CDC (_Change Data Capture_).
pub struct CDCPartitioner;

/// Streaming (token-computation) state for `CDCPartitioner`.
pub struct CDCPartitionerHasher {
    state: CDCPartitionerHasherState,
}
303
304impl sealed::Sealed for CDCPartitioner {}
305impl Partitioner for CDCPartitioner {
306    type Hasher = CDCPartitionerHasher;
307
308    fn build_hasher(&self) -> Self::Hasher {
309        Self::Hasher {
310            state: CDCPartitionerHasherState::Feeding {
311                len: 0,
312                buf: Default::default(),
313            },
314        }
315    }
316}
317
impl CDCPartitionerHasher {
    // The CDC token is derived from exactly the first 8 bytes of input.
    const BUF_CAPACITY: usize = 8;
}
321
322impl sealed::Sealed for CDCPartitionerHasher {}
323impl PartitionerHasher for CDCPartitionerHasher {
324    fn write(&mut self, pk_part: &[u8]) {
325        match &mut self.state {
326            CDCPartitionerHasherState::Feeding { len, buf } => {
327                // We feed the buffer until it's full.
328                let copied_len = Ord::min(pk_part.len(), Self::BUF_CAPACITY - *len);
329                buf[*len..*len + copied_len].copy_from_slice(&pk_part[..copied_len]);
330                *len += copied_len;
331
332                // If the buffer is full, we can compute and fix the token.
333                if *len == Self::BUF_CAPACITY {
334                    let token = Token::new((&mut &buf[..]).get_i64());
335                    self.state = CDCPartitionerHasherState::Computed(token);
336                }
337            }
338            CDCPartitionerHasherState::Computed(_) => (),
339        }
340    }
341
342    fn finish(&self) -> Token {
343        match self.state {
344            // Looking at Scylla code it seems that here we actually want token with this value.
345            // If the value is too short Scylla returns `dht::minimum_token()`:
346            // https://github.com/scylladb/scylladb/blob/4be70bfc2bc7f133cab492b4aac7bab9c790a48c/cdc/cdc_partitioner.cc#L32
347            // When you call `long_token` on `minimum_token` it will actually return `i64::MIN`:
348            // https://github.com/scylladb/scylladb/blob/0a7854ea4de04f20b71326ba5940b5fac6f7241a/dht/token.cc#L21-L35
349            CDCPartitionerHasherState::Feeding { .. } => Token::INVALID,
350            CDCPartitionerHasherState::Computed(token) => token,
351        }
352    }
353}
354
355/// Calculates the token for given partitioner and serialized partition key.
356///
357/// The ordinary way to calculate token is based on a PreparedStatement
358/// and values for that statement. However, if a user knows:
359/// - the order of the columns in the partition key,
360/// - the values of the columns of the partition key,
361/// - the partitioner of the table that the statement operates on,
362///
363/// then having a `PreparedStatement` is not necessary and the token can
364/// be calculated based on that information.
365///
366/// NOTE: the provided values must completely constitute partition key
367/// and be in the order defined in CREATE TABLE statement.
368pub(crate) fn calculate_token_for_partition_key(
369    serialized_partition_key_values: &SerializedValues,
370    partitioner: &PartitionerName,
371) -> Result<Token, TokenCalculationError> {
372    let mut partitioner_hasher = partitioner.build_hasher();
373
374    if serialized_partition_key_values.element_count() == 1 {
375        let val = serialized_partition_key_values.iter().next().unwrap();
376        if let RawValue::Value(val) = val {
377            partitioner_hasher.write(val);
378        }
379    } else {
380        for val in serialized_partition_key_values
381            .iter()
382            .filter_map(|rv| rv.as_value())
383        {
384            let val_len_u16: u16 = val
385                .len()
386                .try_into()
387                .map_err(|_| TokenCalculationError::ValueTooLong(val.len()))?;
388            partitioner_hasher.write(&val_len_u16.to_be_bytes());
389            partitioner_hasher.write(val);
390            partitioner_hasher.write(&[0u8]);
391        }
392    }
393
394    Ok(partitioner_hasher.finish())
395}
396
#[cfg(test)]
mod tests {
    use rand::Rng;
    use rand_pcg::Pcg32;

    use crate::test_utils::setup_tracing;

    use super::{CDCPartitioner, Murmur3Partitioner, Partitioner, PartitionerHasher};

    // Checks a single known-answer pair for the Murmur3 partitioner.
    fn assert_correct_murmur3_hash(pk: &'static str, expected_hash: i64) {
        let hash = Murmur3Partitioner.hash_one(pk.as_bytes()).value();
        assert_eq!(hash, expected_hash);
    }

    // Known-answer tests for the Murmur3 partitioner.
    // NOTE(review): expected values presumably come from a reference
    // implementation (Scylla/Cassandra) - origin not recorded here.
    #[test]
    fn test_murmur3_partitioner() {
        setup_tracing();
        for s in [
            ("test", -6017608668500074083),
            ("xd", 4507812186440344727),
            ("primary_key", -1632642444691073360),
            ("kremówki", 4354931215268080151),
        ] {
            assert_correct_murmur3_hash(s.0, s.1);
        }
    }

    // Checks a single known-answer pair for the CDC partitioner.
    fn assert_correct_cdc_hash(pk: &'static str, expected_hash: i64) {
        let hash = CDCPartitioner.hash_one(pk.as_bytes()).value();
        assert_eq!(hash, expected_hash);
    }

    // Property test: a hasher must yield the same token regardless of how
    // the input is split into chunks across consecutive `write()` calls.
    #[test]
    fn partitioners_output_same_result_no_matter_how_input_is_partitioned() {
        setup_tracing();
        let inputs: &[&[u8]] = &[
            b"",
            b"0",
            "Ala ma kota, a kota ma Ala.".as_bytes(),
            "Zażółć gęślą jaźń. Wsiadł rycerz Szaławiła na bułanego konia. Litwo, ojczyzno moja, ...".as_bytes(),
        ];

        // Fixed seed, so that the test is deterministic.
        let seed = 0x2137;
        let mut randgen = Pcg32::new(seed, 0);

        // Splits the given data 2^n times and feeds partitioner with the chunks got.
        fn split_and_feed(
            randgen: &mut impl Rng,
            partitioner: &mut impl PartitionerHasher,
            data: &[u8],
            n: usize,
        ) {
            if n == 0 {
                partitioner.write(data);
            } else {
                // Pick a random split point (0 for empty data) and recurse
                // on both halves.
                let pivot = if !data.is_empty() {
                    randgen.random_range(0..data.len())
                } else {
                    0
                };
                let (data1, data2) = data.split_at(pivot);
                for data in [data1, data2] {
                    split_and_feed(randgen, partitioner, data, n - 1);
                }
            }
        }

        // Compares a single-write hash against many randomly-chunked runs.
        fn check_for_partitioner<P: Partitioner>(
            partitioner: P,
            randgen: &mut impl Rng,
            input: &[u8],
        ) {
            let result_single_batch = partitioner.hash_one(input);

            let results_chunks = (0..1000).map(|_| {
                let mut partitioner_hasher = partitioner.build_hasher();
                split_and_feed(randgen, &mut partitioner_hasher, input, 2);
                partitioner_hasher.finish()
            });

            for result_chunk in results_chunks {
                assert_eq!(result_single_batch, result_chunk)
            }
        }

        for input in inputs {
            check_for_partitioner(Murmur3Partitioner, &mut randgen, input);
            check_for_partitioner(CDCPartitioner, &mut randgen, input);
        }
    }

    // Known-answer tests for the CDC partitioner. Inputs shorter than
    // 8 bytes yield i64::MIN (see `CDCPartitionerHasher::finish`).
    #[test]
    fn test_cdc_partitioner() {
        setup_tracing();
        for s in [
            ("test", -9223372036854775808),
            ("xd", -9223372036854775808),
            ("primary_key", 8102654598100187487),
            ("kremówki", 7742362231512463211),
        ] {
            assert_correct_cdc_hash(s.0, s.1);
        }
    }
}