1use scylla_cql::Consistency;
2use tracing::debug;
3
4use super::{RequestInfo, RetryDecision, RetryPolicy, RetrySession};
5use crate::errors::{DbError, RequestAttemptError, WriteType};
6
7#[derive(Debug)]
13pub struct DowngradingConsistencyRetryPolicy;
14
15impl DowngradingConsistencyRetryPolicy {
16 pub fn new() -> DowngradingConsistencyRetryPolicy {
17 DowngradingConsistencyRetryPolicy
18 }
19}
20
21impl Default for DowngradingConsistencyRetryPolicy {
22 fn default() -> DowngradingConsistencyRetryPolicy {
23 DowngradingConsistencyRetryPolicy::new()
24 }
25}
26
27impl RetryPolicy for DowngradingConsistencyRetryPolicy {
28 fn new_session(&self) -> Box<dyn RetrySession> {
29 Box::new(DowngradingConsistencyRetrySession::new())
30 }
31}
32
33pub struct DowngradingConsistencyRetrySession {
34 was_retry: bool,
35}
36
37impl DowngradingConsistencyRetrySession {
38 pub fn new() -> DowngradingConsistencyRetrySession {
39 DowngradingConsistencyRetrySession { was_retry: false }
40 }
41}
42
43impl Default for DowngradingConsistencyRetrySession {
44 fn default() -> DowngradingConsistencyRetrySession {
45 DowngradingConsistencyRetrySession::new()
46 }
47}
48
49impl RetrySession for DowngradingConsistencyRetrySession {
50 fn decide_should_retry(&mut self, request_info: RequestInfo) -> RetryDecision {
51 let cl = match request_info.consistency {
52 Consistency::Serial | Consistency::LocalSerial => {
53 return match request_info.error {
54 RequestAttemptError::DbError(DbError::Unavailable { .. }, _) => {
55 RetryDecision::RetryNextTarget(None)
59 }
60 _ => RetryDecision::DontRetry,
61 };
62 }
63 cl => cl,
64 };
65
66 fn max_likely_to_work_cl(known_ok: i32, previous_cl: Consistency) -> RetryDecision {
67 let decision = if known_ok >= 3 {
68 RetryDecision::RetrySameTarget(Some(Consistency::Three))
69 } else if known_ok == 2 {
70 RetryDecision::RetrySameTarget(Some(Consistency::Two))
71 } else if known_ok == 1 || previous_cl == Consistency::EachQuorum {
72 RetryDecision::RetrySameTarget(Some(Consistency::One))
76 } else {
77 RetryDecision::DontRetry
78 };
79 if let RetryDecision::RetrySameTarget(new_cl) = decision {
80 debug!(
81 "Decided to lower required consistency from {} to {:?}.",
82 previous_cl, new_cl
83 );
84 }
85 decision
86 }
87
88 match request_info.error {
89 RequestAttemptError::BrokenConnectionError(_)
92 | RequestAttemptError::DbError(DbError::Overloaded, _)
93 | RequestAttemptError::DbError(DbError::ServerError, _)
94 | RequestAttemptError::DbError(DbError::TruncateError, _) => {
95 if request_info.is_idempotent {
96 RetryDecision::RetryNextTarget(None)
97 } else {
98 RetryDecision::DontRetry
99 }
100 }
101 RequestAttemptError::DbError(DbError::Unavailable { alive, .. }, _) => {
104 if !self.was_retry {
105 self.was_retry = true;
106 max_likely_to_work_cl(*alive, cl)
107 } else {
108 RetryDecision::DontRetry
109 }
110 }
111 RequestAttemptError::DbError(
113 DbError::ReadTimeout {
114 received,
115 required,
116 data_present,
117 ..
118 },
119 _,
120 ) => {
121 if self.was_retry {
122 RetryDecision::DontRetry
123 } else if received < required {
124 self.was_retry = true;
125 max_likely_to_work_cl(*received, cl)
126 } else if !*data_present {
127 self.was_retry = true;
128 RetryDecision::RetrySameTarget(None)
129 } else {
130 RetryDecision::DontRetry
131 }
132 }
133 RequestAttemptError::DbError(
135 DbError::WriteTimeout {
136 write_type,
137 received,
138 ..
139 },
140 _,
141 ) => {
142 if self.was_retry || !request_info.is_idempotent {
143 RetryDecision::DontRetry
144 } else {
145 self.was_retry = true;
146 match write_type {
147 WriteType::Batch | WriteType::Simple if *received > 0 => {
148 RetryDecision::IgnoreWriteError
149 }
150
151 WriteType::UnloggedBatch => {
152 max_likely_to_work_cl(*received, cl)
155 }
156 WriteType::BatchLog => RetryDecision::RetrySameTarget(None),
157
158 _ => RetryDecision::DontRetry,
159 }
160 }
161 }
162 RequestAttemptError::DbError(DbError::IsBootstrapping, _) => {
164 RetryDecision::RetryNextTarget(None)
165 }
166 RequestAttemptError::UnableToAllocStreamId => RetryDecision::RetryNextTarget(None),
168 _ => RetryDecision::DontRetry,
170 }
171 }
172
173 fn reset(&mut self) {
174 *self = DowngradingConsistencyRetrySession::new();
175 }
176}
177
178#[cfg(test)]
179mod tests {
180 use bytes::Bytes;
181 use scylla_cql::frame::frame_errors::{BatchSerializationError, CqlRequestSerializationError};
182
183 use crate::errors::{BrokenConnectionErrorKind, RequestAttemptError};
184 use crate::test_utils::setup_tracing;
185
186 use super::*;
187
188 const CONSISTENCY_LEVELS: &[Consistency] = &[
189 Consistency::All,
190 Consistency::Any,
191 Consistency::EachQuorum,
192 Consistency::LocalOne,
193 Consistency::LocalQuorum,
194 Consistency::One,
195 Consistency::Quorum,
196 Consistency::Three,
197 Consistency::Two,
198 ];
199
200 fn make_request_info_with_cl(
201 error: &RequestAttemptError,
202 is_idempotent: bool,
203 cl: Consistency,
204 ) -> RequestInfo<'_> {
205 RequestInfo {
206 error,
207 is_idempotent,
208 consistency: cl,
209 }
210 }
211
212 fn downgrading_consistency_policy_assert_never_retries(
214 error: RequestAttemptError,
215 cl: Consistency,
216 ) {
217 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
218 assert_eq!(
219 policy.decide_should_retry(make_request_info_with_cl(&error, false, cl)),
220 RetryDecision::DontRetry
221 );
222
223 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
224 assert_eq!(
225 policy.decide_should_retry(make_request_info_with_cl(&error, true, cl)),
226 RetryDecision::DontRetry
227 );
228 }
229
230 #[test]
231 fn downgrading_consistency_never_retries() {
232 setup_tracing();
233 let never_retried_dberrors = vec![
234 DbError::SyntaxError,
235 DbError::Invalid,
236 DbError::AlreadyExists {
237 keyspace: String::new(),
238 table: String::new(),
239 },
240 DbError::FunctionFailure {
241 keyspace: String::new(),
242 function: String::new(),
243 arg_types: vec![],
244 },
245 DbError::AuthenticationError,
246 DbError::Unauthorized,
247 DbError::ConfigError,
248 DbError::ReadFailure {
249 consistency: Consistency::Two,
250 received: 1,
251 required: 2,
252 numfailures: 1,
253 data_present: false,
254 },
255 DbError::WriteFailure {
256 consistency: Consistency::Two,
257 received: 1,
258 required: 2,
259 numfailures: 1,
260 write_type: WriteType::BatchLog,
261 },
262 DbError::Unprepared {
263 statement_id: Bytes::from_static(b"deadbeef"),
264 },
265 DbError::ProtocolError,
266 DbError::Other(0x124816),
267 ];
268
269 for &cl in CONSISTENCY_LEVELS {
270 for dberror in never_retried_dberrors.clone() {
271 downgrading_consistency_policy_assert_never_retries(
272 RequestAttemptError::DbError(dberror, String::new()),
273 cl,
274 );
275 }
276
277 downgrading_consistency_policy_assert_never_retries(
278 RequestAttemptError::RepreparedIdMissingInBatch,
279 cl,
280 );
281 downgrading_consistency_policy_assert_never_retries(
282 RequestAttemptError::RepreparedIdChanged {
283 statement: String::new(),
284 expected_id: vec![],
285 reprepared_id: vec![],
286 },
287 cl,
288 );
289 downgrading_consistency_policy_assert_never_retries(
290 RequestAttemptError::CqlRequestSerialization(
291 CqlRequestSerializationError::BatchSerialization(
292 BatchSerializationError::TooManyStatements(u16::MAX as usize + 1),
293 ),
294 ),
295 cl,
296 );
297 }
298 }
299
300 fn downgrading_consistency_policy_assert_idempotent_next(
302 error: RequestAttemptError,
303 cl: Consistency,
304 ) {
305 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
306 assert_eq!(
307 policy.decide_should_retry(make_request_info_with_cl(&error, false, cl)),
308 RetryDecision::DontRetry
309 );
310
311 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
312 assert_eq!(
313 policy.decide_should_retry(make_request_info_with_cl(&error, true, cl)),
314 RetryDecision::RetryNextTarget(None)
315 );
316 }
317
318 fn max_likely_to_work_cl(known_ok: i32, current_cl: Consistency) -> RetryDecision {
319 if known_ok >= 3 {
320 RetryDecision::RetrySameTarget(Some(Consistency::Three))
321 } else if known_ok == 2 {
322 RetryDecision::RetrySameTarget(Some(Consistency::Two))
323 } else if known_ok == 1 || current_cl == Consistency::EachQuorum {
324 RetryDecision::RetrySameTarget(Some(Consistency::One))
328 } else {
329 RetryDecision::DontRetry
330 }
331 }
332
333 #[test]
334 fn downgrading_consistency_idempotent_next_retries() {
335 setup_tracing();
336 let idempotent_next_errors = vec![
337 RequestAttemptError::DbError(DbError::Overloaded, String::new()),
338 RequestAttemptError::DbError(DbError::TruncateError, String::new()),
339 RequestAttemptError::DbError(DbError::ServerError, String::new()),
340 RequestAttemptError::BrokenConnectionError(
341 BrokenConnectionErrorKind::TooManyOrphanedStreamIds(5).into(),
342 ),
343 ];
344
345 for &cl in CONSISTENCY_LEVELS {
346 for error in idempotent_next_errors.clone() {
347 downgrading_consistency_policy_assert_idempotent_next(error, cl);
348 }
349 }
350 }
351
352 #[test]
354 fn downgrading_consistency_bootstrapping() {
355 setup_tracing();
356 let error = RequestAttemptError::DbError(DbError::IsBootstrapping, String::new());
357
358 for &cl in CONSISTENCY_LEVELS {
359 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
360 assert_eq!(
361 policy.decide_should_retry(make_request_info_with_cl(&error, false, cl)),
362 RetryDecision::RetryNextTarget(None)
363 );
364
365 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
366 assert_eq!(
367 policy.decide_should_retry(make_request_info_with_cl(&error, true, cl)),
368 RetryDecision::RetryNextTarget(None)
369 );
370 }
371 }
372
373 #[test]
375 fn downgrading_consistency_unavailable() {
376 setup_tracing();
377 let alive = 1;
378 let error = RequestAttemptError::DbError(
379 DbError::Unavailable {
380 consistency: Consistency::Two,
381 required: 2,
382 alive,
383 },
384 String::new(),
385 );
386
387 for &cl in CONSISTENCY_LEVELS {
388 let mut policy_not_idempotent = DowngradingConsistencyRetryPolicy::new().new_session();
389 assert_eq!(
390 policy_not_idempotent
391 .decide_should_retry(make_request_info_with_cl(&error, false, cl)),
392 max_likely_to_work_cl(alive, cl)
393 );
394 assert_eq!(
395 policy_not_idempotent
396 .decide_should_retry(make_request_info_with_cl(&error, false, cl)),
397 RetryDecision::DontRetry
398 );
399
400 let mut policy_idempotent = DowngradingConsistencyRetryPolicy::new().new_session();
401 assert_eq!(
402 policy_idempotent.decide_should_retry(make_request_info_with_cl(&error, true, cl)),
403 max_likely_to_work_cl(alive, cl)
404 );
405 assert_eq!(
406 policy_idempotent.decide_should_retry(make_request_info_with_cl(&error, true, cl)),
407 RetryDecision::DontRetry
408 );
409 }
410 }
411
412 #[test]
414 fn downgrading_consistency_read_timeout() {
415 setup_tracing();
416 let enough_responses_no_data = RequestAttemptError::DbError(
418 DbError::ReadTimeout {
419 consistency: Consistency::Two,
420 received: 2,
421 required: 2,
422 data_present: false,
423 },
424 String::new(),
425 );
426
427 for &cl in CONSISTENCY_LEVELS {
428 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
430 assert_eq!(
431 policy.decide_should_retry(make_request_info_with_cl(
432 &enough_responses_no_data,
433 false,
434 cl
435 )),
436 RetryDecision::RetrySameTarget(None)
437 );
438 assert_eq!(
439 policy.decide_should_retry(make_request_info_with_cl(
440 &enough_responses_no_data,
441 false,
442 cl
443 )),
444 RetryDecision::DontRetry
445 );
446
447 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
449 assert_eq!(
450 policy.decide_should_retry(make_request_info_with_cl(
451 &enough_responses_no_data,
452 true,
453 cl
454 )),
455 RetryDecision::RetrySameTarget(None)
456 );
457 assert_eq!(
458 policy.decide_should_retry(make_request_info_with_cl(
459 &enough_responses_no_data,
460 true,
461 cl
462 )),
463 RetryDecision::DontRetry
464 );
465 }
466 let enough_responses_with_data = RequestAttemptError::DbError(
469 DbError::ReadTimeout {
470 consistency: Consistency::Two,
471 received: 2,
472 required: 2,
473 data_present: true,
474 },
475 String::new(),
476 );
477
478 for &cl in CONSISTENCY_LEVELS {
479 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
481 assert_eq!(
482 policy.decide_should_retry(make_request_info_with_cl(
483 &enough_responses_with_data,
484 false,
485 cl
486 )),
487 RetryDecision::DontRetry
488 );
489
490 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
492 assert_eq!(
493 policy.decide_should_retry(make_request_info_with_cl(
494 &enough_responses_with_data,
495 true,
496 cl
497 )),
498 RetryDecision::DontRetry
499 );
500 }
501
502 let received = 1;
504 let not_enough_responses_with_data = RequestAttemptError::DbError(
505 DbError::ReadTimeout {
506 consistency: Consistency::Two,
507 received,
508 required: 2,
509 data_present: true,
510 },
511 String::new(),
512 );
513 for &cl in CONSISTENCY_LEVELS {
514 let expected_decision = max_likely_to_work_cl(received, cl);
515
516 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
518 assert_eq!(
519 policy.decide_should_retry(make_request_info_with_cl(
520 ¬_enough_responses_with_data,
521 false,
522 cl
523 )),
524 expected_decision
525 );
526 if let RetryDecision::RetrySameTarget(new_cl) = expected_decision {
527 assert_eq!(
528 policy.decide_should_retry(make_request_info_with_cl(
529 ¬_enough_responses_with_data,
530 false,
531 new_cl.unwrap_or(cl)
532 )),
533 RetryDecision::DontRetry
534 );
535 }
536
537 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
539 assert_eq!(
540 policy.decide_should_retry(make_request_info_with_cl(
541 ¬_enough_responses_with_data,
542 true,
543 cl
544 )),
545 expected_decision
546 );
547 if let RetryDecision::RetrySameTarget(new_cl) = expected_decision {
548 assert_eq!(
549 policy.decide_should_retry(make_request_info_with_cl(
550 ¬_enough_responses_with_data,
551 true,
552 new_cl.unwrap_or(cl)
553 )),
554 RetryDecision::DontRetry
555 );
556 }
557 }
558 }
559
560 #[test]
562 fn downgrading_consistency_write_timeout() {
563 setup_tracing();
564 for (received, required) in (1..=5).zip(2..=6) {
565 let write_type_batchlog = RequestAttemptError::DbError(
567 DbError::WriteTimeout {
568 consistency: Consistency::Two,
569 received,
570 required,
571 write_type: WriteType::BatchLog,
572 },
573 String::new(),
574 );
575
576 for &cl in CONSISTENCY_LEVELS {
577 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
579 assert_eq!(
580 policy.decide_should_retry(make_request_info_with_cl(
581 &write_type_batchlog,
582 false,
583 cl
584 )),
585 RetryDecision::DontRetry
586 );
587
588 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
590 assert_eq!(
591 policy.decide_should_retry(make_request_info_with_cl(
592 &write_type_batchlog,
593 true,
594 cl
595 )),
596 RetryDecision::RetrySameTarget(None)
597 );
598 assert_eq!(
599 policy.decide_should_retry(make_request_info_with_cl(
600 &write_type_batchlog,
601 true,
602 cl
603 )),
604 RetryDecision::DontRetry
605 );
606 }
607
608 let write_type_unlogged_batch = RequestAttemptError::DbError(
610 DbError::WriteTimeout {
611 consistency: Consistency::Two,
612 received,
613 required,
614 write_type: WriteType::UnloggedBatch,
615 },
616 String::new(),
617 );
618
619 for &cl in CONSISTENCY_LEVELS {
620 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
622 assert_eq!(
623 policy.decide_should_retry(make_request_info_with_cl(
624 &write_type_unlogged_batch,
625 false,
626 cl
627 )),
628 RetryDecision::DontRetry
629 );
630
631 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
633 assert_eq!(
634 policy.decide_should_retry(make_request_info_with_cl(
635 &write_type_unlogged_batch,
636 true,
637 cl
638 )),
639 max_likely_to_work_cl(received, cl)
640 );
641 assert_eq!(
642 policy.decide_should_retry(make_request_info_with_cl(
643 &write_type_unlogged_batch,
644 true,
645 cl
646 )),
647 RetryDecision::DontRetry
648 );
649 }
650
651 let write_type_other = RequestAttemptError::DbError(
653 DbError::WriteTimeout {
654 consistency: Consistency::Two,
655 received,
656 required,
657 write_type: WriteType::Simple,
658 },
659 String::new(),
660 );
661
662 for &cl in CONSISTENCY_LEVELS {
663 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
665 assert_eq!(
666 policy.decide_should_retry(make_request_info_with_cl(
667 &write_type_other,
668 false,
669 cl
670 )),
671 RetryDecision::DontRetry
672 );
673
674 let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
676 assert_eq!(
677 policy.decide_should_retry(make_request_info_with_cl(
678 &write_type_other,
679 true,
680 cl
681 )),
682 RetryDecision::IgnoreWriteError
683 );
684 assert_eq!(
685 policy.decide_should_retry(make_request_info_with_cl(
686 &write_type_other,
687 true,
688 cl
689 )),
690 RetryDecision::DontRetry
691 );
692 }
693 }
694 }
695}