scylla_cql/frame/request/
mod.rs

1//! CQL requests sent by the client.
2
3pub mod auth_response;
4pub mod batch;
5pub mod execute;
6pub mod options;
7pub mod prepare;
8pub mod query;
9pub mod register;
10pub mod startup;
11
12use batch::BatchTypeParseError;
13use thiserror::Error;
14
15use crate::serialize::row::SerializedValues;
16use crate::Consistency;
17use bytes::Bytes;
18
19pub use auth_response::AuthResponse;
20pub use batch::Batch;
21pub use execute::Execute;
22pub use options::Options;
23pub use prepare::Prepare;
24pub use query::Query;
25pub use startup::Startup;
26
27use self::batch::BatchStatement;
28
29use super::frame_errors::{CqlRequestSerializationError, LowLevelDeserializationError};
30use super::types::SerialConsistency;
31use super::TryFromPrimitiveError;
32
/// Possible requests sent by the client.
///
/// Each variant names one kind of CQL request; the [`std::fmt::Display`]
/// implementation renders the upper-case protocol name of the request
/// (e.g. `STARTUP`, `AUTH_RESPONSE`).
// Why is it distinct from [RequestOpcode]?
// TODO(2.0): merge this with `RequestOpcode`.
#[derive(Debug, Copy, Clone)]
#[non_exhaustive]
pub enum CqlRequestKind {
    /// Initialize the connection. The server will respond by either a READY message
    /// (in which case the connection is ready for queries) or an AUTHENTICATE message
    /// (in which case credentials will need to be provided using AUTH_RESPONSE).
    ///
    /// This must be the first message of the connection, except for OPTIONS that can
    /// be sent before to find out the options supported by the server. Once the
    /// connection has been initialized, a client should not send any more STARTUP
    /// messages.
    Startup,

    /// Answers a server authentication challenge.
    ///
    /// Authentication in the protocol is SASL based. The server sends authentication
    /// challenges (a bytes token) to which the client answers with this message. Those
    /// exchanges continue until the server accepts the authentication by sending an
    /// AUTH_SUCCESS message after a client AUTH_RESPONSE. Note that the exchange
    /// begins with the client sending an initial AUTH_RESPONSE in response to a
    /// server AUTHENTICATE request.
    ///
    /// The response to an AUTH_RESPONSE is either a follow-up AUTH_CHALLENGE message,
    /// an AUTH_SUCCESS message or an ERROR message.
    AuthResponse,

    /// Asks the server to return which STARTUP options are supported. The server
    /// will respond with a SUPPORTED message.
    Options,

    /// Performs a CQL query, i.e., executes an unprepared statement.
    /// The server will respond to a QUERY message with a RESULT message, the content
    /// of which depends on the query.
    Query,

    /// Prepares a query for later execution (through EXECUTE).
    /// The server will respond with a RESULT::Prepared message.
    Prepare,

    /// Executes a prepared query.
    /// The response from the server will be a RESULT message.
    Execute,

    /// Allows executing a list of queries (prepared or not) as a batch (note that
    /// only DML statements are accepted in a batch).
    /// The server will respond with a RESULT message.
    Batch,

    /// Register this connection to receive some types of events.
    /// The response to a REGISTER message will be a READY message.
    Register,
}
88
89impl std::fmt::Display for CqlRequestKind {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        let kind_str = match self {
92            CqlRequestKind::Startup => "STARTUP",
93            CqlRequestKind::AuthResponse => "AUTH_RESPONSE",
94            CqlRequestKind::Options => "OPTIONS",
95            CqlRequestKind::Query => "QUERY",
96            CqlRequestKind::Prepare => "PREPARE",
97            CqlRequestKind::Execute => "EXECUTE",
98            CqlRequestKind::Batch => "BATCH",
99            CqlRequestKind::Register => "REGISTER",
100        };
101
102        f.write_str(kind_str)
103    }
104}
105
/// Opcode of a request, used to identify the request type in a CQL frame.
///
/// The enum is `#[repr(u8)]`: each variant's discriminant is the raw byte value
/// of the opcode. The `TryFrom<u8>` implementation accepts exactly these values
/// and rejects every other byte.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[repr(u8)]
pub enum RequestOpcode {
    /// Opcode `0x01`. See [`CqlRequestKind::Startup`].
    Startup = 0x01,
    /// Opcode `0x05`. See [`CqlRequestKind::Options`].
    Options = 0x05,
    /// Opcode `0x07`. See [`CqlRequestKind::Query`].
    Query = 0x07,
    /// Opcode `0x09`. See [`CqlRequestKind::Prepare`].
    Prepare = 0x09,
    /// Opcode `0x0A`. See [`CqlRequestKind::Execute`].
    Execute = 0x0A,
    /// Opcode `0x0B`. See [`CqlRequestKind::Register`].
    Register = 0x0B,
    /// Opcode `0x0D`. See [`CqlRequestKind::Batch`].
    Batch = 0x0D,
    /// Opcode `0x0F`. See [`CqlRequestKind::AuthResponse`].
    AuthResponse = 0x0F,
}
127
128impl TryFrom<u8> for RequestOpcode {
129    type Error = TryFromPrimitiveError<u8>;
130
131    fn try_from(value: u8) -> Result<Self, Self::Error> {
132        match value {
133            0x01 => Ok(Self::Startup),
134            0x05 => Ok(Self::Options),
135            0x07 => Ok(Self::Query),
136            0x09 => Ok(Self::Prepare),
137            0x0A => Ok(Self::Execute),
138            0x0B => Ok(Self::Register),
139            0x0D => Ok(Self::Batch),
140            0x0F => Ok(Self::AuthResponse),
141            _ => Err(TryFromPrimitiveError {
142                enum_name: "RequestOpcode",
143                primitive: value,
144            }),
145        }
146    }
147}
148
149/// Requests that can be serialized into a CQL frame.
150pub trait SerializableRequest {
151    /// Opcode of the request, used to identify the request type in the CQL frame.
152    const OPCODE: RequestOpcode;
153
154    /// Serializes the request into the provided buffer.
155    fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), CqlRequestSerializationError>;
156
157    /// Serializes the request into a heap-allocated `Bytes` object.
158    fn to_bytes(&self) -> Result<Bytes, CqlRequestSerializationError> {
159        let mut v = Vec::new();
160        self.serialize(&mut v)?;
161        Ok(v.into())
162    }
163}
164
/// Requests that can be deserialized from a CQL frame.
///
/// Not intended for driver's direct usage (as driver has no interest in deserialising CQL requests),
/// but very useful for testing (e.g. asserting that the sent requests have proper parameters set).
///
/// Every deserializable request must also be serializable
/// ([`SerializableRequest`] is a supertrait).
pub trait DeserializableRequest: SerializableRequest + Sized {
    /// Deserializes the request from the provided buffer.
    ///
    /// `buf` is a mutable reference to a byte slice; presumably implementations
    /// advance the slice past the bytes they consume — confirm against the
    /// individual implementations.
    ///
    /// # Errors
    ///
    /// Returns a [`RequestDeserializationError`] if the buffer does not contain
    /// a valid request of this type.
    fn deserialize(buf: &mut &[u8]) -> Result<Self, RequestDeserializationError>;
}
173
/// An error type returned by [`DeserializableRequest::deserialize`].
/// This is not intended for driver's direct usage. It's a testing utility,
/// mainly used by `scylla-proxy` crate.
#[doc(hidden)]
#[derive(Debug, Error)]
pub enum RequestDeserializationError {
    /// A low-level deserialization routine failed; wraps the underlying
    /// [`LowLevelDeserializationError`] (convertible via `From`, so `?` works).
    #[error("Low level deser error: {0}")]
    LowLevelDeserialization(#[from] LowLevelDeserializationError),
    /// An I/O error occurred; wraps the underlying [`std::io::Error`]
    /// (convertible via `From`, so `?` works).
    #[error("Io error: {0}")]
    IoError(#[from] std::io::Error),
    /// The flags found in the frame are not recognised.
    /// `flags` is the flags byte reported in the message (printed as two hex digits).
    #[error("Specified flags are not recognised: {:02x}", flags)]
    UnknownFlags { flags: u8 },
    /// The frame contains named values, which are currently unsupported.
    #[error("Named values in frame are currently unsupported")]
    NamedValuesUnsupported,
    /// A serial consistency was expected, but the contained regular
    /// [`Consistency`] was encountered instead.
    #[error("Expected SerialConsistency, got regular Consistency: {0}")]
    ExpectedSerialConsistency(Consistency),
    /// Parsing of the batch type failed; transparently forwards the
    /// underlying [`BatchTypeParseError`].
    #[error(transparent)]
    BatchTypeParse(#[from] BatchTypeParseError),
    /// A batch statement had an unexpected kind; the contained byte is the
    /// kind value reported in the message.
    #[error("Unexpected batch statement kind: {0}")]
    UnexpectedBatchStatementKind(u8),
}
195
/// A CQL request that can be sent to the server.
///
/// Only QUERY, EXECUTE and BATCH requests are represented so far; these are
/// also the only kinds that [`Request::deserialize`] can produce.
/// The lifetime `'r` is the lifetime parameter of the wrapped request types.
#[non_exhaustive] // TODO: add remaining request types
pub enum Request<'r> {
    /// QUERY request, used to execute a single unprepared statement.
    Query(Query<'r>),
    /// EXECUTE request, used to execute a single prepared statement.
    Execute(Execute<'r>),
    /// BATCH request, used to execute a batch of (prepared, unprepared, or mix of both)
    /// statements.
    Batch(Batch<'r, BatchStatement<'r>, Vec<SerializedValues>>),
}
207
208impl Request<'_> {
209    /// Deserializes the request from the provided buffer.
210    pub fn deserialize(
211        buf: &mut &[u8],
212        opcode: RequestOpcode,
213    ) -> Result<Self, RequestDeserializationError> {
214        match opcode {
215            RequestOpcode::Query => Query::deserialize(buf).map(Self::Query),
216            RequestOpcode::Execute => Execute::deserialize(buf).map(Self::Execute),
217            RequestOpcode::Batch => Batch::deserialize(buf).map(Self::Batch),
218            _ => unimplemented!(
219                "Deserialization of opcode {:?} is not yet supported",
220                opcode
221            ),
222        }
223    }
224
225    /// Retrieves consistency from request frame, if present.
226    pub fn get_consistency(&self) -> Option<Consistency> {
227        match self {
228            Request::Query(q) => Some(q.parameters.consistency),
229            Request::Execute(e) => Some(e.parameters.consistency),
230            Request::Batch(b) => Some(b.consistency),
231            #[expect(unreachable_patterns)] // until other opcodes are supported
232            _ => None,
233        }
234    }
235
236    /// Retrieves serial consistency from request frame.
237    pub fn get_serial_consistency(&self) -> Option<Option<SerialConsistency>> {
238        match self {
239            Request::Query(q) => Some(q.parameters.serial_consistency),
240            Request::Execute(e) => Some(e.parameters.serial_consistency),
241            Request::Batch(b) => Some(b.serial_consistency),
242            #[expect(unreachable_patterns)] // until other opcodes are supported
243            _ => None,
244        }
245    }
246}
247
// Round-trip and flag-validation tests for request (de)serialization.
#[cfg(test)]
mod tests {
    use std::{borrow::Cow, ops::Deref};

    use bytes::Bytes;

    use crate::serialize::row::SerializedValues;
    use crate::{
        frame::{
            request::{
                batch::{Batch, BatchStatement, BatchType},
                execute::Execute,
                query::{Query, QueryParameters},
                DeserializableRequest, SerializableRequest,
            },
            response::result::{ColumnType, NativeType},
            types::{self, SerialConsistency},
        },
        Consistency,
    };

    use super::query::PagingState;

    /// Serializing a `Query`, an `Execute` and a `Batch` and then deserializing
    /// the produced bytes must yield a value equal to the original request.
    #[test]
    fn request_ser_de_identity() {
        // Query
        let contents = Cow::Borrowed("SELECT host_id from system.peers");
        let parameters = QueryParameters {
            consistency: Consistency::All,
            serial_consistency: Some(SerialConsistency::Serial),
            timestamp: None,
            page_size: Some(323),
            paging_state: PagingState::new_from_raw_bytes(&[2_u8, 1, 3, 7] as &[u8]),
            skip_metadata: false,
            values: {
                let mut vals = SerializedValues::new();
                vals.add_value(&2137, &ColumnType::Native(NativeType::Int))
                    .unwrap();
                Cow::Owned(vals)
            },
        };
        let query = Query {
            contents,
            parameters,
        };

        {
            let mut buf = Vec::new();
            query.serialize(&mut buf).unwrap();

            let query_deserialized = Query::deserialize(&mut &buf[..]).unwrap();
            assert_eq!(&query_deserialized, &query);
        }

        // Execute
        let id: Bytes = vec![2, 4, 5, 2, 6, 7, 3, 1].into();
        let parameters = QueryParameters {
            consistency: Consistency::Any,
            serial_consistency: None,
            timestamp: Some(3423434),
            page_size: None,
            paging_state: PagingState::start(),
            skip_metadata: false,
            values: {
                let mut vals = SerializedValues::new();
                vals.add_value(&42, &ColumnType::Native(NativeType::Int))
                    .unwrap();
                vals.add_value(&2137, &ColumnType::Native(NativeType::Int))
                    .unwrap();
                Cow::Owned(vals)
            },
        };
        let execute = Execute { id, parameters };
        {
            let mut buf = Vec::new();
            execute.serialize(&mut buf).unwrap();

            let execute_deserialized = Execute::deserialize(&mut &buf[..]).unwrap();
            assert_eq!(&execute_deserialized, &execute);
        }

        // Batch
        // Reuses the query text and the prepared statement id from the requests above.
        let statements = vec![
            BatchStatement::Query {
                text: query.contents,
            },
            BatchStatement::Prepared {
                id: Cow::Borrowed(&execute.id),
            },
        ];
        let batch = Batch {
            statements: Cow::Owned(statements),
            batch_type: BatchType::Logged,
            consistency: Consistency::EachQuorum,
            serial_consistency: Some(SerialConsistency::LocalSerial),
            timestamp: Some(32432),

            // Not execute's values, because named values are not supported in batches.
            values: vec![
                query.parameters.values.deref().clone(),
                query.parameters.values.deref().clone(),
            ],
        };
        {
            let mut buf = Vec::new();
            batch.serialize(&mut buf).unwrap();

            let batch_deserialized = Batch::deserialize(&mut &buf[..]).unwrap();
            assert_eq!(&batch_deserialized, &batch);
        }
    }

    /// Deserialization of a `Query` and of a `Batch` must fail once an unknown
    /// flag bit (`0x80`) is set in the flags byte of an otherwise valid frame.
    #[test]
    fn deser_rejects_unknown_flags() {
        // Query
        let contents = Cow::Borrowed("SELECT host_id from system.peers");
        let parameters = QueryParameters {
            consistency: Default::default(),
            serial_consistency: Some(SerialConsistency::LocalSerial),
            timestamp: None,
            page_size: None,
            paging_state: PagingState::start(),
            skip_metadata: false,
            values: Cow::Borrowed(SerializedValues::EMPTY),
        };
        let query = Query {
            contents: contents.clone(),
            parameters,
        };

        {
            let mut buf = Vec::new();
            query.serialize(&mut buf).unwrap();

            // Sanity check: query deserializes to the equivalent.
            let query_deserialized = Query::deserialize(&mut &buf[..]).unwrap();
            assert_eq!(&query_deserialized.contents, &query.contents);
            assert_eq!(&query_deserialized.parameters, &query.parameters);

            // Now modify flags by adding an unknown one.
            // Find flags in buffer:
            let mut buf_ptr = buf.as_slice();
            let serialised_contents = types::read_long_string(&mut buf_ptr).unwrap();
            assert_eq!(serialised_contents, contents);

            // Now buf_ptr points at consistency.
            let consistency = types::read_consistency(&mut buf_ptr).unwrap();
            assert_eq!(consistency, Consistency::default());

            // Now buf_ptr points at flags, but it is immutable. Get mutable reference into the buffer.
            // The flags offset is the number of bytes consumed from the buffer so far.
            let flags_idx = buf.len() - buf_ptr.len();
            let flags_mut = &mut buf[flags_idx];

            // This assumes that the following flag is unknown, which is true at the time of writing this test.
            *flags_mut |= 0x80;

            // Unknown flag should lead to frame rejection, as unknown flags can be new protocol extensions
            // leading to different semantics.
            let _parse_error = Query::deserialize(&mut &buf[..]).unwrap_err();
        }

        // Batch
        let statements = vec![BatchStatement::Query {
            text: query.contents,
        }];
        let batch = Batch {
            statements: Cow::Owned(statements),
            batch_type: BatchType::Logged,
            consistency: Consistency::EachQuorum,
            serial_consistency: None,
            timestamp: None,

            values: vec![query.parameters.values.deref().clone()],
        };
        {
            let mut buf = Vec::new();
            batch.serialize(&mut buf).unwrap();

            // Sanity check: batch deserializes to the equivalent.
            let batch_deserialized = Batch::deserialize(&mut &buf[..]).unwrap();
            assert_eq!(batch, batch_deserialized);

            // Now modify flags by adding an unknown one.
            // There are no timestamp nor serial consistency, so flags are the last byte in the buf.
            let buf_len = buf.len();
            let flags_mut = &mut buf[buf_len - 1];
            // This assumes that the following flag is unknown, which is true at the time of writing this test.
            *flags_mut |= 0x80;

            // Unknown flag should lead to frame rejection, as unknown flags can be new protocol extensions
            // leading to different semantics.
            let _parse_error = Batch::deserialize(&mut &buf[..]).unwrap_err();
        }
    }
}