// scylla/policies/retry/downgrading_consistency.rs

1use scylla_cql::Consistency;
2use tracing::debug;
3
4use super::{RequestInfo, RetryDecision, RetryPolicy, RetrySession};
5use crate::errors::{DbError, RequestAttemptError, WriteType};
6
/// Downgrading consistency retry policy - retries with lower consistency level if it knows
/// that the initial CL is unreachable. Also, it behaves as [DefaultRetryPolicy](crate::policies::retry::DefaultRetryPolicy)
/// when it believes that the initial CL is reachable.
///
/// Behaviour based on the
/// [DataStax Java Driver](https://docs.datastax.com/en/drivers/java/3.11/com/datastax/driver/core/policies/DowngradingConsistencyRetryPolicy.html).
#[derive(Debug)]
pub struct DowngradingConsistencyRetryPolicy;

impl DowngradingConsistencyRetryPolicy {
    /// Creates the policy. The policy itself holds no state.
    pub fn new() -> DowngradingConsistencyRetryPolicy {
        DowngradingConsistencyRetryPolicy
    }
}

impl Default for DowngradingConsistencyRetryPolicy {
    /// Equivalent to [`DowngradingConsistencyRetryPolicy::new`].
    fn default() -> DowngradingConsistencyRetryPolicy {
        DowngradingConsistencyRetryPolicy::new()
    }
}
26
27impl RetryPolicy for DowngradingConsistencyRetryPolicy {
28    fn new_session(&self) -> Box<dyn RetrySession> {
29        Box::new(DowngradingConsistencyRetrySession::new())
30    }
31}
32
/// Retry state used by [`DowngradingConsistencyRetryPolicy`].
///
/// Consistency-related failures (`Unavailable`, `ReadTimeout`, `WriteTimeout`)
/// are retried at most once per session; this struct remembers whether that
/// single retry has already been spent.
#[derive(Debug)]
pub struct DowngradingConsistencyRetrySession {
    /// `true` once a consistency-related retry has been decided; later
    /// consistency-related errors are then not retried.
    was_retry: bool,
}

impl DowngradingConsistencyRetrySession {
    /// Creates a session that has not retried anything yet.
    pub fn new() -> DowngradingConsistencyRetrySession {
        DowngradingConsistencyRetrySession { was_retry: false }
    }
}

impl Default for DowngradingConsistencyRetrySession {
    /// Equivalent to [`DowngradingConsistencyRetrySession::new`].
    fn default() -> DowngradingConsistencyRetrySession {
        DowngradingConsistencyRetrySession::new()
    }
}
48
49impl RetrySession for DowngradingConsistencyRetrySession {
50    fn decide_should_retry(&mut self, request_info: RequestInfo) -> RetryDecision {
51        let cl = match request_info.consistency {
52            Consistency::Serial | Consistency::LocalSerial => {
53                return match request_info.error {
54                    RequestAttemptError::DbError(DbError::Unavailable { .. }, _) => {
55                        // JAVA-764: if the requested consistency level is serial, it means that the operation failed at
56                        // the paxos phase of a LWT.
57                        // Retry on the next target, on the assumption that the initial coordinator could be network-isolated.
58                        RetryDecision::RetryNextTarget(None)
59                    }
60                    _ => RetryDecision::DontRetry,
61                };
62            }
63            cl => cl,
64        };
65
66        fn max_likely_to_work_cl(known_ok: i32, previous_cl: Consistency) -> RetryDecision {
67            let decision = if known_ok >= 3 {
68                RetryDecision::RetrySameTarget(Some(Consistency::Three))
69            } else if known_ok == 2 {
70                RetryDecision::RetrySameTarget(Some(Consistency::Two))
71            } else if known_ok == 1 || previous_cl == Consistency::EachQuorum {
72                // JAVA-1005: EACH_QUORUM does not report a global number of alive replicas
73                // so even if we get 0 alive replicas, there might be
74                // a node up in some other datacenter
75                RetryDecision::RetrySameTarget(Some(Consistency::One))
76            } else {
77                RetryDecision::DontRetry
78            };
79            if let RetryDecision::RetrySameTarget(new_cl) = decision {
80                debug!(
81                    "Decided to lower required consistency from {} to {:?}.",
82                    previous_cl, new_cl
83                );
84            }
85            decision
86        }
87
88        match request_info.error {
89            // Basic errors - there are some problems on this node
90            // Retry on a different one if possible
91            RequestAttemptError::BrokenConnectionError(_)
92            | RequestAttemptError::DbError(DbError::Overloaded, _)
93            | RequestAttemptError::DbError(DbError::ServerError, _)
94            | RequestAttemptError::DbError(DbError::TruncateError, _) => {
95                if request_info.is_idempotent {
96                    RetryDecision::RetryNextTarget(None)
97                } else {
98                    RetryDecision::DontRetry
99                }
100            }
101            // Unavailable - the current node believes that not enough nodes
102            // are alive to satisfy specified consistency requirements.
103            RequestAttemptError::DbError(DbError::Unavailable { alive, .. }, _) => {
104                if !self.was_retry {
105                    self.was_retry = true;
106                    max_likely_to_work_cl(*alive, cl)
107                } else {
108                    RetryDecision::DontRetry
109                }
110            }
111            // ReadTimeout - coordinator didn't receive enough replies in time.
112            RequestAttemptError::DbError(
113                DbError::ReadTimeout {
114                    received,
115                    required,
116                    data_present,
117                    ..
118                },
119                _,
120            ) => {
121                if self.was_retry {
122                    RetryDecision::DontRetry
123                } else if received < required {
124                    self.was_retry = true;
125                    max_likely_to_work_cl(*received, cl)
126                } else if !*data_present {
127                    self.was_retry = true;
128                    RetryDecision::RetrySameTarget(None)
129                } else {
130                    RetryDecision::DontRetry
131                }
132            }
133            // Write timeout - coordinator didn't receive enough replies in time.
134            RequestAttemptError::DbError(
135                DbError::WriteTimeout {
136                    write_type,
137                    received,
138                    ..
139                },
140                _,
141            ) => {
142                if self.was_retry || !request_info.is_idempotent {
143                    RetryDecision::DontRetry
144                } else {
145                    self.was_retry = true;
146                    match write_type {
147                        WriteType::Batch | WriteType::Simple if *received > 0 => {
148                            RetryDecision::IgnoreWriteError
149                        }
150
151                        WriteType::UnloggedBatch => {
152                            // Since only part of the batch could have been persisted,
153                            // retry with whatever consistency should allow to persist all
154                            max_likely_to_work_cl(*received, cl)
155                        }
156                        WriteType::BatchLog => RetryDecision::RetrySameTarget(None),
157
158                        _ => RetryDecision::DontRetry,
159                    }
160                }
161            }
162            // The node is still bootstrapping it can't execute the request, we should try another one
163            RequestAttemptError::DbError(DbError::IsBootstrapping, _) => {
164                RetryDecision::RetryNextTarget(None)
165            }
166            // Connection to the contacted node is overloaded, try another one
167            RequestAttemptError::UnableToAllocStreamId => RetryDecision::RetryNextTarget(None),
168            // In all other cases propagate the error to the user
169            _ => RetryDecision::DontRetry,
170        }
171    }
172
173    fn reset(&mut self) {
174        *self = DowngradingConsistencyRetrySession::new();
175    }
176}
177
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use scylla_cql::frame::frame_errors::{BatchSerializationError, CqlRequestSerializationError};

    use crate::errors::{BrokenConnectionErrorKind, RequestAttemptError};
    use crate::test_utils::setup_tracing;

    use super::*;

    /// Non-serial consistency levels, i.e. the ones the policy may downgrade.
    const CONSISTENCY_LEVELS: &[Consistency] = &[
        Consistency::All,
        Consistency::Any,
        Consistency::EachQuorum,
        Consistency::LocalOne,
        Consistency::LocalQuorum,
        Consistency::One,
        Consistency::Quorum,
        Consistency::Three,
        Consistency::Two,
    ];

    /// Serial consistency levels, which the policy never downgrades.
    const SERIAL_CONSISTENCY_LEVELS: &[Consistency] =
        &[Consistency::Serial, Consistency::LocalSerial];

    fn make_request_info_with_cl(
        error: &RequestAttemptError,
        is_idempotent: bool,
        cl: Consistency,
    ) -> RequestInfo<'_> {
        RequestInfo {
            error,
            is_idempotent,
            consistency: cl,
        }
    }

    /// Feeds `error` to one fresh session `expected.len()` times and asserts that
    /// the n-th decision equals `expected[n]`.
    fn assert_decisions(
        error: &RequestAttemptError,
        is_idempotent: bool,
        cl: Consistency,
        expected: &[RetryDecision],
    ) {
        let mut session = DowngradingConsistencyRetryPolicy::new().new_session();
        for (attempt, expected_decision) in expected.iter().enumerate() {
            let decision =
                session.decide_should_retry(make_request_info_with_cl(error, is_idempotent, cl));
            assert_eq!(
                &decision, expected_decision,
                "attempt #{}, idempotent: {}, consistency: {:?}",
                attempt, is_idempotent, cl
            );
        }
    }

    // Asserts that downgrading consistency policy never retries for this error,
    // whether or not the request is idempotent.
    fn downgrading_consistency_policy_assert_never_retries(
        error: RequestAttemptError,
        cl: Consistency,
    ) {
        assert_decisions(&error, false, cl, &[RetryDecision::DontRetry]);
        assert_decisions(&error, true, cl, &[RetryDecision::DontRetry]);
    }

    #[test]
    fn downgrading_consistency_never_retries() {
        setup_tracing();
        let never_retried_dberrors = vec![
            DbError::SyntaxError,
            DbError::Invalid,
            DbError::AlreadyExists {
                keyspace: String::new(),
                table: String::new(),
            },
            DbError::FunctionFailure {
                keyspace: String::new(),
                function: String::new(),
                arg_types: vec![],
            },
            DbError::AuthenticationError,
            DbError::Unauthorized,
            DbError::ConfigError,
            DbError::ReadFailure {
                consistency: Consistency::Two,
                received: 1,
                required: 2,
                numfailures: 1,
                data_present: false,
            },
            DbError::WriteFailure {
                consistency: Consistency::Two,
                received: 1,
                required: 2,
                numfailures: 1,
                write_type: WriteType::BatchLog,
            },
            DbError::Unprepared {
                statement_id: Bytes::from_static(b"deadbeef"),
            },
            DbError::ProtocolError,
            DbError::Other(0x124816),
        ];

        // These errors are never retried, not even for serial consistency levels.
        for &cl in CONSISTENCY_LEVELS.iter().chain(SERIAL_CONSISTENCY_LEVELS) {
            for dberror in never_retried_dberrors.clone() {
                downgrading_consistency_policy_assert_never_retries(
                    RequestAttemptError::DbError(dberror, String::new()),
                    cl,
                );
            }

            downgrading_consistency_policy_assert_never_retries(
                RequestAttemptError::RepreparedIdMissingInBatch,
                cl,
            );
            downgrading_consistency_policy_assert_never_retries(
                RequestAttemptError::RepreparedIdChanged {
                    statement: String::new(),
                    expected_id: vec![],
                    reprepared_id: vec![],
                },
                cl,
            );
            downgrading_consistency_policy_assert_never_retries(
                RequestAttemptError::CqlRequestSerialization(
                    CqlRequestSerializationError::BatchSerialization(
                        BatchSerializationError::TooManyStatements(u16::MAX as usize + 1),
                    ),
                ),
                cl,
            );
        }
    }

    // Asserts that for this error the policy retries on the next target for
    // idempotent requests only.
    fn downgrading_consistency_policy_assert_idempotent_next(
        error: RequestAttemptError,
        cl: Consistency,
    ) {
        assert_decisions(&error, false, cl, &[RetryDecision::DontRetry]);
        assert_decisions(&error, true, cl, &[RetryDecision::RetryNextTarget(None)]);
    }

    /// Independent reference model of the policy's CL-downgrading rule.
    fn max_likely_to_work_cl(known_ok: i32, current_cl: Consistency) -> RetryDecision {
        if known_ok >= 3 {
            RetryDecision::RetrySameTarget(Some(Consistency::Three))
        } else if known_ok == 2 {
            RetryDecision::RetrySameTarget(Some(Consistency::Two))
        } else if known_ok == 1 || current_cl == Consistency::EachQuorum {
            // JAVA-1005: EACH_QUORUM does not report a global number of alive replicas
            // so even if we get 0 alive replicas, there might be
            // a node up in some other datacenter
            RetryDecision::RetrySameTarget(Some(Consistency::One))
        } else {
            RetryDecision::DontRetry
        }
    }

    #[test]
    fn downgrading_consistency_idempotent_next_retries() {
        setup_tracing();
        let idempotent_next_errors = vec![
            RequestAttemptError::DbError(DbError::Overloaded, String::new()),
            RequestAttemptError::DbError(DbError::TruncateError, String::new()),
            RequestAttemptError::DbError(DbError::ServerError, String::new()),
            RequestAttemptError::BrokenConnectionError(
                BrokenConnectionErrorKind::TooManyOrphanedStreamIds(5).into(),
            ),
        ];

        for &cl in CONSISTENCY_LEVELS {
            for error in idempotent_next_errors.clone() {
                downgrading_consistency_policy_assert_idempotent_next(error, cl);
            }
        }
    }

    // Always retry on next node if current one is bootstrapping
    #[test]
    fn downgrading_consistency_bootstrapping() {
        setup_tracing();
        let error = RequestAttemptError::DbError(DbError::IsBootstrapping, String::new());

        for &cl in CONSISTENCY_LEVELS {
            for is_idempotent in [false, true] {
                assert_decisions(
                    &error,
                    is_idempotent,
                    cl,
                    &[RetryDecision::RetryNextTarget(None)],
                );
            }
        }
    }

    // On Unavailable error we retry one time (with a downgraded CL) no matter the idempotence
    #[test]
    fn downgrading_consistency_unavailable() {
        setup_tracing();
        let alive = 1;
        let error = RequestAttemptError::DbError(
            DbError::Unavailable {
                consistency: Consistency::Two,
                required: 2,
                alive,
            },
            String::new(),
        );

        for &cl in CONSISTENCY_LEVELS {
            for is_idempotent in [false, true] {
                assert_decisions(
                    &error,
                    is_idempotent,
                    cl,
                    &[max_likely_to_work_cl(alive, cl), RetryDecision::DontRetry],
                );
            }
        }
    }

    // On ReadTimeout we retry at most once, no matter the idempotence:
    // - with a downgraded CL if fewer replicas than required responded,
    // - with the same CL if enough replicas responded but the data was NOT present.
    // If enough replicas responded and the data was present, we don't retry.
    #[test]
    fn downgrading_consistency_read_timeout() {
        setup_tracing();
        // Enough responses and data_present == false - coordinator received only checksums
        let enough_responses_no_data = RequestAttemptError::DbError(
            DbError::ReadTimeout {
                consistency: Consistency::Two,
                received: 2,
                required: 2,
                data_present: false,
            },
            String::new(),
        );

        for &cl in CONSISTENCY_LEVELS {
            for is_idempotent in [false, true] {
                assert_decisions(
                    &enough_responses_no_data,
                    is_idempotent,
                    cl,
                    &[RetryDecision::RetrySameTarget(None), RetryDecision::DontRetry],
                );
            }
        }

        // Enough responses but data_present == true - coordinator probably timed out
        // waiting for read-repair acknowledgement.
        let enough_responses_with_data = RequestAttemptError::DbError(
            DbError::ReadTimeout {
                consistency: Consistency::Two,
                received: 2,
                required: 2,
                data_present: true,
            },
            String::new(),
        );

        for &cl in CONSISTENCY_LEVELS {
            for is_idempotent in [false, true] {
                assert_decisions(
                    &enough_responses_with_data,
                    is_idempotent,
                    cl,
                    &[RetryDecision::DontRetry],
                );
            }
        }

        // Not enough responses, data_present == true
        let received = 1;
        let not_enough_responses_with_data = RequestAttemptError::DbError(
            DbError::ReadTimeout {
                consistency: Consistency::Two,
                received,
                required: 2,
                data_present: true,
            },
            String::new(),
        );
        for &cl in CONSISTENCY_LEVELS {
            let expected_decision = max_likely_to_work_cl(received, cl);

            for is_idempotent in [false, true] {
                let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
                assert_eq!(
                    policy.decide_should_retry(make_request_info_with_cl(
                        &not_enough_responses_with_data,
                        is_idempotent,
                        cl
                    )),
                    expected_decision
                );
                // The retry is sent with the downgraded CL; if it times out again,
                // it must not be retried a second time.
                if let RetryDecision::RetrySameTarget(new_cl) = expected_decision {
                    assert_eq!(
                        policy.decide_should_retry(make_request_info_with_cl(
                            &not_enough_responses_with_data,
                            is_idempotent,
                            new_cl.unwrap_or(cl)
                        )),
                        RetryDecision::DontRetry
                    );
                }
            }
        }
    }

    // On WriteTimeout only idempotent requests are retried, and at most once:
    // - BatchLog is retried with the same CL,
    // - UnloggedBatch is retried with a downgraded CL,
    // - Simple/Batch writes acknowledged by at least one replica are ignored,
    // - anything else (e.g. Simple with no acknowledgement) is not retried.
    #[test]
    fn downgrading_consistency_write_timeout() {
        setup_tracing();
        for (received, required) in (1..=5).zip(2..=6) {
            // WriteType == BatchLog
            let write_type_batchlog = RequestAttemptError::DbError(
                DbError::WriteTimeout {
                    consistency: Consistency::Two,
                    received,
                    required,
                    write_type: WriteType::BatchLog,
                },
                String::new(),
            );

            for &cl in CONSISTENCY_LEVELS {
                assert_decisions(&write_type_batchlog, false, cl, &[RetryDecision::DontRetry]);
                assert_decisions(
                    &write_type_batchlog,
                    true,
                    cl,
                    &[RetryDecision::RetrySameTarget(None), RetryDecision::DontRetry],
                );
            }

            // WriteType == UnloggedBatch
            let write_type_unlogged_batch = RequestAttemptError::DbError(
                DbError::WriteTimeout {
                    consistency: Consistency::Two,
                    received,
                    required,
                    write_type: WriteType::UnloggedBatch,
                },
                String::new(),
            );

            for &cl in CONSISTENCY_LEVELS {
                assert_decisions(
                    &write_type_unlogged_batch,
                    false,
                    cl,
                    &[RetryDecision::DontRetry],
                );
                assert_decisions(
                    &write_type_unlogged_batch,
                    true,
                    cl,
                    &[max_likely_to_work_cl(received, cl), RetryDecision::DontRetry],
                );
            }

            // WriteType == Simple, acknowledged by at least one replica
            let write_type_simple_acked = RequestAttemptError::DbError(
                DbError::WriteTimeout {
                    consistency: Consistency::Two,
                    received,
                    required,
                    write_type: WriteType::Simple,
                },
                String::new(),
            );

            for &cl in CONSISTENCY_LEVELS {
                assert_decisions(
                    &write_type_simple_acked,
                    false,
                    cl,
                    &[RetryDecision::DontRetry],
                );
                assert_decisions(
                    &write_type_simple_acked,
                    true,
                    cl,
                    &[RetryDecision::IgnoreWriteError, RetryDecision::DontRetry],
                );
            }
        }

        // WriteType == Simple with no acknowledgement at all - nothing to ignore.
        let write_type_simple_no_acks = RequestAttemptError::DbError(
            DbError::WriteTimeout {
                consistency: Consistency::Two,
                received: 0,
                required: 2,
                write_type: WriteType::Simple,
            },
            String::new(),
        );
        for &cl in CONSISTENCY_LEVELS {
            for is_idempotent in [false, true] {
                assert_decisions(
                    &write_type_simple_no_acks,
                    is_idempotent,
                    cl,
                    &[RetryDecision::DontRetry],
                );
            }
        }
    }

    // Serial consistency levels are never downgraded: on Unavailable the policy
    // tries the next target instead, and read/write timeouts are not retried.
    #[test]
    fn downgrading_consistency_serial() {
        setup_tracing();
        let unavailable = RequestAttemptError::DbError(
            DbError::Unavailable {
                consistency: Consistency::Serial,
                required: 2,
                alive: 1,
            },
            String::new(),
        );
        let read_timeout = RequestAttemptError::DbError(
            DbError::ReadTimeout {
                consistency: Consistency::Serial,
                received: 1,
                required: 2,
                data_present: false,
            },
            String::new(),
        );
        let write_timeout = RequestAttemptError::DbError(
            DbError::WriteTimeout {
                consistency: Consistency::Serial,
                received: 1,
                required: 2,
                write_type: WriteType::Simple,
            },
            String::new(),
        );

        for &cl in SERIAL_CONSISTENCY_LEVELS {
            for is_idempotent in [false, true] {
                assert_decisions(
                    &unavailable,
                    is_idempotent,
                    cl,
                    &[RetryDecision::RetryNextTarget(None)],
                );
                assert_decisions(&read_timeout, is_idempotent, cl, &[RetryDecision::DontRetry]);
                assert_decisions(&write_timeout, is_idempotent, cl, &[RetryDecision::DontRetry]);
            }
        }
    }
}