use scylla_cql::Consistency;
use tracing::debug;
use crate::{
retry_policy::{QueryInfo, RetryDecision, RetryPolicy, RetrySession},
transport::errors::{DbError, QueryError, WriteType},
};
#[derive(Debug)]
pub struct DowngradingConsistencyRetryPolicy;
impl DowngradingConsistencyRetryPolicy {
pub fn new() -> DowngradingConsistencyRetryPolicy {
DowngradingConsistencyRetryPolicy
}
}
impl Default for DowngradingConsistencyRetryPolicy {
fn default() -> DowngradingConsistencyRetryPolicy {
DowngradingConsistencyRetryPolicy::new()
}
}
impl RetryPolicy for DowngradingConsistencyRetryPolicy {
fn new_session(&self) -> Box<dyn RetrySession> {
Box::new(DowngradingConsistencyRetrySession::new())
}
}
pub struct DowngradingConsistencyRetrySession {
was_retry: bool,
}
impl DowngradingConsistencyRetrySession {
pub fn new() -> DowngradingConsistencyRetrySession {
DowngradingConsistencyRetrySession { was_retry: false }
}
}
impl Default for DowngradingConsistencyRetrySession {
fn default() -> DowngradingConsistencyRetrySession {
DowngradingConsistencyRetrySession::new()
}
}
impl RetrySession for DowngradingConsistencyRetrySession {
fn decide_should_retry(&mut self, query_info: QueryInfo) -> RetryDecision {
let cl = match query_info.consistency {
Consistency::Serial | Consistency::LocalSerial => {
return match query_info.error {
QueryError::DbError(DbError::Unavailable { .. }, _) => {
RetryDecision::RetryNextNode(None)
}
_ => RetryDecision::DontRetry,
};
}
cl => cl,
};
fn max_likely_to_work_cl(known_ok: i32, previous_cl: Consistency) -> RetryDecision {
let decision = if known_ok >= 3 {
RetryDecision::RetrySameNode(Some(Consistency::Three))
} else if known_ok == 2 {
RetryDecision::RetrySameNode(Some(Consistency::Two))
} else if known_ok == 1 || previous_cl == Consistency::EachQuorum {
RetryDecision::RetrySameNode(Some(Consistency::One))
} else {
RetryDecision::DontRetry
};
if let RetryDecision::RetrySameNode(new_cl) = decision {
debug!(
"Decided to lower required consistency from {} to {:?}.",
previous_cl, new_cl
);
}
decision
}
match query_info.error {
QueryError::BrokenConnection(_)
| QueryError::ConnectionPoolError(_)
| QueryError::DbError(DbError::Overloaded, _)
| QueryError::DbError(DbError::ServerError, _)
| QueryError::DbError(DbError::TruncateError, _) => {
if query_info.is_idempotent {
RetryDecision::RetryNextNode(None)
} else {
RetryDecision::DontRetry
}
}
QueryError::DbError(DbError::Unavailable { alive, .. }, _) => {
if !self.was_retry {
self.was_retry = true;
max_likely_to_work_cl(*alive, cl)
} else {
RetryDecision::DontRetry
}
}
QueryError::DbError(
DbError::ReadTimeout {
received,
required,
data_present,
..
},
_,
) => {
if self.was_retry {
RetryDecision::DontRetry
} else if received < required {
self.was_retry = true;
max_likely_to_work_cl(*received, cl)
} else if !*data_present {
self.was_retry = true;
RetryDecision::RetrySameNode(None)
} else {
RetryDecision::DontRetry
}
}
QueryError::DbError(
DbError::WriteTimeout {
write_type,
received,
..
},
_,
) => {
if self.was_retry || !query_info.is_idempotent {
RetryDecision::DontRetry
} else {
self.was_retry = true;
match write_type {
WriteType::Batch | WriteType::Simple if *received > 0 => {
RetryDecision::IgnoreWriteError
}
WriteType::UnloggedBatch => {
max_likely_to_work_cl(*received, cl)
}
WriteType::BatchLog => RetryDecision::RetrySameNode(None),
_ => RetryDecision::DontRetry,
}
}
}
QueryError::DbError(DbError::IsBootstrapping, _) => RetryDecision::RetryNextNode(None),
QueryError::UnableToAllocStreamId => RetryDecision::RetryNextNode(None),
_ => RetryDecision::DontRetry,
}
}
fn reset(&mut self) {
*self = DowngradingConsistencyRetrySession::new();
}
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
use crate::test_utils::setup_tracing;
use crate::transport::errors::{
BadQuery, BrokenConnectionErrorKind, ConnectionPoolError, ProtocolError,
};
use super::*;
const CONSISTENCY_LEVELS: &[Consistency] = &[
Consistency::All,
Consistency::Any,
Consistency::EachQuorum,
Consistency::LocalOne,
Consistency::LocalQuorum,
Consistency::One,
Consistency::Quorum,
Consistency::Three,
Consistency::Two,
];
fn make_query_info_with_cl(
error: &QueryError,
is_idempotent: bool,
cl: Consistency,
) -> QueryInfo<'_> {
QueryInfo {
error,
is_idempotent,
consistency: cl,
}
}
fn downgrading_consistency_policy_assert_never_retries(error: QueryError, cl: Consistency) {
let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(&error, false, cl)),
RetryDecision::DontRetry
);
let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(&error, true, cl)),
RetryDecision::DontRetry
);
}
#[test]
fn downgrading_consistency_never_retries() {
setup_tracing();
let never_retried_dberrors = vec![
DbError::SyntaxError,
DbError::Invalid,
DbError::AlreadyExists {
keyspace: String::new(),
table: String::new(),
},
DbError::FunctionFailure {
keyspace: String::new(),
function: String::new(),
arg_types: vec![],
},
DbError::AuthenticationError,
DbError::Unauthorized,
DbError::ConfigError,
DbError::ReadFailure {
consistency: Consistency::Two,
received: 1,
required: 2,
numfailures: 1,
data_present: false,
},
DbError::WriteFailure {
consistency: Consistency::Two,
received: 1,
required: 2,
numfailures: 1,
write_type: WriteType::BatchLog,
},
DbError::Unprepared {
statement_id: Bytes::from_static(b"deadbeef"),
},
DbError::ProtocolError,
DbError::Other(0x124816),
];
for &cl in CONSISTENCY_LEVELS {
for dberror in never_retried_dberrors.clone() {
downgrading_consistency_policy_assert_never_retries(
QueryError::DbError(dberror, String::new()),
cl,
);
}
downgrading_consistency_policy_assert_never_retries(
QueryError::BadQuery(BadQuery::Other(
"Length of provided values must be equal to number of batch statements \
(got 1 values, 2 statements)"
.to_owned(),
)),
cl,
);
downgrading_consistency_policy_assert_never_retries(
ProtocolError::NonfinishedPagingState.into(),
cl,
);
}
}
fn downgrading_consistency_policy_assert_idempotent_next(error: QueryError, cl: Consistency) {
let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(&error, false, cl)),
RetryDecision::DontRetry
);
let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(&error, true, cl)),
RetryDecision::RetryNextNode(None)
);
}
fn max_likely_to_work_cl(known_ok: i32, current_cl: Consistency) -> RetryDecision {
if known_ok >= 3 {
RetryDecision::RetrySameNode(Some(Consistency::Three))
} else if known_ok == 2 {
RetryDecision::RetrySameNode(Some(Consistency::Two))
} else if known_ok == 1 || current_cl == Consistency::EachQuorum {
RetryDecision::RetrySameNode(Some(Consistency::One))
} else {
RetryDecision::DontRetry
}
}
#[test]
fn downgrading_consistency_idempotent_next_retries() {
setup_tracing();
let idempotent_next_errors = vec![
QueryError::DbError(DbError::Overloaded, String::new()),
QueryError::DbError(DbError::TruncateError, String::new()),
QueryError::DbError(DbError::ServerError, String::new()),
QueryError::BrokenConnection(
BrokenConnectionErrorKind::TooManyOrphanedStreamIds(5).into(),
),
QueryError::ConnectionPoolError(ConnectionPoolError::Initializing),
];
for &cl in CONSISTENCY_LEVELS {
for error in idempotent_next_errors.clone() {
downgrading_consistency_policy_assert_idempotent_next(error, cl);
}
}
}
#[test]
fn downgrading_consistency_bootstrapping() {
setup_tracing();
let error = QueryError::DbError(DbError::IsBootstrapping, String::new());
for &cl in CONSISTENCY_LEVELS {
let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(&error, false, cl)),
RetryDecision::RetryNextNode(None)
);
let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(&error, true, cl)),
RetryDecision::RetryNextNode(None)
);
}
}
#[test]
fn downgrading_consistency_unavailable() {
setup_tracing();
let alive = 1;
let error = QueryError::DbError(
DbError::Unavailable {
consistency: Consistency::Two,
required: 2,
alive,
},
String::new(),
);
for &cl in CONSISTENCY_LEVELS {
let mut policy_not_idempotent = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy_not_idempotent
.decide_should_retry(make_query_info_with_cl(&error, false, cl)),
max_likely_to_work_cl(alive, cl)
);
assert_eq!(
policy_not_idempotent
.decide_should_retry(make_query_info_with_cl(&error, false, cl)),
RetryDecision::DontRetry
);
let mut policy_idempotent = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy_idempotent.decide_should_retry(make_query_info_with_cl(&error, true, cl)),
max_likely_to_work_cl(alive, cl)
);
assert_eq!(
policy_idempotent.decide_should_retry(make_query_info_with_cl(&error, true, cl)),
RetryDecision::DontRetry
);
}
}
#[test]
fn downgrading_consistency_read_timeout() {
setup_tracing();
let enough_responses_no_data = QueryError::DbError(
DbError::ReadTimeout {
consistency: Consistency::Two,
received: 2,
required: 2,
data_present: false,
},
String::new(),
);
for &cl in CONSISTENCY_LEVELS {
let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
&enough_responses_no_data,
false,
cl
)),
RetryDecision::RetrySameNode(None)
);
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
&enough_responses_no_data,
false,
cl
)),
RetryDecision::DontRetry
);
let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
&enough_responses_no_data,
true,
cl
)),
RetryDecision::RetrySameNode(None)
);
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
&enough_responses_no_data,
true,
cl
)),
RetryDecision::DontRetry
);
}
let enough_responses_with_data = QueryError::DbError(
DbError::ReadTimeout {
consistency: Consistency::Two,
received: 2,
required: 2,
data_present: true,
},
String::new(),
);
for &cl in CONSISTENCY_LEVELS {
let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
&enough_responses_with_data,
false,
cl
)),
RetryDecision::DontRetry
);
let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
&enough_responses_with_data,
true,
cl
)),
RetryDecision::DontRetry
);
}
let received = 1;
let not_enough_responses_with_data = QueryError::DbError(
DbError::ReadTimeout {
consistency: Consistency::Two,
received,
required: 2,
data_present: true,
},
String::new(),
);
for &cl in CONSISTENCY_LEVELS {
let expected_decision = max_likely_to_work_cl(received, cl);
let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
¬_enough_responses_with_data,
false,
cl
)),
expected_decision
);
if let RetryDecision::RetrySameNode(new_cl) = expected_decision {
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
¬_enough_responses_with_data,
false,
new_cl.unwrap_or(cl)
)),
RetryDecision::DontRetry
);
}
let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
¬_enough_responses_with_data,
true,
cl
)),
expected_decision
);
if let RetryDecision::RetrySameNode(new_cl) = expected_decision {
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
¬_enough_responses_with_data,
true,
new_cl.unwrap_or(cl)
)),
RetryDecision::DontRetry
);
}
}
}
#[test]
fn downgrading_consistency_write_timeout() {
setup_tracing();
for (received, required) in (1..=5).zip(2..=6) {
let write_type_batchlog = QueryError::DbError(
DbError::WriteTimeout {
consistency: Consistency::Two,
received,
required,
write_type: WriteType::BatchLog,
},
String::new(),
);
for &cl in CONSISTENCY_LEVELS {
let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
&write_type_batchlog,
false,
cl
)),
RetryDecision::DontRetry
);
let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
&write_type_batchlog,
true,
cl
)),
RetryDecision::RetrySameNode(None)
);
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
&write_type_batchlog,
true,
cl
)),
RetryDecision::DontRetry
);
}
let write_type_unlogged_batch = QueryError::DbError(
DbError::WriteTimeout {
consistency: Consistency::Two,
received,
required,
write_type: WriteType::UnloggedBatch,
},
String::new(),
);
for &cl in CONSISTENCY_LEVELS {
let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
&write_type_unlogged_batch,
false,
cl
)),
RetryDecision::DontRetry
);
let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
&write_type_unlogged_batch,
true,
cl
)),
max_likely_to_work_cl(received, cl)
);
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
&write_type_unlogged_batch,
true,
cl
)),
RetryDecision::DontRetry
);
}
let write_type_other = QueryError::DbError(
DbError::WriteTimeout {
consistency: Consistency::Two,
received,
required,
write_type: WriteType::Simple,
},
String::new(),
);
for &cl in CONSISTENCY_LEVELS {
let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
&write_type_other,
false,
cl
)),
RetryDecision::DontRetry
);
let mut policy = DowngradingConsistencyRetryPolicy::new().new_session();
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
&write_type_other,
true,
cl
)),
RetryDecision::IgnoreWriteError
);
assert_eq!(
policy.decide_should_retry(make_query_info_with_cl(
&write_type_other,
true,
cl
)),
RetryDecision::DontRetry
);
}
}
}
}