scylla_cql/frame/response/
error.rs

1use crate::frame::frame_errors::{CqlErrorParseError, LowLevelDeserializationError};
2use crate::frame::protocol_features::ProtocolFeatures;
3use crate::frame::types;
4use crate::Consistency;
5use byteorder::ReadBytesExt;
6use bytes::Bytes;
7use thiserror::Error;
8
9#[derive(Debug, Clone)]
10pub struct Error {
11    pub error: DbError,
12    pub reason: String,
13}
14
15fn make_error_field_err(
16    db_error: &'static str,
17    field: &'static str,
18    err: impl Into<LowLevelDeserializationError>,
19) -> CqlErrorParseError {
20    CqlErrorParseError::MalformedErrorField {
21        db_error,
22        field,
23        err: err.into(),
24    }
25}
26
27impl Error {
28    pub fn deserialize(
29        features: &ProtocolFeatures,
30        buf: &mut &[u8],
31    ) -> Result<Self, CqlErrorParseError> {
32        let code = types::read_int(buf)
33            .map_err(|err| CqlErrorParseError::ErrorCodeParseError(err.into()))?;
34        let reason = types::read_string(buf)
35            .map_err(CqlErrorParseError::ReasonParseError)?
36            .to_owned();
37
38        let error: DbError = match code {
39            0x0000 => DbError::ServerError,
40            0x000A => DbError::ProtocolError,
41            0x0100 => DbError::AuthenticationError,
42            0x1000 => DbError::Unavailable {
43                consistency: types::read_consistency(buf)
44                    .map_err(|err| make_error_field_err("UNAVAILABLE", "CONSISTENCY", err))?,
45                required: types::read_int(buf)
46                    .map_err(|err| make_error_field_err("UNAVAILABLE", "REQUIRED", err))?,
47                alive: types::read_int(buf)
48                    .map_err(|err| make_error_field_err("UNAVAILABLE", "ALIVE", err))?,
49            },
50            0x1001 => DbError::Overloaded,
51            0x1002 => DbError::IsBootstrapping,
52            0x1003 => DbError::TruncateError,
53            0x1100 => DbError::WriteTimeout {
54                consistency: types::read_consistency(buf)
55                    .map_err(|err| make_error_field_err("WRITE_TIMEOUT", "CONSISTENCY", err))?,
56                received: types::read_int(buf)
57                    .map_err(|err| make_error_field_err("WRITE_TIMEOUT", "RECEIVED", err))?,
58                required: types::read_int(buf)
59                    .map_err(|err| make_error_field_err("WRITE_TIMEOUT", "REQUIRED", err))?,
60                write_type: WriteType::from(
61                    types::read_string(buf)
62                        .map_err(|err| make_error_field_err("WRITE_TIMEOUT", "WRITE_TYPE", err))?,
63                ),
64            },
65            0x1200 => DbError::ReadTimeout {
66                consistency: types::read_consistency(buf)
67                    .map_err(|err| make_error_field_err("READ_TIMEOUT", "CONSISTENCY", err))?,
68                received: types::read_int(buf)
69                    .map_err(|err| make_error_field_err("READ_TIMEOUT", "RECEIVED", err))?,
70                required: types::read_int(buf)
71                    .map_err(|err| make_error_field_err("READ_TIMEOUT", "REQUIRED", err))?,
72                data_present: buf
73                    .read_u8()
74                    .map_err(|err| make_error_field_err("READ_TIMEOUT", "DATA_PRESENT", err))?
75                    != 0,
76            },
77            0x1300 => DbError::ReadFailure {
78                consistency: types::read_consistency(buf)
79                    .map_err(|err| make_error_field_err("READ_FAILURE", "CONSISTENCY", err))?,
80                received: types::read_int(buf)
81                    .map_err(|err| make_error_field_err("READ_FAILURE", "RECEIVED", err))?,
82                required: types::read_int(buf)
83                    .map_err(|err| make_error_field_err("READ_FAILURE", "REQUIRED", err))?,
84                numfailures: types::read_int(buf)
85                    .map_err(|err| make_error_field_err("READ_FAILURE", "NUM_FAILURES", err))?,
86                data_present: buf
87                    .read_u8()
88                    .map_err(|err| make_error_field_err("READ_FAILURE", "DATA_PRESENT", err))?
89                    != 0,
90            },
91            0x1400 => DbError::FunctionFailure {
92                keyspace: types::read_string(buf)
93                    .map_err(|err| make_error_field_err("FUNCTION_FAILURE", "KEYSPACE", err))?
94                    .to_string(),
95                function: types::read_string(buf)
96                    .map_err(|err| make_error_field_err("FUNCTION_FAILURE", "FUNCTION", err))?
97                    .to_string(),
98                arg_types: types::read_string_list(buf)
99                    .map_err(|err| make_error_field_err("FUNCTION_FAILURE", "ARG_TYPES", err))?,
100            },
101            0x1500 => DbError::WriteFailure {
102                consistency: types::read_consistency(buf)
103                    .map_err(|err| make_error_field_err("WRITE_FAILURE", "CONSISTENCY", err))?,
104                received: types::read_int(buf)
105                    .map_err(|err| make_error_field_err("WRITE_FAILURE", "RECEIVED", err))?,
106                required: types::read_int(buf)
107                    .map_err(|err| make_error_field_err("WRITE_FAILURE", "REQUIRED", err))?,
108                numfailures: types::read_int(buf)
109                    .map_err(|err| make_error_field_err("WRITE_FAILURE", "NUM_FAILURES", err))?,
110                write_type: WriteType::from(
111                    types::read_string(buf)
112                        .map_err(|err| make_error_field_err("WRITE_FAILURE", "WRITE_TYPE", err))?,
113                ),
114            },
115            0x2000 => DbError::SyntaxError,
116            0x2100 => DbError::Unauthorized,
117            0x2200 => DbError::Invalid,
118            0x2300 => DbError::ConfigError,
119            0x2400 => DbError::AlreadyExists {
120                keyspace: types::read_string(buf)
121                    .map_err(|err| make_error_field_err("ALREADY_EXISTS", "KEYSPACE", err))?
122                    .to_string(),
123                table: types::read_string(buf)
124                    .map_err(|err| make_error_field_err("ALREADY_EXISTS", "TABLE", err))?
125                    .to_string(),
126            },
127            0x2500 => DbError::Unprepared {
128                statement_id: Bytes::from(
129                    types::read_short_bytes(buf)
130                        .map_err(|err| make_error_field_err("UNPREPARED", "STATEMENT_ID", err))?
131                        .to_owned(),
132                ),
133            },
134            code if Some(code) == features.rate_limit_error => {
135                DbError::RateLimitReached {
136                    op_type: OperationType::from(buf.read_u8().map_err(|err| {
137                        make_error_field_err("RATE_LIMIT_REACHED", "OP_TYPE", err)
138                    })?),
139                    rejected_by_coordinator: buf.read_u8().map_err(|err| {
140                        make_error_field_err("RATE_LIMIT_REACHED", "REJECTED_BY_COORDINATOR", err)
141                    })? != 0,
142                }
143            }
144            _ => DbError::Other(code),
145        };
146
147        Ok(Error { error, reason })
148    }
149}
150
151/// An error sent from the database in response to a query
152/// as described in the [specification](https://github.com/apache/cassandra/blob/5ed5e84613ef0e9664a774493db7d2604e3596e0/doc/native_protocol_v4.spec#L1029)\
153#[derive(Error, Debug, Clone, PartialEq, Eq)]
154#[non_exhaustive]
155pub enum DbError {
156    /// The submitted query has a syntax error
157    #[error("The submitted query has a syntax error")]
158    SyntaxError,
159
160    /// The query is syntactically correct but invalid
161    #[error("The query is syntactically correct but invalid")]
162    Invalid,
163
164    /// Attempted to create a keyspace or a table that was already existing
165    #[error(
166        "Attempted to create a keyspace or a table that was already existing \
167        (keyspace: {keyspace}, table: {table})"
168    )]
169    AlreadyExists {
170        /// Created keyspace name or name of the keyspace in which table was created
171        keyspace: String,
172        /// Name of the table created, in case of keyspace creation it's an empty string
173        table: String,
174    },
175
176    /// User defined function failed during execution
177    #[error(
178        "User defined function failed during execution \
179        (keyspace: {keyspace}, function: {function}, arg_types: {arg_types:?})"
180    )]
181    FunctionFailure {
182        /// Keyspace of the failed function
183        keyspace: String,
184        /// Name of the failed function
185        function: String,
186        /// Types of arguments passed to the function
187        arg_types: Vec<String>,
188    },
189
190    /// Authentication failed - bad credentials
191    #[error("Authentication failed - bad credentials")]
192    AuthenticationError,
193
194    /// The logged user doesn't have the right to perform the query
195    #[error("The logged user doesn't have the right to perform the query")]
196    Unauthorized,
197
198    /// The query is invalid because of some configuration issue
199    #[error("The query is invalid because of some configuration issue")]
200    ConfigError,
201
202    /// Not enough nodes are alive to satisfy required consistency level
203    #[error(
204        "Not enough nodes are alive to satisfy required consistency level \
205        (consistency: {consistency}, required: {required}, alive: {alive})"
206    )]
207    Unavailable {
208        /// Consistency level of the query
209        consistency: Consistency,
210        /// Number of nodes required to be alive to satisfy required consistency level
211        required: i32,
212        /// Found number of active nodes
213        alive: i32,
214    },
215
216    /// The request cannot be processed because the coordinator node is overloaded
217    #[error("The request cannot be processed because the coordinator node is overloaded")]
218    Overloaded,
219
220    /// The coordinator node is still bootstrapping
221    #[error("The coordinator node is still bootstrapping")]
222    IsBootstrapping,
223
224    /// Error during truncate operation
225    #[error("Error during truncate operation")]
226    TruncateError,
227
228    /// Not enough nodes responded to the read request in time to satisfy required consistency level
229    #[error("Not enough nodes responded to the read request in time to satisfy required consistency level \
230            (consistency: {consistency}, received: {received}, required: {required}, data_present: {data_present})")]
231    ReadTimeout {
232        /// Consistency level of the query
233        consistency: Consistency,
234        /// Number of nodes that responded to the read request
235        received: i32,
236        /// Number of nodes required to respond to satisfy required consistency level
237        required: i32,
238        /// Replica that was asked for data has responded
239        data_present: bool,
240    },
241
242    /// Not enough nodes responded to the write request in time to satisfy required consistency level
243    #[error("Not enough nodes responded to the write request in time to satisfy required consistency level \
244            (consistency: {consistency}, received: {received}, required: {required}, write_type: {write_type})")]
245    WriteTimeout {
246        /// Consistency level of the query
247        consistency: Consistency,
248        /// Number of nodes that responded to the write request
249        received: i32,
250        /// Number of nodes required to respond to satisfy required consistency level
251        required: i32,
252        /// Type of write operation requested
253        write_type: WriteType,
254    },
255
256    /// A non-timeout error during a read request
257    #[error(
258        "A non-timeout error during a read request \
259        (consistency: {consistency}, received: {received}, required: {required}, \
260        numfailures: {numfailures}, data_present: {data_present})"
261    )]
262    ReadFailure {
263        /// Consistency level of the query
264        consistency: Consistency,
265        /// Number of nodes that responded to the read request
266        received: i32,
267        /// Number of nodes required to respond to satisfy required consistency level
268        required: i32,
269        /// Number of nodes that experience a failure while executing the request
270        numfailures: i32,
271        /// Replica that was asked for data has responded
272        data_present: bool,
273    },
274
275    /// A non-timeout error during a write request
276    #[error(
277        "A non-timeout error during a write request \
278        (consistency: {consistency}, received: {received}, required: {required}, \
279        numfailures: {numfailures}, write_type: {write_type}"
280    )]
281    WriteFailure {
282        /// Consistency level of the query
283        consistency: Consistency,
284        /// Number of nodes that responded to the read request
285        received: i32,
286        /// Number of nodes required to respond to satisfy required consistency level
287        required: i32,
288        /// Number of nodes that experience a failure while executing the request
289        numfailures: i32,
290        /// Type of write operation requested
291        write_type: WriteType,
292    },
293
294    /// Tried to execute a prepared statement that is not prepared. Driver should prepare it again
295    #[error(
296        "Tried to execute a prepared statement that is not prepared. Driver should prepare it again"
297    )]
298    Unprepared {
299        /// Statement id of the requested prepared query
300        statement_id: Bytes,
301    },
302
303    /// Internal server error. This indicates a server-side bug
304    #[error("Internal server error. This indicates a server-side bug")]
305    ServerError,
306
307    /// Invalid protocol message received from the driver
308    #[error("Invalid protocol message received from the driver")]
309    ProtocolError,
310
311    /// Rate limit was exceeded for a partition affected by the request.
312    /// (Scylla-specific)
313    /// TODO: Should this have a "Scylla" prefix?
314    #[error("Rate limit was exceeded for a partition affected by the request")]
315    RateLimitReached {
316        /// Type of the operation rejected by rate limiting.
317        op_type: OperationType,
318        /// Whether the operation was rate limited on the coordinator or not.
319        /// Writes rejected on the coordinator are guaranteed not to be applied
320        /// on any replica.
321        rejected_by_coordinator: bool,
322    },
323
324    /// Other error code not specified in the specification
325    #[error("Other error not specified in the specification. Error code: {0}")]
326    Other(i32),
327}
328
329impl DbError {
330    pub fn code(&self, protocol_features: &ProtocolFeatures) -> i32 {
331        match self {
332            DbError::ServerError => 0x0000,
333            DbError::ProtocolError => 0x000A,
334            DbError::AuthenticationError => 0x0100,
335            DbError::Unavailable {
336                consistency: _,
337                required: _,
338                alive: _,
339            } => 0x1000,
340            DbError::Overloaded => 0x1001,
341            DbError::IsBootstrapping => 0x1002,
342            DbError::TruncateError => 0x1003,
343            DbError::WriteTimeout {
344                consistency: _,
345                received: _,
346                required: _,
347                write_type: _,
348            } => 0x1100,
349            DbError::ReadTimeout {
350                consistency: _,
351                received: _,
352                required: _,
353                data_present: _,
354            } => 0x1200,
355            DbError::ReadFailure {
356                consistency: _,
357                received: _,
358                required: _,
359                numfailures: _,
360                data_present: _,
361            } => 0x1300,
362            DbError::FunctionFailure {
363                keyspace: _,
364                function: _,
365                arg_types: _,
366            } => 0x1400,
367            DbError::WriteFailure {
368                consistency: _,
369                received: _,
370                required: _,
371                numfailures: _,
372                write_type: _,
373            } => 0x1500,
374            DbError::SyntaxError => 0x2000,
375            DbError::Unauthorized => 0x2100,
376            DbError::Invalid => 0x2200,
377            DbError::ConfigError => 0x2300,
378            DbError::AlreadyExists {
379                keyspace: _,
380                table: _,
381            } => 0x2400,
382            DbError::Unprepared { statement_id: _ } => 0x2500,
383            DbError::Other(code) => *code,
384            DbError::RateLimitReached {
385                op_type: _,
386                rejected_by_coordinator: _,
387            } => protocol_features.rate_limit_error.unwrap(),
388        }
389    }
390
391    /// Decides whether the error can be ignored. If true, the driver can perform
392    /// a speculative retry to the next target.
393    pub fn can_speculative_retry(&self) -> bool {
394        // Do not remove this lint!
395        // It's there for a reason - we don't want new variants
396        // automatically fall under `_` pattern when they are introduced.
397        #[deny(clippy::wildcard_enum_match_arm)]
398        match self {
399            // Errors that will almost certainly appear on other nodes as well
400            DbError::SyntaxError
401            | DbError::Invalid
402            | DbError::AlreadyExists { .. }
403            | DbError::Unauthorized
404            | DbError::ProtocolError => false,
405
406            // Errors that should not appear there - thus, should not be ignored.
407            DbError::AuthenticationError | DbError::Other(_) => false,
408
409            // For now, let's assume that UDF failure is not transient - don't ignore it
410            // TODO: investigate
411            DbError::FunctionFailure { .. } => false,
412
413            // Not sure when these can appear - don't ignore them
414            // TODO: Investigate these errors
415            DbError::ConfigError | DbError::TruncateError => false,
416
417            // Errors that we can ignore and perform a retry on some other node
418            DbError::Unavailable { .. }
419            | DbError::Overloaded
420            | DbError::IsBootstrapping
421            | DbError::ReadTimeout { .. }
422            | DbError::WriteTimeout { .. }
423            | DbError::ReadFailure { .. }
424            | DbError::WriteFailure { .. }
425            // Preparation may succeed on some other node.
426            | DbError::Unprepared { .. }
427            | DbError::ServerError
428            | DbError::RateLimitReached { .. } => true,
429        }
430    }
431}
432
433/// Type of the operation rejected by rate limiting
434#[derive(Debug, Clone, PartialEq, Eq)]
435pub enum OperationType {
436    Read,
437    Write,
438    Other(u8),
439}
440
441/// Type of write operation requested
442#[derive(Debug, Clone, PartialEq, Eq)]
443pub enum WriteType {
444    /// Non-batched non-counter write
445    Simple,
446    /// Logged batch write. If this type is received, it means the batch log has been successfully written
447    /// (otherwise BatchLog type would be present)
448    Batch,
449    /// Unlogged batch. No batch log write has been attempted.
450    UnloggedBatch,
451    /// Counter write (batched or not)
452    Counter,
453    /// Timeout occurred during the write to the batch log when a logged batch was requested
454    BatchLog,
455    /// Timeout occurred during Compare And Set write/update
456    Cas,
457    /// Write involves VIEW update and failure to acquire local view(MV) lock for key within timeout
458    View,
459    /// Timeout occurred  when a cdc_total_space_in_mb is exceeded when doing a write to data tracked by cdc
460    Cdc,
461    /// Other type not specified in the specification
462    Other(String),
463}
464
465impl std::fmt::Display for WriteType {
466    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
467        write!(f, "{:?}", self)
468    }
469}
470
471impl From<u8> for OperationType {
472    fn from(operation_type: u8) -> OperationType {
473        match operation_type {
474            0 => OperationType::Read,
475            1 => OperationType::Write,
476            other => OperationType::Other(other),
477        }
478    }
479}
480
481impl From<&str> for WriteType {
482    fn from(write_type_str: &str) -> WriteType {
483        match write_type_str {
484            "SIMPLE" => WriteType::Simple,
485            "BATCH" => WriteType::Batch,
486            "UNLOGGED_BATCH" => WriteType::UnloggedBatch,
487            "COUNTER" => WriteType::Counter,
488            "BATCH_LOG" => WriteType::BatchLog,
489            "CAS" => WriteType::Cas,
490            "VIEW" => WriteType::View,
491            "CDC" => WriteType::Cdc,
492            _ => WriteType::Other(write_type_str.to_string()),
493        }
494    }
495}
496
497impl WriteType {
498    pub fn as_str(&self) -> &str {
499        match self {
500            WriteType::Simple => "SIMPLE",
501            WriteType::Batch => "BATCH",
502            WriteType::UnloggedBatch => "UNLOGGED_BATCH",
503            WriteType::Counter => "COUNTER",
504            WriteType::BatchLog => "BATCH_LOG",
505            WriteType::Cas => "CAS",
506            WriteType::View => "VIEW",
507            WriteType::Cdc => "CDC",
508            WriteType::Other(write_type) => write_type.as_str(),
509        }
510    }
511}
512
513#[cfg(test)]
514mod tests {
515    use super::{DbError, Error, OperationType, WriteType};
516    use crate::frame::protocol_features::ProtocolFeatures;
517    use crate::Consistency;
518    use bytes::Bytes;
519    use std::convert::TryInto;
520
521    // Serializes the beginning of an ERROR response - error code and message
522    // All custom data depending on the error type is appended after these bytes
523    fn make_error_request_bytes(error_code: i32, message: &str) -> Vec<u8> {
524        let mut bytes: Vec<u8> = Vec::new();
525        let message_len: u16 = message.len().try_into().unwrap();
526
527        bytes.extend(error_code.to_be_bytes());
528        bytes.extend(message_len.to_be_bytes());
529        bytes.extend(message.as_bytes());
530
531        bytes
532    }
533
534    // Tests deserialization of all errors without and additional data
535    #[test]
536    fn deserialize_simple_errors() {
537        let simple_error_mappings: [(i32, DbError); 11] = [
538            (0x0000, DbError::ServerError),
539            (0x000A, DbError::ProtocolError),
540            (0x0100, DbError::AuthenticationError),
541            (0x1001, DbError::Overloaded),
542            (0x1002, DbError::IsBootstrapping),
543            (0x1003, DbError::TruncateError),
544            (0x2000, DbError::SyntaxError),
545            (0x2100, DbError::Unauthorized),
546            (0x2200, DbError::Invalid),
547            (0x2300, DbError::ConfigError),
548            (0x1234, DbError::Other(0x1234)),
549        ];
550
551        let features = ProtocolFeatures::default();
552
553        for (error_code, expected_error) in &simple_error_mappings {
554            let bytes: Vec<u8> = make_error_request_bytes(*error_code, "simple message");
555            let error: Error = Error::deserialize(&features, &mut bytes.as_slice()).unwrap();
556            assert_eq!(error.error, *expected_error);
557            assert_eq!(error.reason, "simple message");
558        }
559    }
560
561    #[test]
562    fn deserialize_unavailable() {
563        let features = ProtocolFeatures::default();
564
565        let mut bytes = make_error_request_bytes(0x1000, "message 2");
566        bytes.extend(1_i16.to_be_bytes());
567        bytes.extend(2_i32.to_be_bytes());
568        bytes.extend(3_i32.to_be_bytes());
569
570        let error: Error = Error::deserialize(&features, &mut bytes.as_slice()).unwrap();
571
572        assert_eq!(
573            error.error,
574            DbError::Unavailable {
575                consistency: Consistency::One,
576                required: 2,
577                alive: 3,
578            }
579        );
580        assert_eq!(error.reason, "message 2");
581    }
582
583    #[test]
584    fn deserialize_write_timeout() {
585        let features = ProtocolFeatures::default();
586
587        let mut bytes = make_error_request_bytes(0x1100, "message 2");
588        bytes.extend(0x0004_i16.to_be_bytes());
589        bytes.extend((-5_i32).to_be_bytes());
590        bytes.extend(100_i32.to_be_bytes());
591
592        let write_type_str = "SIMPLE";
593        let write_type_str_len: u16 = write_type_str.len().try_into().unwrap();
594        bytes.extend(write_type_str_len.to_be_bytes());
595        bytes.extend(write_type_str.as_bytes());
596
597        let error: Error = Error::deserialize(&features, &mut bytes.as_slice()).unwrap();
598
599        assert_eq!(
600            error.error,
601            DbError::WriteTimeout {
602                consistency: Consistency::Quorum,
603                received: -5, // Allow negative values when they don't make sense, it's better than crashing with ProtocolError
604                required: 100,
605                write_type: WriteType::Simple,
606            }
607        );
608        assert_eq!(error.reason, "message 2");
609    }
610
611    #[test]
612    fn deserialize_read_timeout() {
613        let features = ProtocolFeatures::default();
614
615        let mut bytes = make_error_request_bytes(0x1200, "message 2");
616        bytes.extend(0x0002_i16.to_be_bytes());
617        bytes.extend(8_i32.to_be_bytes());
618        bytes.extend(32_i32.to_be_bytes());
619        bytes.push(0_u8);
620
621        let error: Error = Error::deserialize(&features, &mut bytes.as_slice()).unwrap();
622
623        assert_eq!(
624            error.error,
625            DbError::ReadTimeout {
626                consistency: Consistency::Two,
627                received: 8,
628                required: 32,
629                data_present: false,
630            }
631        );
632        assert_eq!(error.reason, "message 2");
633    }
634
635    #[test]
636    fn deserialize_read_failure() {
637        let features = ProtocolFeatures::default();
638
639        let mut bytes = make_error_request_bytes(0x1300, "message 2");
640        bytes.extend(0x0003_i16.to_be_bytes());
641        bytes.extend(4_i32.to_be_bytes());
642        bytes.extend(5_i32.to_be_bytes());
643        bytes.extend(6_i32.to_be_bytes());
644        bytes.push(123_u8); // Any non-zero value means data_present is true
645
646        let error: Error = Error::deserialize(&features, &mut bytes.as_slice()).unwrap();
647
648        assert_eq!(
649            error.error,
650            DbError::ReadFailure {
651                consistency: Consistency::Three,
652                received: 4,
653                required: 5,
654                numfailures: 6,
655                data_present: true,
656            }
657        );
658        assert_eq!(error.reason, "message 2");
659    }
660
661    #[test]
662    fn deserialize_function_failure() {
663        let features = ProtocolFeatures::default();
664
665        let mut bytes = make_error_request_bytes(0x1400, "message 2");
666
667        let keyspace_name: &str = "keyspace_name";
668        let keyspace_name_len: u16 = keyspace_name.len().try_into().unwrap();
669
670        let function_name: &str = "function_name";
671        let function_name_len: u16 = function_name.len().try_into().unwrap();
672
673        let type1: &str = "type1";
674        let type1_len: u16 = type1.len().try_into().unwrap();
675
676        let type2: &str = "type2";
677        let type2_len: u16 = type1.len().try_into().unwrap();
678
679        bytes.extend(keyspace_name_len.to_be_bytes());
680        bytes.extend(keyspace_name.as_bytes());
681        bytes.extend(function_name_len.to_be_bytes());
682        bytes.extend(function_name.as_bytes());
683        bytes.extend(2_i16.to_be_bytes());
684        bytes.extend(type1_len.to_be_bytes());
685        bytes.extend(type1.as_bytes());
686        bytes.extend(type2_len.to_be_bytes());
687        bytes.extend(type2.as_bytes());
688
689        let error: Error = Error::deserialize(&features, &mut bytes.as_slice()).unwrap();
690
691        assert_eq!(
692            error.error,
693            DbError::FunctionFailure {
694                keyspace: "keyspace_name".to_string(),
695                function: "function_name".to_string(),
696                arg_types: vec!["type1".to_string(), "type2".to_string()]
697            }
698        );
699        assert_eq!(error.reason, "message 2");
700    }
701
702    #[test]
703    fn deserialize_write_failure() {
704        let features = ProtocolFeatures::default();
705
706        let mut bytes = make_error_request_bytes(0x1500, "message 2");
707
708        bytes.extend(0x0000_i16.to_be_bytes());
709        bytes.extend(2_i32.to_be_bytes());
710        bytes.extend(4_i32.to_be_bytes());
711        bytes.extend(8_i32.to_be_bytes());
712
713        let write_type_str = "COUNTER";
714        let write_type_str_len: u16 = write_type_str.len().try_into().unwrap();
715        bytes.extend(write_type_str_len.to_be_bytes());
716        bytes.extend(write_type_str.as_bytes());
717
718        let error: Error = Error::deserialize(&features, &mut bytes.as_slice()).unwrap();
719
720        assert_eq!(
721            error.error,
722            DbError::WriteFailure {
723                consistency: Consistency::Any,
724                received: 2,
725                required: 4,
726                numfailures: 8,
727                write_type: WriteType::Counter,
728            }
729        );
730        assert_eq!(error.reason, "message 2");
731    }
732
733    #[test]
734    fn deserialize_already_exists() {
735        let features = ProtocolFeatures::default();
736
737        let mut bytes = make_error_request_bytes(0x2400, "message 2");
738
739        let keyspace_name: &str = "keyspace_name";
740        let keyspace_name_len: u16 = keyspace_name.len().try_into().unwrap();
741
742        let table_name: &str = "table_name";
743        let table_name_len: u16 = table_name.len().try_into().unwrap();
744
745        bytes.extend(keyspace_name_len.to_be_bytes());
746        bytes.extend(keyspace_name.as_bytes());
747        bytes.extend(table_name_len.to_be_bytes());
748        bytes.extend(table_name.as_bytes());
749
750        let error: Error = Error::deserialize(&features, &mut bytes.as_slice()).unwrap();
751
752        assert_eq!(
753            error.error,
754            DbError::AlreadyExists {
755                keyspace: "keyspace_name".to_string(),
756                table: "table_name".to_string(),
757            }
758        );
759        assert_eq!(error.reason, "message 2");
760    }
761
762    #[test]
763    fn deserialize_unprepared() {
764        let features = ProtocolFeatures::default();
765
766        let mut bytes = make_error_request_bytes(0x2500, "message 3");
767        let statement_id = b"deadbeef";
768        bytes.extend((statement_id.len() as i16).to_be_bytes());
769        bytes.extend(statement_id);
770
771        let error: Error = Error::deserialize(&features, &mut bytes.as_slice()).unwrap();
772
773        assert_eq!(
774            error.error,
775            DbError::Unprepared {
776                statement_id: Bytes::from_static(b"deadbeef")
777            }
778        );
779        assert_eq!(error.reason, "message 3");
780    }
781
782    #[test]
783    fn deserialize_rate_limit_error() {
784        let features = ProtocolFeatures {
785            rate_limit_error: Some(0x4321),
786            ..Default::default()
787        };
788        let mut bytes = make_error_request_bytes(0x4321, "message 1");
789        bytes.extend([0u8]); // Read type
790        bytes.extend([1u8]); // Rejected by coordinator
791        let error = Error::deserialize(&features, &mut bytes.as_slice()).unwrap();
792
793        assert_eq!(
794            error.error,
795            DbError::RateLimitReached {
796                op_type: OperationType::Read,
797                rejected_by_coordinator: true,
798            }
799        );
800        assert_eq!(error.reason, "message 1");
801
802        let features = ProtocolFeatures {
803            rate_limit_error: Some(0x8765),
804            ..Default::default()
805        };
806        let mut bytes = make_error_request_bytes(0x8765, "message 2");
807        bytes.extend([1u8]); // Write type
808        bytes.extend([0u8]); // Not rejected by coordinator
809        let error = Error::deserialize(&features, &mut bytes.as_slice()).unwrap();
810
811        assert_eq!(
812            error.error,
813            DbError::RateLimitReached {
814                op_type: OperationType::Write,
815                rejected_by_coordinator: false,
816            }
817        );
818        assert_eq!(error.reason, "message 2");
819    }
820}