scylla_cql/frame/request/
mod.rs1pub mod auth_response;
4pub mod batch;
5pub mod execute;
6pub mod options;
7pub mod prepare;
8pub mod query;
9pub mod register;
10pub mod startup;
11
12use batch::BatchTypeParseError;
13use thiserror::Error;
14
15use crate::serialize::row::SerializedValues;
16use crate::Consistency;
17use bytes::Bytes;
18
19pub use auth_response::AuthResponse;
20pub use batch::Batch;
21pub use execute::Execute;
22pub use options::Options;
23pub use prepare::Prepare;
24pub use query::Query;
25pub use startup::Startup;
26
27use self::batch::BatchStatement;
28
29use super::frame_errors::{CqlRequestSerializationError, LowLevelDeserializationError};
30use super::types::SerialConsistency;
31use super::TryFromPrimitiveError;
32
/// Kinds of CQL requests that a client can send.
///
/// The [`Display`](std::fmt::Display) implementation of this type renders each
/// kind as its upper-case protocol message name (for example `AUTH_RESPONSE`).
///
/// The enum is `#[non_exhaustive]`: downstream `match`es need a wildcard arm.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum CqlRequestKind {
    /// The `STARTUP` request.
    Startup,

    /// The `AUTH_RESPONSE` request.
    AuthResponse,

    /// The `OPTIONS` request.
    Options,

    /// The `QUERY` request.
    Query,

    /// The `PREPARE` request.
    Prepare,

    /// The `EXECUTE` request.
    Execute,

    /// The `BATCH` request.
    Batch,

    /// The `REGISTER` request.
    Register,
}
88
89impl std::fmt::Display for CqlRequestKind {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 let kind_str = match self {
92 CqlRequestKind::Startup => "STARTUP",
93 CqlRequestKind::AuthResponse => "AUTH_RESPONSE",
94 CqlRequestKind::Options => "OPTIONS",
95 CqlRequestKind::Query => "QUERY",
96 CqlRequestKind::Prepare => "PREPARE",
97 CqlRequestKind::Execute => "EXECUTE",
98 CqlRequestKind::Batch => "BATCH",
99 CqlRequestKind::Register => "REGISTER",
100 };
101
102 f.write_str(kind_str)
103 }
104}
105
/// Opcode of a request: the single byte that identifies the request type.
///
/// The enum is `#[repr(u8)]`, so `opcode as u8` yields the discriminant listed
/// next to each variant. The derived ordering follows those numeric values.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(u8)]
pub enum RequestOpcode {
    /// Opcode `0x01`.
    Startup = 0x01,
    /// Opcode `0x05`.
    Options = 0x05,
    /// Opcode `0x07`.
    Query = 0x07,
    /// Opcode `0x09`.
    Prepare = 0x09,
    /// Opcode `0x0A`.
    Execute = 0x0A,
    /// Opcode `0x0B`.
    Register = 0x0B,
    /// Opcode `0x0D`.
    Batch = 0x0D,
    /// Opcode `0x0F`.
    AuthResponse = 0x0F,
}
127
128impl TryFrom<u8> for RequestOpcode {
129 type Error = TryFromPrimitiveError<u8>;
130
131 fn try_from(value: u8) -> Result<Self, Self::Error> {
132 match value {
133 0x01 => Ok(Self::Startup),
134 0x05 => Ok(Self::Options),
135 0x07 => Ok(Self::Query),
136 0x09 => Ok(Self::Prepare),
137 0x0A => Ok(Self::Execute),
138 0x0B => Ok(Self::Register),
139 0x0D => Ok(Self::Batch),
140 0x0F => Ok(Self::AuthResponse),
141 _ => Err(TryFromPrimitiveError {
142 enum_name: "RequestOpcode",
143 primitive: value,
144 }),
145 }
146 }
147}
148
149pub trait SerializableRequest {
151 const OPCODE: RequestOpcode;
153
154 fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), CqlRequestSerializationError>;
156
157 fn to_bytes(&self) -> Result<Bytes, CqlRequestSerializationError> {
159 let mut v = Vec::new();
160 self.serialize(&mut v)?;
161 Ok(v.into())
162 }
163}
164
165pub trait DeserializableRequest: SerializableRequest + Sized {
170 fn deserialize(buf: &mut &[u8]) -> Result<Self, RequestDeserializationError>;
172}
173
174#[doc(hidden)]
178#[derive(Debug, Error)]
179pub enum RequestDeserializationError {
180 #[error("Low level deser error: {0}")]
181 LowLevelDeserialization(#[from] LowLevelDeserializationError),
182 #[error("Io error: {0}")]
183 IoError(#[from] std::io::Error),
184 #[error("Specified flags are not recognised: {:02x}", flags)]
185 UnknownFlags { flags: u8 },
186 #[error("Named values in frame are currently unsupported")]
187 NamedValuesUnsupported,
188 #[error("Expected SerialConsistency, got regular Consistency: {0}")]
189 ExpectedSerialConsistency(Consistency),
190 #[error(transparent)]
191 BatchTypeParse(#[from] BatchTypeParseError),
192 #[error("Unexpected batch statement kind: {0}")]
193 UnexpectedBatchStatementKind(u8),
194}
195
196#[non_exhaustive] pub enum Request<'r> {
199 Query(Query<'r>),
201 Execute(Execute<'r>),
203 Batch(Batch<'r, BatchStatement<'r>, Vec<SerializedValues>>),
206}
207
208impl Request<'_> {
209 pub fn deserialize(
211 buf: &mut &[u8],
212 opcode: RequestOpcode,
213 ) -> Result<Self, RequestDeserializationError> {
214 match opcode {
215 RequestOpcode::Query => Query::deserialize(buf).map(Self::Query),
216 RequestOpcode::Execute => Execute::deserialize(buf).map(Self::Execute),
217 RequestOpcode::Batch => Batch::deserialize(buf).map(Self::Batch),
218 _ => unimplemented!(
219 "Deserialization of opcode {:?} is not yet supported",
220 opcode
221 ),
222 }
223 }
224
225 pub fn get_consistency(&self) -> Option<Consistency> {
227 match self {
228 Request::Query(q) => Some(q.parameters.consistency),
229 Request::Execute(e) => Some(e.parameters.consistency),
230 Request::Batch(b) => Some(b.consistency),
231 #[expect(unreachable_patterns)] _ => None,
233 }
234 }
235
236 pub fn get_serial_consistency(&self) -> Option<Option<SerialConsistency>> {
238 match self {
239 Request::Query(q) => Some(q.parameters.serial_consistency),
240 Request::Execute(e) => Some(e.parameters.serial_consistency),
241 Request::Batch(b) => Some(b.serial_consistency),
242 #[expect(unreachable_patterns)] _ => None,
244 }
245 }
246}
247
#[cfg(test)]
mod tests {
    use std::{borrow::Cow, ops::Deref};

    use bytes::Bytes;

    use crate::serialize::row::SerializedValues;
    use crate::{
        frame::{
            request::{
                batch::{Batch, BatchStatement, BatchType},
                execute::Execute,
                query::{Query, QueryParameters},
                DeserializableRequest, SerializableRequest,
            },
            response::result::{ColumnType, NativeType},
            types::{self, SerialConsistency},
        },
        Consistency,
    };

    use super::query::PagingState;

    /// Serializing a request and deserializing the result must give back an
    /// equal request, for each of `Query`, `Execute` and `Batch`.
    #[test]
    fn request_ser_de_identity() {
        // Query
        let contents = Cow::Borrowed("SELECT host_id from system.peers");
        let parameters = QueryParameters {
            consistency: Consistency::All,
            serial_consistency: Some(SerialConsistency::Serial),
            timestamp: None,
            page_size: Some(323),
            paging_state: PagingState::new_from_raw_bytes(&[2_u8, 1, 3, 7] as &[u8]),
            skip_metadata: false,
            values: {
                let mut vals = SerializedValues::new();
                vals.add_value(&2137, &ColumnType::Native(NativeType::Int))
                    .unwrap();
                Cow::Owned(vals)
            },
        };
        let query = Query {
            contents,
            parameters,
        };

        {
            let mut buf = Vec::new();
            query.serialize(&mut buf).unwrap();

            let query_deserialized = Query::deserialize(&mut &buf[..]).unwrap();
            assert_eq!(&query_deserialized, &query);
        }

        // Execute
        let id: Bytes = vec![2, 4, 5, 2, 6, 7, 3, 1].into();
        let parameters = QueryParameters {
            consistency: Consistency::Any,
            serial_consistency: None,
            timestamp: Some(3423434),
            page_size: None,
            paging_state: PagingState::start(),
            skip_metadata: false,
            values: {
                let mut vals = SerializedValues::new();
                vals.add_value(&42, &ColumnType::Native(NativeType::Int))
                    .unwrap();
                vals.add_value(&2137, &ColumnType::Native(NativeType::Int))
                    .unwrap();
                Cow::Owned(vals)
            },
        };
        let execute = Execute { id, parameters };
        {
            let mut buf = Vec::new();
            execute.serialize(&mut buf).unwrap();

            let execute_deserialized = Execute::deserialize(&mut &buf[..]).unwrap();
            assert_eq!(&execute_deserialized, &execute);
        }

        // Batch: one unprepared and one prepared statement, reusing the text
        // and the id of the requests built above.
        let statements = vec![
            BatchStatement::Query {
                text: query.contents,
            },
            BatchStatement::Prepared {
                id: Cow::Borrowed(&execute.id),
            },
        ];
        let batch = Batch {
            statements: Cow::Owned(statements),
            batch_type: BatchType::Logged,
            consistency: Consistency::EachQuorum,
            serial_consistency: Some(SerialConsistency::LocalSerial),
            timestamp: Some(32432),

            // One set of values per statement; both reuse the query's values.
            values: vec![
                query.parameters.values.deref().clone(),
                query.parameters.values.deref().clone(),
            ],
        };
        {
            let mut buf = Vec::new();
            batch.serialize(&mut buf).unwrap();

            let batch_deserialized = Batch::deserialize(&mut &buf[..]).unwrap();
            assert_eq!(&batch_deserialized, &batch);
        }
    }

    /// Setting an extra bit (`0x80`) in the serialized flags byte must make
    /// deserialization fail, for both `Query` and `Batch`.
    #[test]
    fn deser_rejects_unknown_flags() {
        // Query
        let contents = Cow::Borrowed("SELECT host_id from system.peers");
        let parameters = QueryParameters {
            consistency: Default::default(),
            serial_consistency: Some(SerialConsistency::LocalSerial),
            timestamp: None,
            page_size: None,
            paging_state: PagingState::start(),
            skip_metadata: false,
            values: Cow::Borrowed(SerializedValues::EMPTY),
        };
        let query = Query {
            contents: contents.clone(),
            parameters,
        };

        {
            let mut buf = Vec::new();
            query.serialize(&mut buf).unwrap();

            // Sanity check: the unaltered bytes deserialize to the original.
            let query_deserialized = Query::deserialize(&mut &buf[..]).unwrap();
            assert_eq!(&query_deserialized.contents, &query.contents);
            assert_eq!(&query_deserialized.parameters, &query.parameters);

            // Locate the flags byte by reading the fields that precede it:
            // first the statement text...
            let mut buf_ptr = buf.as_slice();
            let serialised_contents = types::read_long_string(&mut buf_ptr).unwrap();
            assert_eq!(serialised_contents, contents);

            // ...then the consistency.
            let consistency = types::read_consistency(&mut buf_ptr).unwrap();
            assert_eq!(consistency, Consistency::default());

            // `buf_ptr` now starts at the flags, but it is an immutable view;
            // turn its position into an index into the owned buffer.
            let flags_idx = buf.len() - buf_ptr.len();
            let flags_mut = &mut buf[flags_idx];

            // The test relies on 0x80 being a flag the deserializer does not
            // know.
            *flags_mut |= 0x80;

            let _parse_error = Query::deserialize(&mut &buf[..]).unwrap_err();
        }

        // Batch
        let statements = vec![BatchStatement::Query {
            text: query.contents,
        }];
        let batch = Batch {
            statements: Cow::Owned(statements),
            batch_type: BatchType::Logged,
            consistency: Consistency::EachQuorum,
            serial_consistency: None,
            timestamp: None,

            values: vec![query.parameters.values.deref().clone()],
        };
        {
            let mut buf = Vec::new();
            batch.serialize(&mut buf).unwrap();

            // Sanity check: the unaltered bytes deserialize to the original.
            let batch_deserialized = Batch::deserialize(&mut &buf[..]).unwrap();
            assert_eq!(batch, batch_deserialized);

            // The test treats the last byte of the buffer as the flags byte
            // (this batch sets neither a serial consistency nor a timestamp).
            let buf_len = buf.len();
            let flags_mut = &mut buf[buf_len - 1];
            // As above, 0x80 is relied upon to be an unknown flag.
            *flags_mut |= 0x80;

            let _parse_error = Batch::deserialize(&mut &buf[..]).unwrap_err();
        }
    }
}