1use scylla_cql::frame::response::error::{DbError, WriteType};
2
3use crate::errors::RequestAttemptError;
4
5use super::{RequestInfo, RetryDecision, RetryPolicy, RetrySession};
6
7#[derive(Debug)]
10pub struct DefaultRetryPolicy;
11
12impl DefaultRetryPolicy {
13 pub fn new() -> DefaultRetryPolicy {
14 DefaultRetryPolicy
15 }
16}
17
18impl Default for DefaultRetryPolicy {
19 fn default() -> DefaultRetryPolicy {
20 DefaultRetryPolicy::new()
21 }
22}
23
24impl RetryPolicy for DefaultRetryPolicy {
25 fn new_session(&self) -> Box<dyn RetrySession> {
26 Box::new(DefaultRetrySession::new())
27 }
28}
29
30pub struct DefaultRetrySession {
31 was_unavailable_retry: bool,
32 was_read_timeout_retry: bool,
33 was_write_timeout_retry: bool,
34}
35
36impl DefaultRetrySession {
37 pub fn new() -> DefaultRetrySession {
38 DefaultRetrySession {
39 was_unavailable_retry: false,
40 was_read_timeout_retry: false,
41 was_write_timeout_retry: false,
42 }
43 }
44}
45
46impl Default for DefaultRetrySession {
47 fn default() -> DefaultRetrySession {
48 DefaultRetrySession::new()
49 }
50}
51
52impl RetrySession for DefaultRetrySession {
53 fn decide_should_retry(&mut self, request_info: RequestInfo) -> RetryDecision {
54 if request_info.consistency.is_serial() {
55 return RetryDecision::DontRetry;
56 };
57 match request_info.error {
58 RequestAttemptError::BrokenConnectionError(_)
61 | RequestAttemptError::DbError(DbError::Overloaded, _)
62 | RequestAttemptError::DbError(DbError::ServerError, _)
63 | RequestAttemptError::DbError(DbError::TruncateError, _) => {
64 if request_info.is_idempotent {
65 RetryDecision::RetryNextTarget(None)
66 } else {
67 RetryDecision::DontRetry
68 }
69 }
70 RequestAttemptError::DbError(DbError::Unavailable { .. }, _) => {
76 if !self.was_unavailable_retry {
77 self.was_unavailable_retry = true;
78 RetryDecision::RetryNextTarget(None)
79 } else {
80 RetryDecision::DontRetry
81 }
82 }
83 RequestAttemptError::DbError(
90 DbError::ReadTimeout {
91 received,
92 required,
93 data_present,
94 ..
95 },
96 _,
97 ) => {
98 if !self.was_read_timeout_retry && received >= required && !*data_present {
99 self.was_read_timeout_retry = true;
100 RetryDecision::RetrySameTarget(None)
101 } else {
102 RetryDecision::DontRetry
103 }
104 }
105 RequestAttemptError::DbError(DbError::WriteTimeout { write_type, .. }, _) => {
110 if !self.was_write_timeout_retry
111 && request_info.is_idempotent
112 && *write_type == WriteType::BatchLog
113 {
114 self.was_write_timeout_retry = true;
115 RetryDecision::RetrySameTarget(None)
116 } else {
117 RetryDecision::DontRetry
118 }
119 }
120 RequestAttemptError::DbError(DbError::IsBootstrapping, _) => {
122 RetryDecision::RetryNextTarget(None)
123 }
124 RequestAttemptError::UnableToAllocStreamId => RetryDecision::RetryNextTarget(None),
126 _ => RetryDecision::DontRetry,
128 }
129 }
130
131 fn reset(&mut self) {
132 *self = DefaultRetrySession::new();
133 }
134}
135
136#[cfg(test)]
137mod tests {
138 use super::{DefaultRetryPolicy, RequestInfo, RetryDecision, RetryPolicy};
139 use crate::errors::{BrokenConnectionErrorKind, RequestAttemptError};
140 use crate::errors::{DbError, WriteType};
141 use crate::statement::Consistency;
142 use crate::test_utils::setup_tracing;
143 use bytes::Bytes;
144 use scylla_cql::frame::frame_errors::{BatchSerializationError, CqlRequestSerializationError};
145
146 fn make_request_info(error: &RequestAttemptError, is_idempotent: bool) -> RequestInfo<'_> {
147 RequestInfo {
148 error,
149 is_idempotent,
150 consistency: Consistency::One,
151 }
152 }
153
154 fn default_policy_assert_never_retries(error: RequestAttemptError) {
156 let mut policy = DefaultRetryPolicy::new().new_session();
157 assert_eq!(
158 policy.decide_should_retry(make_request_info(&error, false)),
159 RetryDecision::DontRetry
160 );
161
162 let mut policy = DefaultRetryPolicy::new().new_session();
163 assert_eq!(
164 policy.decide_should_retry(make_request_info(&error, true)),
165 RetryDecision::DontRetry
166 );
167 }
168
169 #[test]
170 fn default_never_retries() {
171 setup_tracing();
172 let never_retried_dberrors = vec![
173 DbError::SyntaxError,
174 DbError::Invalid,
175 DbError::AlreadyExists {
176 keyspace: String::new(),
177 table: String::new(),
178 },
179 DbError::FunctionFailure {
180 keyspace: String::new(),
181 function: String::new(),
182 arg_types: vec![],
183 },
184 DbError::AuthenticationError,
185 DbError::Unauthorized,
186 DbError::ConfigError,
187 DbError::ReadFailure {
188 consistency: Consistency::Two,
189 received: 2,
190 required: 1,
191 numfailures: 1,
192 data_present: false,
193 },
194 DbError::WriteFailure {
195 consistency: Consistency::Two,
196 received: 1,
197 required: 2,
198 numfailures: 1,
199 write_type: WriteType::BatchLog,
200 },
201 DbError::Unprepared {
202 statement_id: Bytes::from_static(b"deadbeef"),
203 },
204 DbError::ProtocolError,
205 DbError::Other(0x124816),
206 ];
207
208 for dberror in never_retried_dberrors {
209 default_policy_assert_never_retries(RequestAttemptError::DbError(
210 dberror,
211 String::new(),
212 ));
213 }
214
215 default_policy_assert_never_retries(RequestAttemptError::RepreparedIdMissingInBatch);
216 default_policy_assert_never_retries(RequestAttemptError::RepreparedIdChanged {
217 statement: String::new(),
218 expected_id: vec![],
219 reprepared_id: vec![],
220 });
221 default_policy_assert_never_retries(RequestAttemptError::CqlRequestSerialization(
222 CqlRequestSerializationError::BatchSerialization(
223 BatchSerializationError::TooManyStatements(u16::MAX as usize + 1),
224 ),
225 ));
226 }
227
228 fn default_policy_assert_idempotent_next(error: RequestAttemptError) {
230 let mut policy = DefaultRetryPolicy::new().new_session();
231 assert_eq!(
232 policy.decide_should_retry(make_request_info(&error, false)),
233 RetryDecision::DontRetry
234 );
235
236 let mut policy = DefaultRetryPolicy::new().new_session();
237 assert_eq!(
238 policy.decide_should_retry(make_request_info(&error, true)),
239 RetryDecision::RetryNextTarget(None)
240 );
241 }
242
243 #[test]
244 fn default_idempotent_next_retries() {
245 setup_tracing();
246 let idempotent_next_errors = vec![
247 RequestAttemptError::DbError(DbError::Overloaded, String::new()),
248 RequestAttemptError::DbError(DbError::TruncateError, String::new()),
249 RequestAttemptError::DbError(DbError::ServerError, String::new()),
250 RequestAttemptError::BrokenConnectionError(
251 BrokenConnectionErrorKind::TooManyOrphanedStreamIds(5).into(),
252 ),
253 ];
254
255 for error in idempotent_next_errors {
256 default_policy_assert_idempotent_next(error);
257 }
258 }
259
260 #[test]
262 fn default_bootstrapping() {
263 setup_tracing();
264 let error = RequestAttemptError::DbError(DbError::IsBootstrapping, String::new());
265
266 let mut policy = DefaultRetryPolicy::new().new_session();
267 assert_eq!(
268 policy.decide_should_retry(make_request_info(&error, false)),
269 RetryDecision::RetryNextTarget(None)
270 );
271
272 let mut policy = DefaultRetryPolicy::new().new_session();
273 assert_eq!(
274 policy.decide_should_retry(make_request_info(&error, true)),
275 RetryDecision::RetryNextTarget(None)
276 );
277 }
278
279 #[test]
281 fn default_unavailable() {
282 setup_tracing();
283 let error = RequestAttemptError::DbError(
284 DbError::Unavailable {
285 consistency: Consistency::Two,
286 required: 2,
287 alive: 1,
288 },
289 String::new(),
290 );
291
292 let mut policy_not_idempotent = DefaultRetryPolicy::new().new_session();
293 assert_eq!(
294 policy_not_idempotent.decide_should_retry(make_request_info(&error, false)),
295 RetryDecision::RetryNextTarget(None)
296 );
297 assert_eq!(
298 policy_not_idempotent.decide_should_retry(make_request_info(&error, false)),
299 RetryDecision::DontRetry
300 );
301
302 let mut policy_idempotent = DefaultRetryPolicy::new().new_session();
303 assert_eq!(
304 policy_idempotent.decide_should_retry(make_request_info(&error, true)),
305 RetryDecision::RetryNextTarget(None)
306 );
307 assert_eq!(
308 policy_idempotent.decide_should_retry(make_request_info(&error, true)),
309 RetryDecision::DontRetry
310 );
311 }
312
313 #[test]
315 fn default_read_timeout() {
316 setup_tracing();
317 let enough_responses_no_data = RequestAttemptError::DbError(
319 DbError::ReadTimeout {
320 consistency: Consistency::Two,
321 received: 2,
322 required: 2,
323 data_present: false,
324 },
325 String::new(),
326 );
327
328 let mut policy = DefaultRetryPolicy::new().new_session();
330 assert_eq!(
331 policy.decide_should_retry(make_request_info(&enough_responses_no_data, false)),
332 RetryDecision::RetrySameTarget(None)
333 );
334 assert_eq!(
335 policy.decide_should_retry(make_request_info(&enough_responses_no_data, false)),
336 RetryDecision::DontRetry
337 );
338
339 let mut policy = DefaultRetryPolicy::new().new_session();
341 assert_eq!(
342 policy.decide_should_retry(make_request_info(&enough_responses_no_data, true)),
343 RetryDecision::RetrySameTarget(None)
344 );
345 assert_eq!(
346 policy.decide_should_retry(make_request_info(&enough_responses_no_data, true)),
347 RetryDecision::DontRetry
348 );
349
350 let enough_responses_with_data = RequestAttemptError::DbError(
353 DbError::ReadTimeout {
354 consistency: Consistency::Two,
355 received: 2,
356 required: 2,
357 data_present: true,
358 },
359 String::new(),
360 );
361
362 let mut policy = DefaultRetryPolicy::new().new_session();
364 assert_eq!(
365 policy.decide_should_retry(make_request_info(&enough_responses_with_data, false)),
366 RetryDecision::DontRetry
367 );
368
369 let mut policy = DefaultRetryPolicy::new().new_session();
371 assert_eq!(
372 policy.decide_should_retry(make_request_info(&enough_responses_with_data, true)),
373 RetryDecision::DontRetry
374 );
375
376 let not_enough_responses_with_data = RequestAttemptError::DbError(
378 DbError::ReadTimeout {
379 consistency: Consistency::Two,
380 received: 1,
381 required: 2,
382 data_present: true,
383 },
384 String::new(),
385 );
386
387 let mut policy = DefaultRetryPolicy::new().new_session();
389 assert_eq!(
390 policy.decide_should_retry(make_request_info(¬_enough_responses_with_data, false)),
391 RetryDecision::DontRetry
392 );
393
394 let mut policy = DefaultRetryPolicy::new().new_session();
396 assert_eq!(
397 policy.decide_should_retry(make_request_info(¬_enough_responses_with_data, true)),
398 RetryDecision::DontRetry
399 );
400 }
401
402 #[test]
404 fn default_write_timeout() {
405 setup_tracing();
406 let good_write_type = RequestAttemptError::DbError(
408 DbError::WriteTimeout {
409 consistency: Consistency::Two,
410 received: 1,
411 required: 2,
412 write_type: WriteType::BatchLog,
413 },
414 String::new(),
415 );
416
417 let mut policy = DefaultRetryPolicy::new().new_session();
419 assert_eq!(
420 policy.decide_should_retry(make_request_info(&good_write_type, false)),
421 RetryDecision::DontRetry
422 );
423
424 let mut policy = DefaultRetryPolicy::new().new_session();
426 assert_eq!(
427 policy.decide_should_retry(make_request_info(&good_write_type, true)),
428 RetryDecision::RetrySameTarget(None)
429 );
430 assert_eq!(
431 policy.decide_should_retry(make_request_info(&good_write_type, true)),
432 RetryDecision::DontRetry
433 );
434
435 let bad_write_type = RequestAttemptError::DbError(
437 DbError::WriteTimeout {
438 consistency: Consistency::Two,
439 received: 4,
440 required: 2,
441 write_type: WriteType::Simple,
442 },
443 String::new(),
444 );
445
446 let mut policy = DefaultRetryPolicy::new().new_session();
448 assert_eq!(
449 policy.decide_should_retry(make_request_info(&bad_write_type, false)),
450 RetryDecision::DontRetry
451 );
452
453 let mut policy = DefaultRetryPolicy::new().new_session();
455 assert_eq!(
456 policy.decide_should_retry(make_request_info(&bad_write_type, true)),
457 RetryDecision::DontRetry
458 );
459 }
460}