use crate::frame::types::Consistency;
use crate::transport::errors::{DbError, QueryError, WriteType};
/// Information about a failed query, passed to
/// [`RetrySession::decide_should_retry`] so that it can pick a [`RetryDecision`].
pub struct QueryInfo<'a> {
/// The error the query failed with.
pub error: &'a QueryError,
/// Whether the query is considered idempotent. The default session only
/// retries some kinds of errors when this is `true`.
pub is_idempotent: bool,
/// Consistency associated with the failed query; the default session never
/// retries when it is serial.
/// NOTE(review): presumably the level the query was executed with - confirm with the caller.
pub consistency: Consistency,
}
/// Outcome produced by [`RetrySession::decide_should_retry`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RetryDecision {
    /// Retry the query on the same node.
    ///
    /// NOTE(review): the optional consistency presumably overrides the level
    /// used for the retry, with `None` keeping the previous one - confirm with
    /// the code that consumes this decision.
    RetrySameNode(Option<Consistency>),
    /// Retry the query on the next node.
    ///
    /// NOTE(review): same assumption about the optional consistency as for
    /// `RetrySameNode` - confirm with the consumer.
    RetryNextNode(Option<Consistency>),
    /// Do not retry.
    DontRetry,
    /// Not produced by any session in this file.
    /// NOTE(review): presumably tells the caller to treat a failed write as
    /// successful - confirm with the consumer.
    IgnoreWriteError,
}
/// A retry policy: a `Debug + Send + Sync` factory of [`RetrySession`] objects,
/// which are the ones making the actual retry decisions.
pub trait RetryPolicy: std::fmt::Debug + Send + Sync {
/// Creates a new boxed session.
/// NOTE(review): presumably one session is created per executed query - confirm with the caller.
fn new_session(&self) -> Box<dyn RetrySession>;
}
/// Stateful decision maker created by [`RetryPolicy::new_session`].
/// It takes `&mut self`, so an implementation may remember earlier failures.
pub trait RetrySession: Send + Sync {
/// Given the description of a failed query, returns what should be done next.
fn decide_should_retry(&mut self, query_info: QueryInfo) -> RetryDecision;
/// Resets the session. The implementations in this file either hold no state
/// or go back to their freshly constructed state.
fn reset(&mut self);
}
/// Retry policy whose sessions never retry anything.
#[derive(Debug)]
pub struct FallthroughRetryPolicy;

/// Stateless session paired with [`FallthroughRetryPolicy`].
pub struct FallthroughRetrySession;

impl FallthroughRetryPolicy {
    /// Creates the policy; it is a unit struct, so there is nothing to configure.
    pub fn new() -> Self {
        Self
    }
}

impl Default for FallthroughRetryPolicy {
    /// Equivalent to [`FallthroughRetryPolicy::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl RetryPolicy for FallthroughRetryPolicy {
    /// Returns a boxed [`FallthroughRetrySession`].
    fn new_session(&self) -> Box<dyn RetrySession> {
        let session = FallthroughRetrySession;
        Box::new(session)
    }
}
impl RetrySession for FallthroughRetrySession {
    /// Always answers [`RetryDecision::DontRetry`]; the failure details are ignored.
    fn decide_should_retry(&mut self, _: QueryInfo) -> RetryDecision {
        RetryDecision::DontRetry
    }

    /// No-op: the session holds no state.
    fn reset(&mut self) {}
}
/// Default retry policy; the actual rules are implemented by the sessions it creates.
#[derive(Debug)]
pub struct DefaultRetryPolicy;

impl DefaultRetryPolicy {
    /// Creates the policy; it is a unit struct, so there is nothing to configure.
    pub fn new() -> Self {
        Self
    }
}

impl Default for DefaultRetryPolicy {
    /// Equivalent to [`DefaultRetryPolicy::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl RetryPolicy for DefaultRetryPolicy {
    /// Returns a boxed, freshly initialised [`DefaultRetrySession`].
    fn new_session(&self) -> Box<dyn RetrySession> {
        let session = DefaultRetrySession::default();
        Box::new(session)
    }
}
/// Session state of the default retry policy: one flag per error kind that is
/// retried at most once.
pub struct DefaultRetrySession {
    // Set once a retry has been issued after an `Unavailable` error.
    was_unavailable_retry: bool,
    // Set once a retry has been issued after a `ReadTimeout` error.
    was_read_timeout_retry: bool,
    // Set once a retry has been issued after a `WriteTimeout` error.
    was_write_timeout_retry: bool,
}

impl DefaultRetrySession {
    /// Creates a session that has not issued any retry yet.
    pub fn new() -> Self {
        Self {
            was_unavailable_retry: false,
            was_read_timeout_retry: false,
            was_write_timeout_retry: false,
        }
    }
}

impl Default for DefaultRetrySession {
    /// Equivalent to [`DefaultRetrySession::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl RetrySession for DefaultRetrySession {
fn decide_should_retry(&mut self, query_info: QueryInfo) -> RetryDecision {
if query_info.consistency.is_serial() {
return RetryDecision::DontRetry;
};
match query_info.error {
QueryError::BrokenConnection(_)
| QueryError::ConnectionPoolError(_)
| QueryError::DbError(DbError::Overloaded, _)
| QueryError::DbError(DbError::ServerError, _)
| QueryError::DbError(DbError::TruncateError, _) => {
if query_info.is_idempotent {
RetryDecision::RetryNextNode(None)
} else {
RetryDecision::DontRetry
}
}
QueryError::DbError(DbError::Unavailable { .. }, _) => {
if !self.was_unavailable_retry {
self.was_unavailable_retry = true;
RetryDecision::RetryNextNode(None)
} else {
RetryDecision::DontRetry
}
}
QueryError::DbError(
DbError::ReadTimeout {
received,
required,
data_present,
..
},
_,
) => {
if !self.was_read_timeout_retry && received >= required && !*data_present {
self.was_read_timeout_retry = true;
RetryDecision::RetrySameNode(None)
} else {
RetryDecision::DontRetry
}
}
QueryError::DbError(DbError::WriteTimeout { write_type, .. }, _) => {
if !self.was_write_timeout_retry
&& query_info.is_idempotent
&& *write_type == WriteType::BatchLog
{
self.was_write_timeout_retry = true;
RetryDecision::RetrySameNode(None)
} else {
RetryDecision::DontRetry
}
}
QueryError::DbError(DbError::IsBootstrapping, _) => RetryDecision::RetryNextNode(None),
QueryError::UnableToAllocStreamId => RetryDecision::RetryNextNode(None),
_ => RetryDecision::DontRetry,
}
}
fn reset(&mut self) {
*self = DefaultRetrySession::new();
}
}
#[cfg(test)]
mod tests {
use super::{DefaultRetryPolicy, QueryInfo, RetryDecision, RetryPolicy};
use crate::statement::Consistency;
use crate::test_utils::setup_tracing;
use crate::transport::errors::{
BadQuery, BrokenConnectionErrorKind, ConnectionPoolError, ProtocolError, QueryError,
};
use crate::transport::errors::{DbError, WriteType};
use bytes::Bytes;
// Builds a `QueryInfo` for `error` with the given idempotence, always at
// consistency `One`.
fn make_query_info(error: &QueryError, is_idempotent: bool) -> QueryInfo<'_> {
    let consistency = Consistency::One;
    QueryInfo {
        error,
        is_idempotent,
        consistency,
    }
}
fn default_policy_assert_never_retries(error: QueryError) {
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info(&error, false)),
RetryDecision::DontRetry
);
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info(&error, true)),
RetryDecision::DontRetry
);
}
// Errors that the default policy must never retry, whatever the idempotence.
#[test]
fn default_never_retries() {
    setup_tracing();

    let never_retried_dberrors = vec![
        DbError::SyntaxError,
        DbError::Invalid,
        DbError::AlreadyExists {
            keyspace: String::new(),
            table: String::new(),
        },
        DbError::FunctionFailure {
            keyspace: String::new(),
            function: String::new(),
            arg_types: vec![],
        },
        DbError::AuthenticationError,
        DbError::Unauthorized,
        DbError::ConfigError,
        DbError::ReadFailure {
            consistency: Consistency::Two,
            received: 2,
            required: 1,
            numfailures: 1,
            data_present: false,
        },
        DbError::WriteFailure {
            consistency: Consistency::Two,
            received: 1,
            required: 2,
            numfailures: 1,
            write_type: WriteType::BatchLog,
        },
        DbError::Unprepared {
            statement_id: Bytes::from_static(b"deadbeef"),
        },
        DbError::ProtocolError,
        DbError::Other(0x124816),
    ];

    // Wrap each database error in a `QueryError` and check it in turn.
    never_retried_dberrors
        .into_iter()
        .map(|dberror| QueryError::DbError(dberror, String::new()))
        .for_each(default_policy_assert_never_retries);

    // Two errors that are not database errors.
    let bad_query = QueryError::BadQuery(BadQuery::Other(
        "Length of provided values must be equal to number of batch statements \
        (got 1 values, 2 statements)"
            .to_owned(),
    ));
    default_policy_assert_never_retries(bad_query);

    let protocol_error: QueryError = ProtocolError::NonfinishedPagingState.into();
    default_policy_assert_never_retries(protocol_error);
}
fn default_policy_assert_idempotent_next(error: QueryError) {
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info(&error, false)),
RetryDecision::DontRetry
);
let mut policy = DefaultRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info(&error, true)),
RetryDecision::RetryNextNode(None)
);
}
// Errors that are retried on the next node, but only for idempotent queries.
#[test]
fn default_idempotent_next_retries() {
    setup_tracing();

    let idempotent_next_errors = vec![
        QueryError::DbError(DbError::Overloaded, String::new()),
        QueryError::DbError(DbError::TruncateError, String::new()),
        QueryError::DbError(DbError::ServerError, String::new()),
        QueryError::BrokenConnection(
            BrokenConnectionErrorKind::TooManyOrphanedStreamIds(5).into(),
        ),
        QueryError::ConnectionPoolError(ConnectionPoolError::Initializing),
    ];

    idempotent_next_errors
        .into_iter()
        .for_each(default_policy_assert_idempotent_next);
}
// `IsBootstrapping` is retried on the next node regardless of idempotence.
#[test]
fn default_bootstrapping() {
    setup_tracing();

    let error = QueryError::DbError(DbError::IsBootstrapping, String::new());

    for &is_idempotent in &[false, true] {
        let mut session = DefaultRetryPolicy::new().new_session();
        let decision = session.decide_should_retry(make_query_info(&error, is_idempotent));
        assert_eq!(decision, RetryDecision::RetryNextNode(None));
    }
}
// `Unavailable` is retried on the next node exactly once per session,
// regardless of idempotence.
#[test]
fn default_unavailable() {
    setup_tracing();

    let error = QueryError::DbError(
        DbError::Unavailable {
            consistency: Consistency::Two,
            required: 2,
            alive: 1,
        },
        String::new(),
    );

    for &is_idempotent in &[false, true] {
        let mut session = DefaultRetryPolicy::new().new_session();

        let first = session.decide_should_retry(make_query_info(&error, is_idempotent));
        assert_eq!(first, RetryDecision::RetryNextNode(None));

        // The same session must refuse a second retry.
        let second = session.decide_should_retry(make_query_info(&error, is_idempotent));
        assert_eq!(second, RetryDecision::DontRetry);
    }
}
// `ReadTimeout` is retried on the same node at most once per session, and only
// when enough responses arrived (`received >= required`) without data.
#[test]
fn default_read_timeout() {
    setup_tracing();

    // Enough responses, no data: retried once, then refused.
    let enough_responses_no_data = QueryError::DbError(
        DbError::ReadTimeout {
            consistency: Consistency::Two,
            received: 2,
            required: 2,
            data_present: false,
        },
        String::new(),
    );

    for &is_idempotent in &[false, true] {
        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_query_info(&enough_responses_no_data, is_idempotent)),
            RetryDecision::RetrySameNode(None)
        );
        assert_eq!(
            policy.decide_should_retry(make_query_info(&enough_responses_no_data, is_idempotent)),
            RetryDecision::DontRetry
        );
    }

    // Enough responses, but data was present: never retried.
    let enough_responses_with_data = QueryError::DbError(
        DbError::ReadTimeout {
            consistency: Consistency::Two,
            received: 2,
            required: 2,
            data_present: true,
        },
        String::new(),
    );

    for &is_idempotent in &[false, true] {
        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_query_info(&enough_responses_with_data, is_idempotent)),
            RetryDecision::DontRetry
        );
    }

    // Not enough responses (data present): never retried.
    let not_enough_responses_with_data = QueryError::DbError(
        DbError::ReadTimeout {
            consistency: Consistency::Two,
            received: 1,
            required: 2,
            data_present: true,
        },
        String::new(),
    );

    // The argument is a plain borrow of the variable above. The `&not` prefix
    // had been decoded as an HTML entity, leaving a token that is not valid Rust.
    for &is_idempotent in &[false, true] {
        let mut policy = DefaultRetryPolicy::new().new_session();
        assert_eq!(
            policy.decide_should_retry(make_query_info(
                &not_enough_responses_with_data,
                is_idempotent
            )),
            RetryDecision::DontRetry
        );
    }
}
// `WriteTimeout` is retried on the same node at most once per session, and only
// for idempotent queries whose write type is `BatchLog`.
#[test]
fn default_write_timeout() {
    setup_tracing();

    let good_write_type = QueryError::DbError(
        DbError::WriteTimeout {
            consistency: Consistency::Two,
            received: 1,
            required: 2,
            write_type: WriteType::BatchLog,
        },
        String::new(),
    );

    // Non-idempotent query: not retried even with the right write type.
    let mut session = DefaultRetryPolicy::new().new_session();
    let decision = session.decide_should_retry(make_query_info(&good_write_type, false));
    assert_eq!(decision, RetryDecision::DontRetry);

    // Idempotent query: retried once, then refused.
    let mut session = DefaultRetryPolicy::new().new_session();
    let first = session.decide_should_retry(make_query_info(&good_write_type, true));
    assert_eq!(first, RetryDecision::RetrySameNode(None));
    let second = session.decide_should_retry(make_query_info(&good_write_type, true));
    assert_eq!(second, RetryDecision::DontRetry);

    let bad_write_type = QueryError::DbError(
        DbError::WriteTimeout {
            consistency: Consistency::Two,
            received: 4,
            required: 2,
            write_type: WriteType::Simple,
        },
        String::new(),
    );

    // Any other write type is never retried, whatever the idempotence.
    for &is_idempotent in &[false, true] {
        let mut session = DefaultRetryPolicy::new().new_session();
        let decision = session.decide_should_retry(make_query_info(&bad_write_type, is_idempotent));
        assert_eq!(decision, RetryDecision::DontRetry);
    }
}
}