scylla/policies/retry/
default.rs

1use scylla_cql::frame::response::error::{DbError, WriteType};
2
3use crate::errors::RequestAttemptError;
4
5use super::{RequestInfo, RetryDecision, RetryPolicy, RetrySession};
6
/// Default retry policy: a request is retried only when there is a high chance
/// that retrying will actually help.
///
/// The behaviour is based on the
/// [DataStax Java Driver](https://docs.datastax.com/en/developer/java-driver/4.10/manual/core/retries/).
#[derive(Debug)]
pub struct DefaultRetryPolicy;
11
12impl DefaultRetryPolicy {
13    pub fn new() -> DefaultRetryPolicy {
14        DefaultRetryPolicy
15    }
16}
17
18impl Default for DefaultRetryPolicy {
19    fn default() -> DefaultRetryPolicy {
20        DefaultRetryPolicy::new()
21    }
22}
23
24impl RetryPolicy for DefaultRetryPolicy {
25    fn new_session(&self) -> Box<dyn RetrySession> {
26        Box::new(DefaultRetrySession::new())
27    }
28}
29
/// Retry state of a single session handed out by [`DefaultRetryPolicy`].
///
/// Each flag records that the corresponding "retry at most once" allowance
/// has already been spent; all of them start out as `false`.
#[derive(Debug)]
pub struct DefaultRetrySession {
    /// Set once an `Unavailable` error has been retried.
    was_unavailable_retry: bool,
    /// Set once a `ReadTimeout` error has been retried.
    was_read_timeout_retry: bool,
    /// Set once a `WriteTimeout` error has been retried.
    was_write_timeout_retry: bool,
}
35
36impl DefaultRetrySession {
37    pub fn new() -> DefaultRetrySession {
38        DefaultRetrySession {
39            was_unavailable_retry: false,
40            was_read_timeout_retry: false,
41            was_write_timeout_retry: false,
42        }
43    }
44}
45
46impl Default for DefaultRetrySession {
47    fn default() -> DefaultRetrySession {
48        DefaultRetrySession::new()
49    }
50}
51
52impl RetrySession for DefaultRetrySession {
53    fn decide_should_retry(&mut self, request_info: RequestInfo) -> RetryDecision {
54        if request_info.consistency.is_serial() {
55            return RetryDecision::DontRetry;
56        };
57        match request_info.error {
58            // Basic errors - there are some problems on this node
59            // Retry on a different one if possible
60            RequestAttemptError::BrokenConnectionError(_)
61            | RequestAttemptError::DbError(DbError::Overloaded, _)
62            | RequestAttemptError::DbError(DbError::ServerError, _)
63            | RequestAttemptError::DbError(DbError::TruncateError, _) => {
64                if request_info.is_idempotent {
65                    RetryDecision::RetryNextTarget(None)
66                } else {
67                    RetryDecision::DontRetry
68                }
69            }
70            // Unavailable - the current node believes that not enough nodes
71            // are alive to satisfy specified consistency requirements.
72            // Maybe this node has network problems - try a different one.
73            // Perform at most one retry - it's unlikely that two nodes
74            // have network problems at the same time
75            RequestAttemptError::DbError(DbError::Unavailable { .. }, _) => {
76                if !self.was_unavailable_retry {
77                    self.was_unavailable_retry = true;
78                    RetryDecision::RetryNextTarget(None)
79                } else {
80                    RetryDecision::DontRetry
81                }
82            }
83            // ReadTimeout - coordinator didn't receive enough replies in time.
84            // Retry at most once and only if there were actually enough replies
85            // to satisfy consistency but they were all just checksums (data_present == false).
86            // This happens when the coordinator picked replicas that were overloaded/dying.
87            // Retried request should have some useful response because the node will detect
88            // that these replicas are dead.
89            RequestAttemptError::DbError(
90                DbError::ReadTimeout {
91                    received,
92                    required,
93                    data_present,
94                    ..
95                },
96                _,
97            ) => {
98                if !self.was_read_timeout_retry && received >= required && !*data_present {
99                    self.was_read_timeout_retry = true;
100                    RetryDecision::RetrySameTarget(None)
101                } else {
102                    RetryDecision::DontRetry
103                }
104            }
105            // Write timeout - coordinator didn't receive enough replies in time.
106            // Retry at most once and only for BatchLog write.
107            // Coordinator probably didn't detect the nodes as dead.
108            // By the time we retry they should be detected as dead.
109            RequestAttemptError::DbError(DbError::WriteTimeout { write_type, .. }, _) => {
110                if !self.was_write_timeout_retry
111                    && request_info.is_idempotent
112                    && *write_type == WriteType::BatchLog
113                {
114                    self.was_write_timeout_retry = true;
115                    RetryDecision::RetrySameTarget(None)
116                } else {
117                    RetryDecision::DontRetry
118                }
119            }
120            // The node is still bootstrapping it can't execute the request, we should try another one
121            RequestAttemptError::DbError(DbError::IsBootstrapping, _) => {
122                RetryDecision::RetryNextTarget(None)
123            }
124            // Connection to the contacted node is overloaded, try another one
125            RequestAttemptError::UnableToAllocStreamId => RetryDecision::RetryNextTarget(None),
126            // In all other cases propagate the error to the user
127            _ => RetryDecision::DontRetry,
128        }
129    }
130
131    fn reset(&mut self) {
132        *self = DefaultRetrySession::new();
133    }
134}
135
#[cfg(test)]
mod tests {
    //! Unit tests pinning the decision table of the default retry policy.

    use super::{DefaultRetryPolicy, RequestInfo, RetryDecision, RetryPolicy};
    use crate::errors::{BrokenConnectionErrorKind, RequestAttemptError};
    use crate::errors::{DbError, WriteType};
    use crate::statement::Consistency;
    use crate::test_utils::setup_tracing;
    use bytes::Bytes;
    use scylla_cql::frame::frame_errors::{BatchSerializationError, CqlRequestSerializationError};

    // Builds a RequestInfo with a non-serial consistency (One), so that only
    // the error and the idempotence influence the decision.
    fn make_request_info(error: &RequestAttemptError, is_idempotent: bool) -> RequestInfo<'_> {
        RequestInfo {
            error,
            is_idempotent,
            consistency: Consistency::One,
        }
    }

    // Asserts that default policy never retries for this Error
    fn default_policy_assert_never_retries(error: RequestAttemptError) {
        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&error, false)),
            RetryDecision::DontRetry
        );

        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&error, true)),
            RetryDecision::DontRetry
        );
    }

    #[test]
    fn default_never_retries() {
        setup_tracing();
        let never_retried_dberrors = vec![
            DbError::SyntaxError,
            DbError::Invalid,
            DbError::AlreadyExists {
                keyspace: String::new(),
                table: String::new(),
            },
            DbError::FunctionFailure {
                keyspace: String::new(),
                function: String::new(),
                arg_types: vec![],
            },
            DbError::AuthenticationError,
            DbError::Unauthorized,
            DbError::ConfigError,
            DbError::ReadFailure {
                consistency: Consistency::Two,
                received: 2,
                required: 1,
                numfailures: 1,
                data_present: false,
            },
            DbError::WriteFailure {
                consistency: Consistency::Two,
                received: 1,
                required: 2,
                numfailures: 1,
                write_type: WriteType::BatchLog,
            },
            DbError::Unprepared {
                statement_id: Bytes::from_static(b"deadbeef"),
            },
            DbError::ProtocolError,
            DbError::Other(0x124816),
        ];

        for dberror in never_retried_dberrors {
            default_policy_assert_never_retries(RequestAttemptError::DbError(
                dberror,
                String::new(),
            ));
        }

        default_policy_assert_never_retries(RequestAttemptError::RepreparedIdMissingInBatch);
        default_policy_assert_never_retries(RequestAttemptError::RepreparedIdChanged {
            statement: String::new(),
            expected_id: vec![],
            reprepared_id: vec![],
        });
        default_policy_assert_never_retries(RequestAttemptError::CqlRequestSerialization(
            CqlRequestSerializationError::BatchSerialization(
                BatchSerializationError::TooManyStatements(u16::MAX as usize + 1),
            ),
        ));
    }

    // Asserts that for this error policy retries on next on idempotent queries only
    fn default_policy_assert_idempotent_next(error: RequestAttemptError) {
        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&error, false)),
            RetryDecision::DontRetry
        );

        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&error, true)),
            RetryDecision::RetryNextTarget(None)
        );
    }

    #[test]
    fn default_idempotent_next_retries() {
        setup_tracing();
        let idempotent_next_errors = vec![
            RequestAttemptError::DbError(DbError::Overloaded, String::new()),
            RequestAttemptError::DbError(DbError::TruncateError, String::new()),
            RequestAttemptError::DbError(DbError::ServerError, String::new()),
            RequestAttemptError::BrokenConnectionError(
                BrokenConnectionErrorKind::TooManyOrphanedStreamIds(5).into(),
            ),
        ];

        for error in idempotent_next_errors {
            default_policy_assert_idempotent_next(error);
        }
    }

    // Always retry on next node if current one is bootstrapping
    #[test]
    fn default_bootstrapping() {
        setup_tracing();
        let error = RequestAttemptError::DbError(DbError::IsBootstrapping, String::new());

        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&error, false)),
            RetryDecision::RetryNextTarget(None)
        );

        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&error, true)),
            RetryDecision::RetryNextTarget(None)
        );
    }

    // Always retry on next node if the connection has no free stream ids,
    // no matter the idempotence
    #[test]
    fn default_unable_to_alloc_stream_id() {
        setup_tracing();
        let error = RequestAttemptError::UnableToAllocStreamId;

        for is_idempotent in [false, true] {
            let mut policy = DefaultRetryPolicy::new().new_session();
            assert_eq!(
                policy.decide_should_retry(make_request_info(&error, is_idempotent)),
                RetryDecision::RetryNextTarget(None)
            );
        }
    }

    // Requests with serial consistency are never retried, even for errors
    // that are otherwise always retried
    #[test]
    fn default_serial_consistency_never_retries() {
        setup_tracing();
        let error = RequestAttemptError::DbError(DbError::IsBootstrapping, String::new());

        for consistency in [Consistency::Serial, Consistency::LocalSerial] {
            for is_idempotent in [false, true] {
                let mut policy = DefaultRetryPolicy::new().new_session();
                let request_info = RequestInfo {
                    error: &error,
                    is_idempotent,
                    consistency,
                };
                assert_eq!(
                    policy.decide_should_retry(request_info),
                    RetryDecision::DontRetry
                );
            }
        }
    }

    // On Unavailable error we retry one time no matter the idempotence
    #[test]
    fn default_unavailable() {
        setup_tracing();
        let error = RequestAttemptError::DbError(
            DbError::Unavailable {
                consistency: Consistency::Two,
                required: 2,
                alive: 1,
            },
            String::new(),
        );

        let mut policy_not_idempotent = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy_not_idempotent.decide_should_retry(make_request_info(&error, false)),
            RetryDecision::RetryNextTarget(None)
        );
        assert_eq!(
            policy_not_idempotent.decide_should_retry(make_request_info(&error, false)),
            RetryDecision::DontRetry
        );

        let mut policy_idempotent = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy_idempotent.decide_should_retry(make_request_info(&error, true)),
            RetryDecision::RetryNextTarget(None)
        );
        assert_eq!(
            policy_idempotent.decide_should_retry(make_request_info(&error, true)),
            RetryDecision::DontRetry
        );
    }

    // reset() gives the session its one-shot retries back
    #[test]
    fn default_reset_restores_retries() {
        setup_tracing();
        let error = RequestAttemptError::DbError(
            DbError::Unavailable {
                consistency: Consistency::Two,
                required: 2,
                alive: 1,
            },
            String::new(),
        );

        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&error, true)),
            RetryDecision::RetryNextTarget(None)
        );
        assert_eq!(
            policy.decide_should_retry(make_request_info(&error, true)),
            RetryDecision::DontRetry
        );

        policy.reset();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&error, true)),
            RetryDecision::RetryNextTarget(None)
        );
    }

    // On ReadTimeout we retry one time if there were enough responses but no data
    // was present (only checksums were received), no matter the idempotence
    #[test]
    fn default_read_timeout() {
        setup_tracing();
        // Enough responses and data_present == false - coordinator received only checksums
        let enough_responses_no_data = RequestAttemptError::DbError(
            DbError::ReadTimeout {
                consistency: Consistency::Two,
                received: 2,
                required: 2,
                data_present: false,
            },
            String::new(),
        );

        // Not idempotent
        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&enough_responses_no_data, false)),
            RetryDecision::RetrySameTarget(None)
        );
        assert_eq!(
            policy.decide_should_retry(make_request_info(&enough_responses_no_data, false)),
            RetryDecision::DontRetry
        );

        // Idempotent
        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&enough_responses_no_data, true)),
            RetryDecision::RetrySameTarget(None)
        );
        assert_eq!(
            policy.decide_should_retry(make_request_info(&enough_responses_no_data, true)),
            RetryDecision::DontRetry
        );

        // Enough responses but data_present == true - coordinator probably timed out
        // waiting for read-repair acknowledgement.
        let enough_responses_with_data = RequestAttemptError::DbError(
            DbError::ReadTimeout {
                consistency: Consistency::Two,
                received: 2,
                required: 2,
                data_present: true,
            },
            String::new(),
        );

        // Not idempotent
        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&enough_responses_with_data, false)),
            RetryDecision::DontRetry
        );

        // Idempotent
        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&enough_responses_with_data, true)),
            RetryDecision::DontRetry
        );

        // Not enough responses, data_present == true
        let not_enough_responses_with_data = RequestAttemptError::DbError(
            DbError::ReadTimeout {
                consistency: Consistency::Two,
                received: 1,
                required: 2,
                data_present: true,
            },
            String::new(),
        );

        // Not idempotent
        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&not_enough_responses_with_data, false)),
            RetryDecision::DontRetry
        );

        // Idempotent
        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&not_enough_responses_with_data, true)),
            RetryDecision::DontRetry
        );

        // Not enough responses, data_present == false
        let not_enough_responses_no_data = RequestAttemptError::DbError(
            DbError::ReadTimeout {
                consistency: Consistency::Two,
                received: 1,
                required: 2,
                data_present: false,
            },
            String::new(),
        );

        // Not idempotent
        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&not_enough_responses_no_data, false)),
            RetryDecision::DontRetry
        );

        // Idempotent
        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&not_enough_responses_no_data, true)),
            RetryDecision::DontRetry
        );
    }

    // WriteTimeout will retry once when the request is idempotent and write_type == BatchLog
    #[test]
    fn default_write_timeout() {
        setup_tracing();
        // WriteType == BatchLog
        let good_write_type = RequestAttemptError::DbError(
            DbError::WriteTimeout {
                consistency: Consistency::Two,
                received: 1,
                required: 2,
                write_type: WriteType::BatchLog,
            },
            String::new(),
        );

        // Not idempotent
        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&good_write_type, false)),
            RetryDecision::DontRetry
        );

        // Idempotent
        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&good_write_type, true)),
            RetryDecision::RetrySameTarget(None)
        );
        assert_eq!(
            policy.decide_should_retry(make_request_info(&good_write_type, true)),
            RetryDecision::DontRetry
        );

        // WriteType != BatchLog
        let bad_write_type = RequestAttemptError::DbError(
            DbError::WriteTimeout {
                consistency: Consistency::Two,
                received: 4,
                required: 2,
                write_type: WriteType::Simple,
            },
            String::new(),
        );

        // Not idempotent
        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&bad_write_type, false)),
            RetryDecision::DontRetry
        );

        // Idempotent
        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_request_info(&bad_write_type, true)),
            RetryDecision::DontRetry
        );
    }
}