1use bytes::Buf;
12use scylla_cql::frame::types::RawValue;
13use scylla_cql::serialize::row::SerializedValues;
14use std::num::Wrapping;
15
16use crate::routing::Token;
17use crate::statement::prepared::TokenCalculationError;
18
/// Names the partitioner that should be used to compute tokens.
///
/// `Murmur3` is the default variant.
// The all-caps `CDC` variant name would otherwise trigger this clippy lint.
#[allow(clippy::upper_case_acronyms)]
#[derive(Clone, PartialEq, Eq, Debug, Default)]
#[non_exhaustive]
pub enum PartitionerName {
    /// Selects `Murmur3Partitioner`.
    #[default]
    Murmur3,
    /// Selects `CDCPartitioner`.
    CDC,
}

impl PartitionerName {
    /// Recognises a partitioner from its name.
    ///
    /// Only the end of `name` is inspected, so any prefix in front of the
    /// partitioner's own name is accepted. Returns `None` when `name` ends
    /// with neither `Murmur3Partitioner` nor `CDCPartitioner`.
    pub(crate) fn from_str(name: &str) -> Option<Self> {
        match name {
            n if n.ends_with("Murmur3Partitioner") => Some(Self::Murmur3),
            n if n.ends_with("CDCPartitioner") => Some(Self::CDC),
            _ => None,
        }
    }
}
39
40impl sealed::Sealed for PartitionerName {}
41impl Partitioner for PartitionerName {
42 type Hasher = PartitionerHasherAny;
43
44 fn build_hasher(&self) -> Self::Hasher {
45 match self {
46 PartitionerName::Murmur3 => {
47 PartitionerHasherAny::Murmur3(Murmur3Partitioner.build_hasher())
48 }
49 PartitionerName::CDC => PartitionerHasherAny::CDC(CDCPartitioner.build_hasher()),
50 }
51 }
52}
53
/// Hasher type produced by `PartitionerName`.
///
/// It has one variant per partitioner that `PartitionerName` can name; each
/// variant wraps that partitioner's own hasher.
// The all-caps `CDC` variant name would otherwise trigger this clippy lint.
#[allow(clippy::upper_case_acronyms)]
#[non_exhaustive]
pub enum PartitionerHasherAny {
    /// Wraps the hasher built by `Murmur3Partitioner`.
    Murmur3(Murmur3PartitionerHasher),
    /// Wraps the hasher built by `CDCPartitioner`.
    CDC(CDCPartitionerHasher),
}
60
61impl sealed::Sealed for PartitionerHasherAny {}
62impl PartitionerHasher for PartitionerHasherAny {
63 fn write(&mut self, pk_part: &[u8]) {
64 match self {
65 PartitionerHasherAny::Murmur3(h) => h.write(pk_part),
66 PartitionerHasherAny::CDC(h) => h.write(pk_part),
67 }
68 }
69
70 fn finish(&self) -> Token {
71 match self {
72 PartitionerHasherAny::Murmur3(h) => h.finish(),
73 PartitionerHasherAny::CDC(h) => h.finish(),
74 }
75 }
76}
77
/// Private module holding the `Sealed` marker trait.
///
/// `Partitioner` and `PartitionerHasher` both require `sealed::Sealed` as a
/// supertrait. Because this module is not `pub`, the trait cannot be named
/// from outside the enclosing module, so those two traits can only be
/// implemented for types that implement `Sealed` here.
mod sealed {
    // A `pub` trait inside a private module is exactly what the
    // `unnameable_types` lint reports, and here that is the intent.
    // NOTE(review): `unknown_lints` is presumably allowed so that toolchains
    // which do not know `unnameable_types` stay quiet - confirm.
    #[allow(unknown_lints)] #[allow(unnameable_types)]
    pub trait Sealed {}
}
85
86pub trait Partitioner: sealed::Sealed {
92 type Hasher: PartitionerHasher;
93
94 fn build_hasher(&self) -> Self::Hasher;
95
96 #[allow(unused)] fn hash_one(&self, data: &[u8]) -> Token {
98 let mut hasher = self.build_hasher();
99 hasher.write(data);
100 hasher.finish()
101 }
102}
103
/// Incremental hasher that turns the bytes of a partition key into a `Token`.
///
/// Bytes are supplied through one or more `write` calls and the result is
/// read with `finish`. The unit tests in this file check, for both
/// partitioners defined here, that splitting the same input across several
/// `write` calls does not change the resulting token.
///
/// The `sealed::Sealed` supertrait lives in a private module, so this trait
/// can only be implemented by types in this file.
pub trait PartitionerHasher: sealed::Sealed {
    /// Feeds the next chunk of partition key bytes to the hasher.
    fn write(&mut self, pk_part: &[u8]);
    /// Returns the token for all bytes written so far.
    ///
    /// Takes `&self`, so the hasher is not consumed by this call.
    fn finish(&self) -> Token;
}
113
114pub struct Murmur3Partitioner;
115
116impl sealed::Sealed for Murmur3Partitioner {}
117impl Partitioner for Murmur3Partitioner {
118 type Hasher = Murmur3PartitionerHasher;
119
120 fn build_hasher(&self) -> Self::Hasher {
121 Self::Hasher {
122 total_len: 0,
123 buf: Default::default(),
124 h1: Wrapping(0),
125 h2: Wrapping(0),
126 }
127 }
128}
129
/// Streaming state of the hash computed by `Murmur3Partitioner`.
///
/// Input is consumed in 16-byte blocks; bytes that do not yet form a full
/// block are kept in `buf` until more data arrives or `finish` is called.
pub struct Murmur3PartitionerHasher {
    /// Total number of bytes passed to `write` so far. The number of pending
    /// bytes in `buf` is always `total_len % BUF_CAPACITY`.
    total_len: usize,
    /// Pending bytes that have been written but not yet mixed into `h1`/`h2`.
    buf: [u8; Self::BUF_CAPACITY],
    /// First word of the running hash state.
    h1: Wrapping<i64>,
    /// Second word of the running hash state.
    h2: Wrapping<i64>,
}
136
137impl Murmur3PartitionerHasher {
138 const BUF_CAPACITY: usize = 16;
139
140 const C1: Wrapping<i64> = Wrapping(0x87c3_7b91_1142_53d5_u64 as i64);
141 const C2: Wrapping<i64> = Wrapping(0x4cf5_ad43_2745_937f_u64 as i64);
142
143 fn hash_16_bytes(&mut self, mut k1: Wrapping<i64>, mut k2: Wrapping<i64>) {
144 k1 *= Self::C1;
145 k1 = Self::rotl64(k1, 31);
146 k1 *= Self::C2;
147 self.h1 ^= k1;
148
149 self.h1 = Self::rotl64(self.h1, 27);
150 self.h1 += self.h2;
151 self.h1 = self.h1 * Wrapping(5) + Wrapping(0x52dce729);
152
153 k2 *= Self::C2;
154 k2 = Self::rotl64(k2, 33);
155 k2 *= Self::C1;
156 self.h2 ^= k2;
157
158 self.h2 = Self::rotl64(self.h2, 31);
159 self.h2 += self.h1;
160 self.h2 = self.h2 * Wrapping(5) + Wrapping(0x38495ab5);
161 }
162
163 fn fetch_16_bytes_from_buf(buf: &mut &[u8]) -> (Wrapping<i64>, Wrapping<i64>) {
164 let k1 = Wrapping(buf.get_i64_le());
165 let k2 = Wrapping(buf.get_i64_le());
166 (k1, k2)
167 }
168
169 #[inline]
170 fn rotl64(v: Wrapping<i64>, n: u32) -> Wrapping<i64> {
171 Wrapping((v.0 << n) | (v.0 as u64 >> (64 - n)) as i64)
172 }
173
174 #[inline]
175 fn fmix(mut k: Wrapping<i64>) -> Wrapping<i64> {
176 k ^= Wrapping((k.0 as u64 >> 33) as i64);
177 k *= Wrapping(0xff51afd7ed558ccd_u64 as i64);
178 k ^= Wrapping((k.0 as u64 >> 33) as i64);
179 k *= Wrapping(0xc4ceb9fe1a85ec53_u64 as i64);
180 k ^= Wrapping((k.0 as u64 >> 33) as i64);
181
182 k
183 }
184}
185
impl sealed::Sealed for Murmur3PartitionerHasher {}

impl PartitionerHasher for Murmur3PartitionerHasher {
    /// Feeds `pk_part` into the hash.
    ///
    /// Full 16-byte blocks are mixed into `h1`/`h2` immediately; any bytes
    /// left over are stored in `self.buf` for a later `write` or for
    /// `finish`. Works in three phases:
    ///
    /// 1. if `self.buf` already holds pending bytes and `pk_part` is long
    ///    enough to complete a block, top the buffer up and hash it;
    /// 2. if the buffer is now empty, hash further full blocks straight from
    ///    `pk_part` without copying;
    /// 3. store whatever remains (fewer than a full block) in the buffer.
    fn write(&mut self, mut pk_part: &[u8]) {
        // Number of pending bytes in `self.buf`, derived from the byte count
        // before this call.
        let mut buf_len = self.total_len % Self::BUF_CAPACITY;
        self.total_len += pk_part.len();

        // Phase 1: the buffer is non-empty and this chunk can fill it.
        if buf_len > 0 && Self::BUF_CAPACITY - buf_len <= pk_part.len() {
            // Given the condition above this is always the free space left in
            // the buffer.
            let to_write = Ord::min(Self::BUF_CAPACITY - buf_len, pk_part.len());
            self.buf[buf_len..buf_len + to_write].copy_from_slice(&pk_part[..to_write]);
            pk_part.advance(to_write);
            buf_len += to_write;

            debug_assert_eq!(buf_len, Self::BUF_CAPACITY);
            // Consume the now complete 16-byte block from the internal buffer.
            let mut buf_ptr = &self.buf[..];
            let (k1, k2) = Self::fetch_16_bytes_from_buf(&mut buf_ptr);
            debug_assert!(buf_ptr.is_empty());
            self.hash_16_bytes(k1, k2);
            buf_len = 0;
        }

        // Phase 2: with an empty buffer, full blocks can be hashed directly
        // from the caller's slice.
        if buf_len == 0 {
            while pk_part.len() >= Self::BUF_CAPACITY {
                // Fetching advances `pk_part` by 16 bytes.
                let (k1, k2) = Self::fetch_16_bytes_from_buf(&mut pk_part);
                self.hash_16_bytes(k1, k2);
            }
        }

        // Phase 3: what is left does not complete a block; keep it for later.
        debug_assert!(pk_part.len() < Self::BUF_CAPACITY - buf_len);
        let to_write = pk_part.len();
        self.buf[buf_len..buf_len + to_write].copy_from_slice(&pk_part[..to_write]);
        pk_part.advance(to_write);
        buf_len += to_write;
        debug_assert!(pk_part.is_empty());

        debug_assert!(buf_len < Self::BUF_CAPACITY);
    }

    /// Computes the token for all bytes written so far.
    ///
    /// Works on copies of `h1`/`h2`, so the hasher itself is left unchanged.
    /// The pending bytes in `self.buf` are mixed in as a partial block, the
    /// total length is folded in, and both words go through `fmix`.
    fn finish(&self) -> Token {
        let mut h1 = self.h1;
        let mut h2 = self.h2;

        let mut k1 = Wrapping(0_i64);
        let mut k2 = Wrapping(0_i64);

        // Number of pending bytes that never formed a full block.
        let buf_len = self.total_len % Self::BUF_CAPACITY;

        // Pending bytes 8.. make up the (partial) second word.
        if buf_len > 8 {
            for i in (8..buf_len).rev() {
                // Byte `i` lands at bit offset `(i - 8) * 8`, i.e. little-endian
                // order. The `as i8 as i64` cast sign-extends each byte.
                // NOTE(review): sign extension differs from treating bytes as
                // unsigned; presumably deliberate, to match the server-side
                // hash - confirm before changing.
                k2 ^= Wrapping(self.buf[i] as i8 as i64) << ((i - 8) * 8);
            }

            k2 *= Self::C2;
            k2 = Self::rotl64(k2, 33);
            k2 *= Self::C1;
            h2 ^= k2;
        }

        // Pending bytes 0..8 make up the (possibly partial) first word.
        if buf_len > 0 {
            for i in (0..std::cmp::min(8, buf_len)).rev() {
                // Same little-endian placement and sign extension as above.
                k1 ^= Wrapping(self.buf[i] as i8 as i64) << (i * 8);
            }

            k1 *= Self::C1;
            k1 = Self::rotl64(k1, 31);
            k1 *= Self::C2;
            h1 ^= k1;
        }

        // Fold the total input length into both words.
        h1 ^= Wrapping(self.total_len as i64);
        h2 ^= Wrapping(self.total_len as i64);

        h1 += h2;
        h2 += h1;

        h1 = Self::fmix(h1);
        h2 = Self::fmix(h2);

        h1 += h2;
        h2 += h1;

        // `h2:h1` is assembled as a 128-bit value; the final cast to `i64`
        // keeps only its low 64 bits, which are exactly the bits of `h1`.
        Token::new((((h2.0 as i128) << 64) | h1.0 as i128) as i64)
    }
}
289
/// Internal state of `CDCPartitionerHasher`.
enum CDCPartitionerHasherState {
    /// Fewer than `CDCPartitionerHasher::BUF_CAPACITY` bytes have been
    /// written so far; they are collected in `buf`.
    Feeding {
        /// Number of bytes of `buf` that have been filled.
        len: usize,
        /// Bytes collected so far; only `buf[..len]` is meaningful.
        buf: [u8; CDCPartitionerHasher::BUF_CAPACITY],
    },
    /// The buffer filled up and the token was computed from it; later writes
    /// are ignored.
    Computed(Token),
}
297
/// Partitioner whose hasher, `CDCPartitionerHasher`, takes the token directly
/// from the first 8 bytes written to it instead of hashing the input.
pub struct CDCPartitioner;
299
/// Hasher created by `CDCPartitioner`.
///
/// It collects the first `BUF_CAPACITY` bytes written to it and turns them
/// into a token; any bytes written after that are ignored.
pub struct CDCPartitionerHasher {
    /// Whether the hasher is still collecting bytes or already has its token.
    state: CDCPartitionerHasherState,
}
303
304impl sealed::Sealed for CDCPartitioner {}
305impl Partitioner for CDCPartitioner {
306 type Hasher = CDCPartitionerHasher;
307
308 fn build_hasher(&self) -> Self::Hasher {
309 Self::Hasher {
310 state: CDCPartitionerHasherState::Feeding {
311 len: 0,
312 buf: Default::default(),
313 },
314 }
315 }
316}
317
impl CDCPartitionerHasher {
    /// Number of leading input bytes the token is built from: the size of
    /// one `i64`.
    const BUF_CAPACITY: usize = 8;
}
321
322impl sealed::Sealed for CDCPartitionerHasher {}
323impl PartitionerHasher for CDCPartitionerHasher {
324 fn write(&mut self, pk_part: &[u8]) {
325 match &mut self.state {
326 CDCPartitionerHasherState::Feeding { len, buf } => {
327 let copied_len = Ord::min(pk_part.len(), Self::BUF_CAPACITY - *len);
329 buf[*len..*len + copied_len].copy_from_slice(&pk_part[..copied_len]);
330 *len += copied_len;
331
332 if *len == Self::BUF_CAPACITY {
334 let token = Token::new((&mut &buf[..]).get_i64());
335 self.state = CDCPartitionerHasherState::Computed(token);
336 }
337 }
338 CDCPartitionerHasherState::Computed(_) => (),
339 }
340 }
341
342 fn finish(&self) -> Token {
343 match self.state {
344 CDCPartitionerHasherState::Feeding { .. } => Token::INVALID,
350 CDCPartitionerHasherState::Computed(token) => token,
351 }
352 }
353}
354
355pub(crate) fn calculate_token_for_partition_key(
369 serialized_partition_key_values: &SerializedValues,
370 partitioner: &PartitionerName,
371) -> Result<Token, TokenCalculationError> {
372 let mut partitioner_hasher = partitioner.build_hasher();
373
374 if serialized_partition_key_values.element_count() == 1 {
375 let val = serialized_partition_key_values.iter().next().unwrap();
376 if let RawValue::Value(val) = val {
377 partitioner_hasher.write(val);
378 }
379 } else {
380 for val in serialized_partition_key_values
381 .iter()
382 .filter_map(|rv| rv.as_value())
383 {
384 let val_len_u16: u16 = val
385 .len()
386 .try_into()
387 .map_err(|_| TokenCalculationError::ValueTooLong(val.len()))?;
388 partitioner_hasher.write(&val_len_u16.to_be_bytes());
389 partitioner_hasher.write(val);
390 partitioner_hasher.write(&[0u8]);
391 }
392 }
393
394 Ok(partitioner_hasher.finish())
395}
396
#[cfg(test)]
mod tests {
    use rand::Rng;
    use rand_pcg::Pcg32;

    use crate::test_utils::setup_tracing;

    use super::{CDCPartitioner, Murmur3Partitioner, Partitioner, PartitionerHasher};

    /// Asserts that hashing `pk` in one go with `Murmur3Partitioner` yields
    /// `expected_hash`.
    fn assert_correct_murmur3_hash(pk: &'static str, expected_hash: i64) {
        let token = Murmur3Partitioner.hash_one(pk.as_bytes());
        assert_eq!(token.value(), expected_hash);
    }

    /// Asserts that hashing `pk` in one go with `CDCPartitioner` yields
    /// `expected_hash`.
    fn assert_correct_cdc_hash(pk: &'static str, expected_hash: i64) {
        let token = CDCPartitioner.hash_one(pk.as_bytes());
        assert_eq!(token.value(), expected_hash);
    }

    /// Checks the Murmur3 partitioner against fixed expected token values,
    /// including a key containing non-ASCII bytes.
    #[test]
    fn test_murmur3_partitioner() {
        setup_tracing();
        let cases = [
            ("test", -6017608668500074083),
            ("xd", 4507812186440344727),
            ("primary_key", -1632642444691073360),
            ("kremówki", 4354931215268080151),
        ];
        for (pk, expected_hash) in cases {
            assert_correct_murmur3_hash(pk, expected_hash);
        }
    }

    /// Checks that, for both partitioners, feeding an input in randomly
    /// chosen pieces gives the same token as feeding it in one piece.
    #[test]
    fn partitioners_output_same_result_no_matter_how_input_is_partitioned() {
        setup_tracing();
        let inputs: &[&[u8]] = &[
            b"",
            b"0",
            "Ala ma kota, a kota ma Ala.".as_bytes(),
            "Zażółć gęślą jaźń. Wsiadł rycerz Szaławiła na bułanego konia. Litwo, ojczyzno moja, ...".as_bytes(),
        ];

        // Fixed seed keeps the random splits reproducible between runs.
        let seed = 0x2137;
        let mut randgen = Pcg32::new(seed, 0);

        /// Recursively splits `data` at random pivots, `n` levels deep, and
        /// writes the resulting pieces to `partitioner` in order.
        fn split_and_feed(
            randgen: &mut impl Rng,
            partitioner: &mut impl PartitionerHasher,
            data: &[u8],
            n: usize,
        ) {
            if n == 0 {
                partitioner.write(data);
                return;
            }

            // An empty slice cannot be sampled from, so it is split at 0.
            let pivot = if data.is_empty() {
                0
            } else {
                randgen.random_range(0..data.len())
            };
            let (head, tail) = data.split_at(pivot);
            split_and_feed(randgen, partitioner, head, n - 1);
            split_and_feed(randgen, partitioner, tail, n - 1);
        }

        /// Compares the single-write token of `input` with the tokens of
        /// 1000 randomly split feedings of the same input.
        fn check_for_partitioner<P: Partitioner>(
            partitioner: P,
            randgen: &mut impl Rng,
            input: &[u8],
        ) {
            let result_single_batch = partitioner.hash_one(input);

            for _ in 0..1000 {
                let mut chunked_hasher = partitioner.build_hasher();
                split_and_feed(randgen, &mut chunked_hasher, input, 2);
                assert_eq!(result_single_batch, chunked_hasher.finish());
            }
        }

        for input in inputs {
            check_for_partitioner(Murmur3Partitioner, &mut randgen, input);
            check_for_partitioner(CDCPartitioner, &mut randgen, input);
        }
    }

    /// Checks the CDC partitioner against fixed expected token values; keys
    /// shorter than 8 bytes are expected to give `i64::MIN`.
    #[test]
    fn test_cdc_partitioner() {
        setup_tracing();
        let cases = [
            ("test", -9223372036854775808),
            ("xd", -9223372036854775808),
            ("primary_key", 8102654598100187487),
            ("kremówki", 7742362231512463211),
        ];
        for (pk, expected_hash) in cases {
            assert_correct_cdc_hash(pk, expected_hash);
        }
    }
}