use futures::{
    future::FutureExt,
    stream::{FuturesUnordered, StreamExt},
};
use scylla_cql::frame::response::error::DbError;
use std::{future::Future, sync::Arc, time::Duration};
use tracing::{trace_span, warn, Instrument};

use crate::transport::errors::QueryError;

use super::metrics::Metrics;

/// Context is passed as an argument to `SpeculativeExecutionPolicy` methods.
pub struct Context {
    pub metrics: Arc<Metrics>,
}

/// The policy that decides if the driver will send speculative queries to the
/// next hosts when the current host takes too long to respond.
pub trait SpeculativeExecutionPolicy: std::fmt::Debug + Send + Sync {
    /// The maximum number of speculative executions that will be triggered
    /// for a given request (does not include the initial request).
    fn max_retry_count(&self, context: &Context) -> usize;

    /// The delay between each speculative execution.
    fn retry_interval(&self, context: &Context) -> Duration;
}

/// A `SpeculativeExecutionPolicy` that schedules a given number of speculative
/// executions, separated by a fixed delay.
#[derive(Debug, Clone)]
pub struct SimpleSpeculativeExecutionPolicy {
    /// The maximum number of speculative executions that will be triggered
    /// for a given request (does not include the initial request).
    pub max_retry_count: usize,

    /// The delay between each speculative execution.
    pub retry_interval: Duration,
}

/// A policy that triggers speculative executions when the latency of the request
/// to the current host exceeds a given percentile.
#[derive(Debug, Clone)]
pub struct PercentileSpeculativeExecutionPolicy {
    /// The maximum number of speculative executions that will be triggered
    /// for a given request (does not include the initial request).
    pub max_retry_count: usize,

    /// The percentile that a request's latency must exceed to be considered
    /// slow (e.g. 99.0).
    pub percentile: f64,
}

impl SpeculativeExecutionPolicy for SimpleSpeculativeExecutionPolicy {
    fn max_retry_count(&self, _: &Context) -> usize {
        self.max_retry_count
    }

    fn retry_interval(&self, _: &Context) -> Duration {
        self.retry_interval
    }
}
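
/*
Illustrative sketch, not part of the driver API. Assuming the timer in `execute`
is re-armed each time it fires, a `SimpleSpeculativeExecutionPolicy` with
`max_retry_count = n` and `retry_interval = d` launches its speculative executions
about `d`, `2d`, ..., `n * d` after the original request, unless a non-ignorable
result arrives first. `speculative_start_offsets` is a hypothetical helper that only
computes this schedule and ignores scheduling jitter.

```rust
use std::time::Duration;

/// Hypothetical helper: offsets (relative to the original request) at which a
/// fixed-delay policy would launch speculative executions, assuming no attempt
/// ends the race earlier.
fn speculative_start_offsets(max_retry_count: usize, retry_interval: Duration) -> Vec<Duration> {
    (1..=max_retry_count)
        // `Duration * u32` comes from std; the retry count is assumed to fit in a u32.
        .map(|k| retry_interval * k as u32)
        .collect()
}
```

For example, `speculative_start_offsets(2, Duration::from_millis(100))` yields
offsets of 100 ms and 200 ms.
*/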

impl SpeculativeExecutionPolicy for PercentileSpeculativeExecutionPolicy {
    fn max_retry_count(&self, _: &Context) -> usize {
        self.max_retry_count
    }

    fn retry_interval(&self, context: &Context) -> Duration {
        let interval = context.metrics.get_latency_percentile_ms(self.percentile);
        let ms = match interval {
            Ok(d) => d,
            Err(e) => {
                warn!(
                    "Failed to get latency percentile ({}), defaulting to 100 ms",
                    e
                );
                100
            }
        };
        Duration::from_millis(ms)
    }
}

/// Checks if a result created in a speculative execution branch can be ignored.
///
/// An error can be ignored if its occurrence when executing the request on one node
/// does not imply that the same error will occur when the request is retried on
/// some other node.
fn can_be_ignored<ResT>(result: &Result<ResT, QueryError>) -> bool {
    match result {
        Ok(_) => false,
        // Do not remove this lint!
        // It's there for a reason - we don't want new variants
        // to automatically fall under the `_` pattern when they are introduced.
        Err(e) => match e {
            // Errors that will almost certainly appear for other nodes as well
            | QueryError::CqlRequestSerialization(_)
            | QueryError::BodyExtensionsParseError(_)
            | QueryError::CqlResultParseError(_)
            | QueryError::CqlErrorParseError(_)
            | QueryError::ProtocolError(_) => false,

            // EmptyPlan is not returned by `Session::execute_query`.
            // It is represented by None, which is then transformed
            // to QueryError::EmptyPlan by the caller
            // (either here in the speculative_execution module, or for non-speculative execution).
            // I believe this should not be ignored, since we do not expect it here.
            QueryError::EmptyPlan => false,

            // Errors that should not appear here, thus should not be ignored
            | QueryError::IntoLegacyQueryResultError(_)
            | QueryError::TimeoutError
            | QueryError::RequestTimeout(_)
            | QueryError::MetadataError(_) => false,

            // Errors that can be ignored
            | QueryError::UnableToAllocStreamId
            | QueryError::ConnectionPoolError(_) => true,

            // Handle DbErrors
            QueryError::DbError(db_error, _) => {
                // Do not remove this lint!
                // It's there for a reason - we don't want new variants
                // to automatically fall under the `_` pattern when they are introduced.
                match db_error {
                    // Errors that will almost certainly appear on other nodes as well
                    | DbError::Invalid
                    | DbError::AlreadyExists { .. }
                    | DbError::Unauthorized
                    | DbError::ProtocolError => false,

                    // Errors that should not appear here - thus, should not be ignored.
                    DbError::AuthenticationError | DbError::Other(_) => false,

                    // For now, let's assume that UDF failure is not transient - don't ignore it
                    // TODO: investigate
                    DbError::FunctionFailure { .. } => false,

                    // Not sure when these can appear - don't ignore them
                    // TODO: Investigate these errors
                    DbError::ConfigError | DbError::TruncateError => false,

                    // Errors that we can ignore and perform a retry on some other node
                    DbError::Unavailable { .. }
                    | DbError::Overloaded
                    | DbError::IsBootstrapping
                    | DbError::ReadTimeout { .. }
                    | DbError::WriteTimeout { .. }
                    | DbError::ReadFailure { .. }
                    | DbError::WriteFailure { .. }
                    // Preparation may succeed on some other node.
                    | DbError::Unprepared { .. }
                    | DbError::ServerError
                    | DbError::RateLimitReached { .. } => true,
                }
            }
        },
    }
}

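
/*
Illustrative sketch, not part of the driver. The classification above matches
every variant explicitly; an exhaustive `match` with no `_` arm forces each newly
added variant to be classified, because the compiler reports a non-exhaustive
match until it is. `NodeError` and `may_succeed_elsewhere` are hypothetical,
cut-down names; the real `QueryError` and `DbError` have many more variants.

```rust
/// Hypothetical, cut-down error enum used only for this illustration.
#[derive(Debug)]
enum NodeError {
    Overloaded,
    IsBootstrapping,
    Invalid,
    Unauthorized,
}

/// Returns true if the same request may still succeed on another node.
/// There is deliberately no `_` arm: adding a variant to `NodeError` makes this
/// function fail to compile until the new variant is handled.
fn may_succeed_elsewhere(error: &NodeError) -> bool {
    match error {
        // Node-local conditions: another replica may be healthy.
        NodeError::Overloaded | NodeError::IsBootstrapping => true,
        // Problems with the request itself repeat on every node.
        NodeError::Invalid | NodeError::Unauthorized => false,
    }
}
```
*/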
const EMPTY_PLAN_ERROR: QueryError = QueryError::EmptyPlan;

pub(crate) async fn execute<QueryFut, ResT>(
    policy: &dyn SpeculativeExecutionPolicy,
    context: &Context,
    query_runner_generator: impl Fn(bool) -> QueryFut,
) -> Result<ResT, QueryError>
where
    QueryFut: Future<Output = Option<Result<ResT, QueryError>>>,
{
    let mut retries_remaining = policy.max_retry_count(context);
    let retry_interval = policy.retry_interval(context);

    let mut async_tasks = FuturesUnordered::new();
    // The original (non-speculative) request starts immediately.
    async_tasks.push(
        query_runner_generator(false)
            .instrument(trace_span!("Speculative execution: original query")),
    );

    // `select!` requires a fused future, and `&mut sleep` requires it to be pinned.
    let sleep = tokio::time::sleep(retry_interval).fuse();
    tokio::pin!(sleep);

    let mut last_error = None;
    loop {
        futures::select! {
            _ = &mut sleep => {
                if retries_remaining > 0 {
                    async_tasks.push(query_runner_generator(true).instrument(trace_span!("Speculative execution", retries_remaining = retries_remaining)));
                    retries_remaining -= 1;

                    // reset the timeout
                    sleep.set(tokio::time::sleep(retry_interval).fuse());
                }
            }
            res = async_tasks.select_next_some() => {
                if let Some(r) = res {
                    if !can_be_ignored(&r) {
                        return r;
                    } else {
                        last_error = Some(r);
                    }
                }
                if async_tasks.is_empty() && retries_remaining == 0 {
                    // Return the last ignorable error, or `EmptyPlan` if no attempt produced a result.
                    return last_error.unwrap_or(Err(EMPTY_PLAN_ERROR));
                }
            }
        }
    }
}
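
/*
Illustrative sketch, not part of the driver: a std-only, deterministic model of
the race that `execute` runs. It assumes attempt `k` (0 is the original request)
starts at `k * interval_ms` and finishes `latency_ms` later, that every attempt
yields a result (the per-attempt `None` case is not modeled), and that ties are
broken by launch order. `SketchError`, `Outcome` and `race_winner` are hypothetical
names: `Transient` stands in for errors that `can_be_ignored` accepts, `Fatal` for
errors it rejects.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SketchError {
    Transient,
    Fatal,
}

type Outcome = Result<u32, SketchError>;

/// Returns the outcome the modeled race would report, or `None` for no attempts.
fn race_winner(interval_ms: u64, attempts: &[(u64, Outcome)]) -> Option<Outcome> {
    // Completion time of every attempt, paired with its outcome.
    let mut finished: Vec<(u64, Outcome)> = attempts
        .iter()
        .enumerate()
        .map(|(k, &(latency_ms, outcome))| (k as u64 * interval_ms + latency_ms, outcome))
        .collect();
    // Handle completions in time order; the stable sort keeps launch order on ties.
    finished.sort_by_key(|&(done_at, _)| done_at);

    let mut last_error = None;
    for (_, outcome) in finished {
        match outcome {
            // Ignorable error: remember it and keep waiting for other attempts.
            Err(SketchError::Transient) => last_error = Some(outcome),
            // A result or a non-ignorable error ends the race immediately.
            _ => return Some(outcome),
        }
    }
    // Every attempt failed in an ignorable way: report the most recent error.
    last_error
}
```

For instance, with a 100 ms interval, an original request taking 500 ms loses to a
speculative execution that starts at 100 ms and takes 50 ms, while a non-ignorable
error from the original request at 30 ms ends the race before any speculative
execution completes.
*/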