pub use scylla_cql::frame::response::error::{DbError, OperationType, WriteType};
use std::{
error::Error,
io::ErrorKind,
net::{AddrParseError, IpAddr, SocketAddr},
num::ParseIntError,
sync::Arc,
};
#[allow(deprecated)]
use scylla_cql::{
frame::{
frame_errors::{
CqlAuthChallengeParseError, CqlAuthSuccessParseError, CqlAuthenticateParseError,
CqlErrorParseError, CqlEventParseError, CqlRequestSerializationError,
CqlResponseParseError, CqlResultParseError, CqlSupportedParseError,
FrameBodyExtensionsParseError, FrameHeaderParseError,
},
request::CqlRequestKind,
response::CqlResponseKind,
value::SerializeValuesError,
},
types::{
deserialize::{DeserializationError, TypeCheckError},
serialize::SerializationError,
},
};
use thiserror::Error;
use crate::{authentication::AuthError, frame::response};
use super::iterator::NextRowError;
#[allow(deprecated)]
use super::legacy_query_result::IntoLegacyQueryResultError;
use super::query_result::{IntoRowsResultError, SingleRowError};
/// Error that can occur while executing a request.
///
/// Each variant either carries the lower-level error it was built from or is a
/// unit variant whose meaning is fully described by its display message.
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
#[allow(deprecated)]
pub enum QueryError {
    /// The database returned an error; holds the error kind and the accompanying message.
    #[error("Database returned an error: {0}, Error message: {1}")]
    DbError(DbError, String),

    /// The caller supplied an invalid query; display is forwarded to [`BadQuery`].
    #[error(transparent)]
    BadQuery(#[from] BadQuery),

    /// The CQL request could not be serialized.
    #[error("Failed to serialize CQL request: {0}")]
    CqlRequestSerialization(#[from] CqlRequestSerializationError),

    /// Frame body extensions could not be parsed; display is forwarded to the inner error.
    #[error(transparent)]
    BodyExtensionsParseError(#[from] FrameBodyExtensionsParseError),

    /// The load balancing policy returned an empty plan.
    // Every continued line ends with an explicit space: a trailing `\` swallows the
    // newline AND the leading whitespace of the next line, so without the space the
    // sentences would be glued together ("plan.First thing ...").
    #[error(
        "Load balancing policy returned an empty plan. \
        First thing to investigate should be the logic of custom LBP implementation. \
        If you think that your LBP implementation is correct, or you make use of `DefaultPolicy`, \
        then this is most probably a driver bug!"
    )]
    EmptyPlan,

    /// A RESULT response could not be parsed; display is forwarded to the inner error.
    #[error(transparent)]
    CqlResultParseError(#[from] CqlResultParseError),

    /// An ERROR response could not be deserialized.
    #[error("Failed to deserialize ERROR response: {0}")]
    CqlErrorParseError(#[from] CqlErrorParseError),

    /// Cluster metadata fetch failed during automatic schema agreement.
    #[error("Cluster metadata fetch error occurred during automatic schema agreement: {0}")]
    MetadataError(#[from] MetadataError),

    /// No connection could be obtained from the pool.
    #[error("No connections in the pool: {0}")]
    ConnectionPoolError(#[from] ConnectionPoolError),

    /// A protocol-level error; see [`ProtocolError`].
    #[error("Protocol error: {0}")]
    ProtocolError(#[from] ProtocolError),

    /// A timeout occurred (carries no further details).
    #[error("Timeout Error")]
    TimeoutError,

    /// The connection was broken; display is forwarded to [`BrokenConnectionError`].
    #[error(transparent)]
    BrokenConnection(#[from] BrokenConnectionError),

    /// No stream id could be allocated for the request.
    #[error("Unable to allocate stream id")]
    UnableToAllocStreamId,

    /// The request timed out; the string is a textual description of the timeout
    /// (e.g. the display of a `tokio::time::error::Elapsed`).
    #[error("Request timeout: {0}")]
    RequestTimeout(String),

    /// An error occurred during async iteration over the rows of a result.
    #[error("An error occurred during async iteration over rows of result: {0}")]
    NextRowError(#[from] NextRowError),

    /// Converting a `QueryResult` into a `LegacyQueryResult` failed.
    #[deprecated(
        since = "0.15.1",
        note = "Legacy deserialization API is inefficient and is going to be removed soon"
    )]
    #[allow(deprecated)]
    #[error("Failed to convert `QueryResult` into `LegacyQueryResult`: {0}")]
    IntoLegacyQueryResultError(#[from] IntoLegacyQueryResultError),
}
#[allow(deprecated)]
impl From<SerializeValuesError> for QueryError {
    /// Wraps a legacy value-serialization failure in [`BadQuery::SerializeValuesError`]
    /// and then in [`QueryError::BadQuery`].
    fn from(err: SerializeValuesError) -> QueryError {
        let bad_query = BadQuery::SerializeValuesError(err);
        Self::BadQuery(bad_query)
    }
}
impl From<SerializationError> for QueryError {
    /// Wraps a value-serialization failure in [`BadQuery::SerializationError`]
    /// and then in [`QueryError::BadQuery`].
    fn from(err: SerializationError) -> QueryError {
        let bad_query = BadQuery::SerializationError(err);
        Self::BadQuery(bad_query)
    }
}
impl From<tokio::time::error::Elapsed> for QueryError {
fn from(timer_error: tokio::time::error::Elapsed) -> QueryError {
QueryError::RequestTimeout(format!("{}", timer_error))
}
}
impl From<UserRequestError> for QueryError {
    /// Maps every crate-internal [`UserRequestError`] variant onto its public
    /// [`QueryError`] counterpart.
    ///
    /// `UnexpectedResponse` and `RepreparedIdChanged` have no direct counterpart and
    /// are reported as [`QueryError::ProtocolError`].
    fn from(value: UserRequestError) -> Self {
        match value {
            UserRequestError::CqlRequestSerialization(inner) => {
                Self::CqlRequestSerialization(inner)
            }
            UserRequestError::DbError(db_error, message) => Self::DbError(db_error, message),
            UserRequestError::CqlResultParseError(inner) => Self::CqlResultParseError(inner),
            UserRequestError::CqlErrorParseError(inner) => Self::CqlErrorParseError(inner),
            UserRequestError::BrokenConnectionError(inner) => Self::BrokenConnection(inner),
            UserRequestError::UnexpectedResponse(kind) => {
                Self::ProtocolError(ProtocolError::UnexpectedResponse(kind))
            }
            UserRequestError::BodyExtensionsParseError(inner) => {
                Self::BodyExtensionsParseError(inner)
            }
            UserRequestError::UnableToAllocStreamId => Self::UnableToAllocStreamId,
            UserRequestError::RepreparedIdChanged {
                statement,
                expected_id,
                reprepared_id,
            } => Self::ProtocolError(ProtocolError::RepreparedIdChanged {
                statement,
                expected_id,
                reprepared_id,
            }),
        }
    }
}
impl From<QueryError> for NewSessionError {
fn from(query_error: QueryError) -> NewSessionError {
match query_error {
QueryError::DbError(e, msg) => NewSessionError::DbError(e, msg),
QueryError::BadQuery(e) => NewSessionError::BadQuery(e),
QueryError::CqlRequestSerialization(e) => NewSessionError::CqlRequestSerialization(e),
QueryError::CqlResultParseError(e) => NewSessionError::CqlResultParseError(e),
QueryError::CqlErrorParseError(e) => NewSessionError::CqlErrorParseError(e),
QueryError::BodyExtensionsParseError(e) => NewSessionError::BodyExtensionsParseError(e),
QueryError::EmptyPlan => NewSessionError::EmptyPlan,
QueryError::MetadataError(e) => NewSessionError::MetadataError(e),
QueryError::ConnectionPoolError(e) => NewSessionError::ConnectionPoolError(e),
QueryError::ProtocolError(e) => NewSessionError::ProtocolError(e),
QueryError::TimeoutError => NewSessionError::TimeoutError,
QueryError::BrokenConnection(e) => NewSessionError::BrokenConnection(e),
QueryError::UnableToAllocStreamId => NewSessionError::UnableToAllocStreamId,
QueryError::RequestTimeout(msg) => NewSessionError::RequestTimeout(msg),
#[allow(deprecated)]
QueryError::IntoLegacyQueryResultError(e) => {
NewSessionError::IntoLegacyQueryResultError(e)
}
QueryError::NextRowError(e) => NewSessionError::NextRowError(e),
}
}
}
impl From<BadKeyspaceName> for QueryError {
    /// Wraps an invalid-keyspace-name error in [`BadQuery::BadKeyspaceName`]
    /// and then in [`QueryError::BadQuery`].
    fn from(err: BadKeyspaceName) -> QueryError {
        let bad_query = BadQuery::BadKeyspaceName(err);
        Self::BadQuery(bad_query)
    }
}
impl From<response::Error> for QueryError {
fn from(error: response::Error) -> QueryError {
QueryError::DbError(error.error, error.reason)
}
}
/// Error that can occur while creating a new session.
///
/// Besides the two session-specific variants at the top, it has the same variants as
/// [`QueryError`], so that a `QueryError` can be converted into it.
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
#[allow(deprecated)]
pub enum NewSessionError {
    /// None of the hostnames could be resolved; the list is printed with `Debug` formatting.
    #[error("Couldn't resolve any hostname: {0:?}")]
    FailedToResolveAnyHostname(Vec<String>),

    /// The list of known nodes is empty.
    #[error("Empty known nodes list")]
    EmptyKnownNodesList,

    /// The database returned an error; holds the error kind and the accompanying message.
    #[error("Database returned an error: {0}, Error message: {1}")]
    DbError(DbError, String),

    /// The caller supplied an invalid query; display is forwarded to [`BadQuery`].
    #[error(transparent)]
    BadQuery(#[from] BadQuery),

    /// The CQL request could not be serialized.
    #[error("Failed to serialize CQL request: {0}")]
    CqlRequestSerialization(#[from] CqlRequestSerializationError),

    /// The load balancing policy returned an empty plan.
    // Every continued line ends with an explicit space: a trailing `\` swallows the
    // newline AND the leading whitespace of the next line, so without the space the
    // sentences would be glued together ("plan.First thing ...").
    #[error(
        "Load balancing policy returned an empty plan. \
        First thing to investigate should be the logic of custom LBP implementation. \
        If you think that your LBP implementation is correct, or you make use of `DefaultPolicy`, \
        then this is most probably a driver bug!"
    )]
    EmptyPlan,

    /// Frame body extensions could not be parsed; display is forwarded to the inner error.
    #[error(transparent)]
    BodyExtensionsParseError(#[from] FrameBodyExtensionsParseError),

    /// The initial cluster metadata fetch failed.
    #[error("Failed to perform initial cluster metadata fetch: {0}")]
    MetadataError(#[from] MetadataError),

    /// A RESULT response could not be parsed; display is forwarded to the inner error.
    #[error(transparent)]
    CqlResultParseError(#[from] CqlResultParseError),

    /// An ERROR response could not be deserialized.
    #[error("Failed to deserialize ERROR response: {0}")]
    CqlErrorParseError(#[from] CqlErrorParseError),

    /// No connection could be obtained from the pool.
    #[error("No connections in the pool: {0}")]
    ConnectionPoolError(#[from] ConnectionPoolError),

    /// A protocol-level error; see [`ProtocolError`].
    #[error("Protocol error: {0}")]
    ProtocolError(#[from] ProtocolError),

    /// A timeout occurred (carries no further details).
    #[error("Timeout Error")]
    TimeoutError,

    /// The connection was broken; display is forwarded to [`BrokenConnectionError`].
    #[error(transparent)]
    BrokenConnection(#[from] BrokenConnectionError),

    /// No stream id could be allocated for the request.
    #[error("Unable to allocate stream id")]
    UnableToAllocStreamId,

    /// The request timed out on the client side; the string describes the timeout.
    #[error("Client timeout: {0}")]
    RequestTimeout(String),

    /// An error occurred during async iteration over the rows of a result.
    #[error("An error occurred during async iteration over rows of result: {0}")]
    NextRowError(#[from] NextRowError),

    /// Converting a `QueryResult` into a `LegacyQueryResult` failed.
    #[deprecated(
        since = "0.15.1",
        note = "Legacy deserialization API is inefficient and is going to be removed soon"
    )]
    #[allow(deprecated)]
    #[error("Failed to convert `QueryResult` into `LegacyQueryResult`: {0}")]
    IntoLegacyQueryResultError(#[from] IntoLegacyQueryResultError),
}
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum ProtocolError {
#[error(
"Received unexpected response from the server: {0}. Expected RESULT or ERROR response."
)]
UnexpectedResponse(CqlResponseKind),
#[error(
"Prepared statement id mismatch between multiple connections - all result ids should be equal."
)]
PreparedStatementIdsMismatch,
#[error(
"Prepared statement id changed after repreparation; md5 sum (computed from the query string) should stay the same;\
Statement: \"{statement}\"; expected id: {expected_id:?}; reprepared id: {reprepared_id:?}"
)]
RepreparedIdChanged {
statement: String,
expected_id: Vec<u8>,
reprepared_id: Vec<u8>,
},
#[error("USE KEYSPACE protocol error: {0}")]
UseKeyspace(#[from] UseKeyspaceProtocolError),
#[error("Schema version fetch protocol error: {0}")]
SchemaVersionFetch(#[from] SchemaVersionFetchError),
#[error("Unpaged query returned a non-empty paging state! This is a driver-side or server-side bug.")]
NonfinishedPagingState,
#[error("Failed to parse a CQL type '{typ}', at position {position}: {reason}")]
InvalidCqlType {
typ: String,
position: usize,
reason: String,
},
#[error("Unable extract a partition key based on prepared statement's metadata")]
PartitionKeyExtraction,
#[error("Tracing info fetch protocol error: {0}")]
Tracing(#[from] TracingProtocolError),
#[error("Reprepared statement's id does not exist in the batch.")]
RepreparedIdMissingInBatch,
}
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum UseKeyspaceProtocolError {
#[error("Keyspace name mismtach; expected: {expected_keyspace_name_lowercase}, received: {result_keyspace_name_lowercase}")]
KeyspaceNameMismatch {
expected_keyspace_name_lowercase: String,
result_keyspace_name_lowercase: String,
},
#[error("Received unexpected response: {0}. Expected RESULT:Set_keyspace")]
UnexpectedResponse(CqlResponseKind),
}
/// Error that occurred while fetching the schema version.
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum SchemaVersionFetchError {
/// The schema version query result could not be converted into a rows result.
// NOTE(review): the `TracesEvents` prefix does not match the message (schema version,
// not system_traces.events); it looks copied from `TracingProtocolError`. The variant
// is public, so renaming it would be a breaking change.
#[error("Failed to convert schema version query result into rows result: {0}")]
TracesEventsIntoRowsResultError(IntoRowsResultError),
/// Wraps a `SingleRowError`; display is forwarded to the inner error.
#[error(transparent)]
SingleRowError(SingleRowError),
}
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum TracingProtocolError {
#[error("Failed to convert result of system_traces.session query to rows result")]
TracesSessionIntoRowsResultError(IntoRowsResultError),
#[error("system_traces.session has invalid column type: {0}")]
TracesSessionInvalidColumnType(TypeCheckError),
#[error("Response to system_traces.session failed to deserialize: {0}")]
TracesSessionDeserializationFailed(DeserializationError),
#[error("Failed to convert result of system_traces.events query to rows result")]
TracesEventsIntoRowsResultError(IntoRowsResultError),
#[error("system_traces.events has invalid column type: {0}")]
TracesEventsInvalidColumnType(TypeCheckError),
#[error("Response to system_traces.events failed to deserialize: {0}")]
TracesEventsDeserializationFailed(DeserializationError),
#[error(
"All tracing queries returned an empty result, \
maybe the trace information didn't propagate yet. \
Consider configuring Session with \
a longer fetch interval (tracing_info_fetch_interval)"
)]
EmptyResults,
}
/// Error in cluster metadata, grouped by the kind of metadata that was bad.
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum MetadataError {
/// Bad peers metadata.
#[error("Bad peers metadata: {0}")]
Peers(#[from] PeersMetadataError),
/// Bad keyspaces metadata.
#[error("Bad keyspaces metadata: {0}")]
Keyspaces(#[from] KeyspacesMetadataError),
/// Bad user defined types (UDTs) metadata.
#[error("Bad UDTs metadata: {0}")]
Udts(#[from] UdtMetadataError),
/// Bad tables metadata.
#[error("Bad tables metadata: {0}")]
Tables(#[from] TablesMetadataError),
/// Bad views metadata.
#[error("Bad views metadata: {0}")]
Views(#[from] ViewsMetadataError),
}
/// Error in peers metadata (`system.peers` / `system.local`).
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum PeersMetadataError {
/// `system.peers` has a column of an unexpected type.
#[error("system.peers has invalid column type: {0}")]
SystemPeersInvalidColumnType(TypeCheckError),
/// `system.local` has a column of an unexpected type.
#[error("system.local has invalid column type: {0}")]
SystemLocalInvalidColumnType(TypeCheckError),
/// The peers list is empty.
#[error("Peers list is empty")]
EmptyPeers,
/// Every peer has an empty token list.
#[error("All peers have empty token lists")]
EmptyTokenLists,
}
/// Error in keyspaces metadata (`system_schema.keyspaces`).
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum KeyspacesMetadataError {
/// `system_schema.keyspaces` has a column of an unexpected type.
#[error("system_schema.keyspaces has invalid column type: {0}")]
SchemaKeyspacesInvalidColumnType(TypeCheckError),
/// The replication strategy of a keyspace is invalid.
#[error("Bad keyspace <{keyspace}> replication strategy: {error}")]
Strategy {
// Name of the keyspace shown in the message.
keyspace: String,
// What is wrong with the strategy definition.
error: KeyspaceStrategyError,
},
}
/// Error in a keyspace replication strategy definition.
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum KeyspaceStrategyError {
/// The strategy definition has no 'class' field.
#[error("keyspace strategy definition is missing a 'class' field")]
MissingClassForStrategyDefinition,
/// A SimpleStrategy definition has no replication factor field.
#[error("Missing replication factor field for SimpleStrategy")]
MissingReplicationFactorForSimpleStrategy,
/// The replication factor could not be parsed as an unsigned integer.
#[error("Failed to parse a replication factor as unsigned integer: {0}")]
ReplicationFactorParseError(ParseIntError),
/// A NetworkTopologyStrategy definition contains an unexpected option.
#[error("Unexpected NetworkTopologyStrategy option: '{key}': '{value}'")]
UnexpectedNetworkTopologyStrategyOption { key: String, value: String },
}
/// Error in user defined types metadata (`system_schema.types`).
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum UdtMetadataError {
/// `system_schema.types` has a column of an unexpected type.
#[error("system_schema.types has invalid column type: {0}")]
SchemaTypesInvalidColumnType(TypeCheckError),
/// User defined types depend on each other circularly, so they cannot be
/// topologically sorted.
#[error("Detected circular dependency between user defined types - toposort is impossible!")]
CircularTypeDependency,
}
/// Error in tables metadata (`system_schema.tables` / `system_schema.columns`).
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum TablesMetadataError {
/// `system_schema.tables` has a column of an unexpected type.
#[error("system_schema.tables has invalid column type: {0}")]
SchemaTablesInvalidColumnType(TypeCheckError),
/// `system_schema.columns` has a column of an unexpected type.
#[error("system_schema.columns has invalid column type: {0}")]
SchemaColumnsInvalidColumnType(TypeCheckError),
/// A column has a kind that is not recognized; the fields identify the column
/// as `keyspace_name.table_name.column_name` and hold the offending kind string.
#[error("Unknown column kind '{column_kind}' for {keyspace_name}.{table_name}.{column_name}")]
UnknownColumnKind {
keyspace_name: String,
table_name: String,
column_name: String,
column_kind: String,
},
}
/// Error in views metadata (`system_schema.views`).
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum ViewsMetadataError {
/// `system_schema.views` has a column of an unexpected type.
#[error("system_schema.views has invalid column type: {0}")]
SchemaViewsInvalidColumnType(TypeCheckError),
}
#[derive(Error, Debug, Clone)]
#[error("Invalid query passed to Session")]
#[non_exhaustive]
pub enum BadQuery {
#[deprecated(
since = "0.15.1",
note = "Legacy serialization API is not type-safe and is going to be removed soon"
)]
#[error("Serializing values failed: {0} ")]
#[allow(deprecated)]
SerializeValuesError(#[from] SerializeValuesError),
#[error("Serializing values failed: {0} ")]
SerializationError(#[from] SerializationError),
#[error("Serialized values are too long to compute partition key! Length: {0}, Max allowed length: {1}")]
ValuesTooLongForKey(usize, usize),
#[error("Passed invalid keyspace name to use: {0}")]
BadKeyspaceName(#[from] BadKeyspaceName),
#[error("Number of Queries in Batch Statement supplied is {0} which has exceeded the max value of 65,535")]
TooManyQueriesInBatchStatement(usize),
#[error("{0}")]
Other(String),
}
/// Reason why a keyspace name is invalid.
#[derive(Debug, Error, Clone)]
#[non_exhaustive]
pub enum BadKeyspaceName {
/// The keyspace name is empty.
#[error("Keyspace name is empty")]
Empty,
/// The keyspace name exceeds 48 characters; holds the bad name and the number of
/// characters found.
#[error("Keyspace name too long, must be up to 48 characters, found {1} characters. Bad keyspace name: '{0}'")]
TooLong(String, usize),
/// The keyspace name contains a character that is neither alphanumeric nor an
/// underscore; holds the bad name and the illegal character.
#[error("Illegal character found: '{1}', only alphanumeric and underscores allowed. Bad keyspace name: '{0}'")]
IllegalCharacter(String, char),
}
/// Reason why no connection is available from a connection pool.
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum ConnectionPoolError {
/// The pool is broken; carries the error the last connection failed with.
#[error("The pool is broken; Last connection failed with: {last_connection_error}")]
Broken {
last_connection_error: ConnectionError,
},
/// The pool is still being initialized.
#[error("Pool is still being initialized")]
Initializing,
/// The node has been disabled by a host filter.
#[error("The node has been disabled by a host filter")]
NodeDisabledByHostFilter,
}
/// Error related to a single connection.
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum ConnectionError {
/// The connect timeout elapsed.
#[error("Connect timeout elapsed")]
ConnectTimeout,
/// An I/O error; display is forwarded to the inner error. It is held in an `Arc`
/// because `std::io::Error` is not `Clone` while this enum derives `Clone`.
#[error(transparent)]
IoError(Arc<std::io::Error>),
/// No free source port could be found for the given shard.
#[error("Could not find free source port for shard {0}")]
NoSourcePortForShard(u32),
/// Address translation failed.
#[error("Address translation failed: {0}")]
TranslationError(#[from] TranslationError),
/// The connection was broken; display is forwarded to the inner error.
#[error(transparent)]
BrokenConnection(#[from] BrokenConnectionError),
/// A connection setup request failed; display is forwarded to the inner error.
#[error(transparent)]
ConnectionSetupRequestError(#[from] ConnectionSetupRequestError),
}
impl From<std::io::Error> for ConnectionError {
fn from(value: std::io::Error) -> Self {
ConnectionError::IoError(Arc::new(value))
}
}
impl ConnectionError {
    /// Returns `true` only for an [`ConnectionError::IoError`] whose kind is
    /// `AddrInUse` or `PermissionDenied`; every other error yields `false`.
    pub fn is_address_unavailable_for_use(&self) -> bool {
        matches!(
            self,
            ConnectionError::IoError(io_error)
                if matches!(
                    io_error.kind(),
                    ErrorKind::AddrInUse | ErrorKind::PermissionDenied
                )
        )
    }
}
/// Error that occurred during address translation.
#[non_exhaustive]
#[derive(Debug, Clone, Error)]
pub enum TranslationError {
/// There is no translation rule for the given address.
#[error("No rule for address {0}")]
NoRuleForAddress(SocketAddr),
/// The translated address found in a rule could not be parsed.
#[error("Failed to parse translated address: {translated_addr_str}, reason: {reason}")]
InvalidAddressInRule {
// The address string that failed to parse.
translated_addr_str: &'static str,
// The parse failure reported for that string.
reason: AddrParseError,
},
}
/// Error of a request performed during connection setup: pairs the kind of the
/// request with the reason it failed.
#[derive(Error, Debug, Clone)]
#[error("Failed to perform a connection setup request. Request: {request_kind}, reason: {error}")]
#[non_exhaustive]
pub struct ConnectionSetupRequestError {
/// Kind of the CQL request that failed.
pub request_kind: CqlRequestKind,
/// Why the request failed.
pub error: ConnectionSetupRequestErrorKind,
}
/// Reason why a connection setup request failed.
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum ConnectionSetupRequestErrorKind {
/// The CQL request could not be serialized.
#[error("Failed to serialize CQL request: {0}")]
CqlRequestSerialization(#[from] CqlRequestSerializationError),
/// Frame body extensions could not be parsed; display is forwarded to the inner error.
#[error(transparent)]
BodyExtensionsParseError(#[from] FrameBodyExtensionsParseError),
/// No stream id could be allocated for the request.
#[error("Unable to allocate stream id")]
UnableToAllocStreamId,
/// The connection was broken; display is forwarded to the inner error.
#[error(transparent)]
BrokenConnection(#[from] BrokenConnectionError),
/// The database returned an error; holds the error kind and the accompanying message.
#[error("Database returned an error: {0}, Error message: {1}")]
DbError(DbError, String),
/// The server answered with an unexpected kind of response.
#[error("Received unexpected response from the server: {0}")]
UnexpectedResponse(CqlResponseKind),
/// A SUPPORTED response could not be deserialized.
#[error("Failed to deserialize SUPPORTED response: {0}")]
CqlSupportedParseError(#[from] CqlSupportedParseError),
/// An AUTHENTICATE response could not be deserialized.
#[error("Failed to deserialize AUTHENTICATE response: {0}")]
CqlAuthenticateParseError(#[from] CqlAuthenticateParseError),
/// An AUTH_SUCCESS response could not be deserialized.
#[error("Failed to deserialize AUTH_SUCCESS response: {0}")]
CqlAuthSuccessParseError(#[from] CqlAuthSuccessParseError),
/// An AUTH_CHALLENGE response could not be deserialized.
#[error("Failed to deserialize AUTH_CHALLENGE response: {0}")]
CqlAuthChallengeParseError(#[from] CqlAuthChallengeParseError),
/// An ERROR response could not be deserialized.
#[error("Failed to deserialize ERROR response: {0}")]
CqlErrorParseError(#[from] CqlErrorParseError),
/// The client's auth session could not be started.
#[error("Failed to start client's auth session: {0}")]
StartAuthSessionError(AuthError),
/// The client failed to evaluate an auth challenge.
#[error("Failed to evaluate auth challenge on client side: {0}")]
AuthChallengeEvaluationError(AuthError),
/// The client failed to finish an auth challenge.
#[error("Failed to finish auth challenge on client side: {0}")]
AuthFinishError(AuthError),
/// Authentication is required, but no credentials or authenticator were provided.
#[error("Authentication is required. You can use SessionBuilder::user(\"user\", \"pass\") to provide credentials or SessionBuilder::authenticator_provider to provide custom authenticator")]
MissingAuthentication,
}
impl ConnectionSetupRequestError {
pub(crate) fn new(
request_kind: CqlRequestKind,
error: ConnectionSetupRequestErrorKind,
) -> Self {
ConnectionSetupRequestError {
request_kind,
error,
}
}
pub fn get_error(&self) -> &ConnectionSetupRequestErrorKind {
&self.error
}
}
/// Error signalling that a connection was broken.
///
/// The reason is stored as a type-erased, reference-counted error (which is what
/// lets this type derive `Clone`) and is interpolated into the display message.
#[derive(Error, Debug, Clone)]
#[error("Connection broken, reason: {0}")]
pub struct BrokenConnectionError(Arc<dyn Error + Sync + Send>);
impl BrokenConnectionError {
    /// Tries to view the stored reason as the concrete error type `T`.
    ///
    /// Returns `None` when the stored error is not a `T`.
    pub fn downcast_ref<T: Error + 'static>(&self) -> Option<&T> {
        let reason: &(dyn Error + Sync + Send + 'static) = &*self.0;
        reason.downcast_ref::<T>()
    }
}
/// Concrete reason why a connection was broken.
///
/// Unlike most errors in this file it does not derive `Clone`.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum BrokenConnectionErrorKind {
/// Waiting for the response to a keepalive request timed out; holds the address
/// of the node the connection was made to.
#[error("Timed out while waiting for response to keepalive request on connection to node {0}")]
KeepaliveTimeout(IpAddr),
/// The keepalive query failed to execute.
#[error("Failed to execute keepalive query: {0}")]
KeepaliveQueryError(RequestError),
/// A frame could not be deserialized; carries a frame header parse error.
#[error("Failed to deserialize frame: {0}")]
FrameHeaderParseError(FrameHeaderParseError),
/// A server event could not be handled.
#[error("Failed to handle server event: {0}")]
CqlEventHandlingError(#[from] CqlEventHandlingError),
/// A server frame arrived with an unexpected stream id.
#[error("Received a server frame with unexpected stream id: {0}")]
UnexpectedStreamId(i16),
/// Writing data failed with an I/O error.
#[error("Failed to write data: {0}")]
WriteError(std::io::Error),
/// There are too many orphaned stream ids; the `u16` is interpolated into the
/// message (presumably the number of orphaned ids - TODO confirm).
#[error("Too many orphaned stream ids: {0}")]
TooManyOrphanedStreamIds(u16),
/// Data needed to perform a request could not be sent/received via a tokio channel.
// NOTE: the message literal spans several source lines without `\` continuation,
// so the rendered message contains those line breaks (and any leading whitespace
// of the continued lines). Do not re-indent the literal without accounting for that.
#[error(
"Failed to send/receive data needed to perform a request via tokio channel.
It implies that other half of the channel has been dropped.
The connection was already broken for some other reason."
)]
ChannelError,
}
impl From<BrokenConnectionErrorKind> for BrokenConnectionError {
fn from(value: BrokenConnectionErrorKind) -> Self {
BrokenConnectionError(Arc::new(value))
}
}
/// Error that occurred while handling a server event (received on stream -1,
/// per the messages below).
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum CqlEventHandlingError {
/// An EVENT response could not be deserialized.
#[error("Failed to deserialize EVENT response: {0}")]
CqlEventParseError(#[from] CqlEventParseError),
/// A response other than EVENT was received on stream -1.
#[error("Received unexpected server response on stream -1: {0}. Expected EVENT response")]
UnexpectedResponse(CqlResponseKind),
/// Parsing the body extensions of a frame received on stream -1 failed
/// (the message refers to it as the frame's header).
#[error("Failed to deserialize a header of frame received on stream -1: {0}")]
BodyExtensionParseError(#[from] FrameBodyExtensionsParseError),
/// Event info could not be sent via a channel.
#[error("Failed to send event info via channel. The channel is probably closed, which is caused by connection being broken")]
SendError,
}
/// Crate-internal error of a user request; it is converted into the public
/// [`QueryError`] before being returned to the caller.
#[derive(Error, Debug)]
pub(crate) enum UserRequestError {
    /// The CQL request could not be serialized.
    #[error("Failed to serialize CQL request: {0}")]
    CqlRequestSerialization(#[from] CqlRequestSerializationError),

    /// The database returned an error; holds the error kind and the accompanying message.
    #[error("Database returned an error: {0}, Error message: {1}")]
    DbError(DbError, String),

    /// A RESULT response could not be parsed; display is forwarded to the inner error.
    #[error(transparent)]
    CqlResultParseError(#[from] CqlResultParseError),

    /// An ERROR response could not be deserialized.
    #[error("Failed to deserialize ERROR response: {0}")]
    CqlErrorParseError(#[from] CqlErrorParseError),

    /// The server answered with a response kind other than RESULT or ERROR.
    #[error(
        "Received unexpected response from the server: {0}. Expected RESULT or ERROR response."
    )]
    UnexpectedResponse(CqlResponseKind),

    /// The connection was broken; display is forwarded to the inner error.
    #[error(transparent)]
    BrokenConnectionError(#[from] BrokenConnectionError),

    /// Frame body extensions could not be parsed; display is forwarded to the inner error.
    #[error(transparent)]
    BodyExtensionsParseError(#[from] FrameBodyExtensionsParseError),

    /// No stream id could be allocated for the request.
    #[error("Unable to allocate stream id")]
    UnableToAllocStreamId,

    /// Repreparing a statement produced an id different from the expected one.
    // The first line ends with an explicit space: a trailing `\` swallows the newline
    // and the next line's leading whitespace, which would otherwise render
    // "...stay the same;Statement: ...".
    #[error(
        "Prepared statement id changed after repreparation; md5 sum (computed from the query string) should stay the same; \
        Statement: \"{statement}\"; expected id: {expected_id:?}; reprepared id: {reprepared_id:?}"
    )]
    RepreparedIdChanged {
        /// The statement text shown in the message.
        statement: String,
        /// The id the statement was expected to have.
        expected_id: Vec<u8>,
        /// The id obtained from repreparation.
        reprepared_id: Vec<u8>,
    },
}
impl From<response::error::Error> for UserRequestError {
fn from(value: response::error::Error) -> Self {
UserRequestError::DbError(value.error, value.reason)
}
}
impl From<RequestError> for UserRequestError {
    /// Converts a [`RequestError`] into a [`UserRequestError`].
    ///
    /// Response parse errors for ERROR and RESULT keep their dedicated variants; a
    /// parse error of any other response kind is reported as
    /// [`UserRequestError::UnexpectedResponse`] carrying that response kind.
    fn from(value: RequestError) -> Self {
        match value {
            RequestError::CqlRequestSerialization(inner) => Self::CqlRequestSerialization(inner),
            RequestError::BodyExtensionsParseError(inner) => Self::BodyExtensionsParseError(inner),
            RequestError::CqlResponseParseError(CqlResponseParseError::CqlErrorParseError(
                inner,
            )) => Self::CqlErrorParseError(inner),
            RequestError::CqlResponseParseError(CqlResponseParseError::CqlResultParseError(
                inner,
            )) => Self::CqlResultParseError(inner),
            RequestError::CqlResponseParseError(other) => {
                Self::UnexpectedResponse(other.to_response_kind())
            }
            RequestError::BrokenConnection(inner) => Self::BrokenConnectionError(inner),
            RequestError::UnableToAllocStreamId => Self::UnableToAllocStreamId,
        }
    }
}
/// Error of a single request sent over a connection.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum RequestError {
/// The CQL request could not be serialized.
#[error("Failed to serialize CQL request: {0}")]
CqlRequestSerialization(#[from] CqlRequestSerializationError),
/// Frame body extensions could not be parsed; display is forwarded to the inner error.
#[error(transparent)]
BodyExtensionsParseError(#[from] FrameBodyExtensionsParseError),
/// A CQL response could not be parsed; display is forwarded to the inner error.
#[error(transparent)]
CqlResponseParseError(#[from] CqlResponseParseError),
/// The connection was broken; display is forwarded to the inner error.
#[error(transparent)]
BrokenConnection(#[from] BrokenConnectionError),
/// No stream id could be allocated for the request.
// NOTE(review): other enums in this file word this as "Unable to allocate stream id"
// (without "a"); left untouched here.
#[error("Unable to allocate a stream id")]
UnableToAllocStreamId,
}
impl From<ResponseParseError> for RequestError {
    /// Maps each [`ResponseParseError`] variant onto the same-named
    /// [`RequestError`] variant.
    fn from(value: ResponseParseError) -> Self {
        match value {
            ResponseParseError::BodyExtensionsParseError(inner) => {
                Self::BodyExtensionsParseError(inner)
            }
            ResponseParseError::CqlResponseParseError(inner) => Self::CqlResponseParseError(inner),
        }
    }
}
/// Crate-internal error of parsing a response; both variants forward their display
/// to the wrapped error.
#[derive(Error, Debug)]
pub(crate) enum ResponseParseError {
/// Frame body extensions could not be parsed.
#[error(transparent)]
BodyExtensionsParseError(#[from] FrameBodyExtensionsParseError),
/// The CQL response could not be parsed.
#[error(transparent)]
CqlResponseParseError(#[from] CqlResponseParseError),
}
#[cfg(test)]
mod tests {
    use scylla_cql::Consistency;

    use crate::transport::errors::{DbError, QueryError, WriteType};

    /// Each write-type string converts to the matching `WriteType`; a string that
    /// is not one of the listed names ends up in `WriteType::Other`.
    #[test]
    fn write_type_from_str() {
        let cases: [(&str, WriteType); 9] = [
            ("SIMPLE", WriteType::Simple),
            ("BATCH", WriteType::Batch),
            ("UNLOGGED_BATCH", WriteType::UnloggedBatch),
            ("COUNTER", WriteType::Counter),
            ("BATCH_LOG", WriteType::BatchLog),
            ("CAS", WriteType::Cas),
            ("VIEW", WriteType::View),
            ("CDC", WriteType::Cdc),
            ("SOMEOTHER", WriteType::Other("SOMEOTHER".to_string())),
        ];

        cases.iter().for_each(|(raw, expected)| {
            let parsed = WriteType::from(*raw);
            assert_eq!(&parsed, expected);
        });
    }

    /// The display of a `DbError` includes its fields, and `QueryError::DbError`
    /// embeds that display together with the error message.
    #[test]
    fn dberror_full_info() {
        let db_error = DbError::Unavailable {
            consistency: Consistency::Three,
            required: 3,
            alive: 2,
        };

        let expected_db_msg = concat!(
            "Not enough nodes are alive to satisfy required consistency level ",
            "(consistency: Three, required: 3, alive: 2)"
        );
        assert_eq!(db_error.to_string(), expected_db_msg);

        let query_error =
            QueryError::DbError(db_error, "a message about unavailable error".to_string());

        let expected_query_msg = format!(
            "Database returned an error: {}, Error message: a message about unavailable error",
            expected_db_msg
        );
        assert_eq!(query_error.to_string(), expected_query_msg);
    }
}