scylla_cql/frame/request/
query.rs1use std::{borrow::Cow, num::TryFromIntError, ops::ControlFlow, sync::Arc};
4
5use crate::frame::{frame_errors::CqlRequestSerializationError, types::SerialConsistency};
6use crate::serialize::row::SerializedValues;
7use bytes::{Buf, BufMut};
8use thiserror::Error;
9
10use crate::{
11 frame::request::{RequestOpcode, SerializableRequest},
12 frame::types,
13};
14
15use super::{DeserializableRequest, RequestDeserializationError};
16
// Bits of the flags byte that `QueryParameters::serialize` writes right after
// the consistency and that `QueryParameters::deserialize` reads back.
const FLAG_VALUES: u8 = 0b0000_0001;
const FLAG_SKIP_METADATA: u8 = 0b0000_0010;
const FLAG_PAGE_SIZE: u8 = 0b0000_0100;
const FLAG_WITH_PAGING_STATE: u8 = 0b0000_1000;
const FLAG_WITH_SERIAL_CONSISTENCY: u8 = 0b0001_0000;
const FLAG_WITH_DEFAULT_TIMESTAMP: u8 = 0b0010_0000;
const FLAG_WITH_NAMES_FOR_VALUES: u8 = 0b0100_0000;

// Union of every bit this module recognises; deserialization rejects any bit
// outside of this mask as unknown.
const ALL_FLAGS: u8 = FLAG_VALUES
    | FLAG_SKIP_METADATA
    | FLAG_PAGE_SIZE
    | FLAG_WITH_PAGING_STATE
    | FLAG_WITH_SERIAL_CONSISTENCY
    | FLAG_WITH_DEFAULT_TIMESTAMP
    | FLAG_WITH_NAMES_FOR_VALUES;
32
33#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
36pub struct Query<'q> {
37 pub contents: Cow<'q, str>,
39
40 pub parameters: QueryParameters<'q>,
42}
43
44impl SerializableRequest for Query<'_> {
45 const OPCODE: RequestOpcode = RequestOpcode::Query;
46
47 fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), CqlRequestSerializationError> {
48 types::write_long_string(&self.contents, buf)
49 .map_err(QuerySerializationError::StatementStringSerialization)?;
50 self.parameters
51 .serialize(buf)
52 .map_err(QuerySerializationError::QueryParametersSerialization)?;
53 Ok(())
54 }
55}
56
57impl DeserializableRequest for Query<'_> {
58 fn deserialize(buf: &mut &[u8]) -> Result<Self, RequestDeserializationError> {
59 let contents = Cow::Owned(types::read_long_string(buf)?.to_owned());
60 let parameters = QueryParameters::deserialize(buf)?;
61
62 Ok(Self {
63 contents,
64 parameters,
65 })
66 }
67}
68
69#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
71pub struct QueryParameters<'a> {
72 pub consistency: types::Consistency,
74
75 pub serial_consistency: Option<types::SerialConsistency>,
77
78 pub timestamp: Option<i64>,
80
81 pub page_size: Option<i32>,
87
88 pub paging_state: PagingState,
91
92 pub skip_metadata: bool,
97
98 pub values: Cow<'a, SerializedValues>,
100}
101
102impl Default for QueryParameters<'_> {
103 fn default() -> Self {
104 Self {
105 consistency: Default::default(),
106 serial_consistency: None,
107 timestamp: None,
108 page_size: None,
109 paging_state: PagingState::start(),
110 skip_metadata: false,
111 values: Cow::Borrowed(SerializedValues::EMPTY),
112 }
113 }
114}
115
116impl QueryParameters<'_> {
117 pub fn serialize(
119 &self,
120 buf: &mut impl BufMut,
121 ) -> Result<(), QueryParametersSerializationError> {
122 types::write_consistency(self.consistency, buf);
123
124 let paging_state_bytes = self.paging_state.as_bytes_slice();
125
126 let mut flags = 0;
127 if !self.values.is_empty() {
128 flags |= FLAG_VALUES;
129 }
130
131 if self.skip_metadata {
132 flags |= FLAG_SKIP_METADATA;
133 }
134
135 if self.page_size.is_some() {
136 flags |= FLAG_PAGE_SIZE;
137 }
138
139 if paging_state_bytes.is_some() {
140 flags |= FLAG_WITH_PAGING_STATE;
141 }
142
143 if self.serial_consistency.is_some() {
144 flags |= FLAG_WITH_SERIAL_CONSISTENCY;
145 }
146
147 if self.timestamp.is_some() {
148 flags |= FLAG_WITH_DEFAULT_TIMESTAMP;
149 }
150
151 buf.put_u8(flags);
152
153 if !self.values.is_empty() {
154 self.values.write_to_request(buf);
155 }
156
157 if let Some(page_size) = self.page_size {
158 types::write_int(page_size, buf);
159 }
160
161 if let Some(paging_state_bytes) = paging_state_bytes {
162 types::write_bytes(paging_state_bytes, buf)?;
163 }
164
165 if let Some(serial_consistency) = self.serial_consistency {
166 types::write_serial_consistency(serial_consistency, buf);
167 }
168
169 if let Some(timestamp) = self.timestamp {
170 types::write_long(timestamp, buf);
171 }
172
173 Ok(())
174 }
175}
176
177impl QueryParameters<'_> {
178 pub fn deserialize(buf: &mut &[u8]) -> Result<Self, RequestDeserializationError> {
180 let consistency = types::read_consistency(buf)?;
181
182 let flags = buf.get_u8();
183 let unknown_flags = flags & (!ALL_FLAGS);
184 if unknown_flags != 0 {
185 return Err(RequestDeserializationError::UnknownFlags {
186 flags: unknown_flags,
187 });
188 }
189 let values_flag = (flags & FLAG_VALUES) != 0;
190 let skip_metadata = (flags & FLAG_SKIP_METADATA) != 0;
191 let page_size_flag = (flags & FLAG_PAGE_SIZE) != 0;
192 let paging_state_flag = (flags & FLAG_WITH_PAGING_STATE) != 0;
193 let serial_consistency_flag = (flags & FLAG_WITH_SERIAL_CONSISTENCY) != 0;
194 let default_timestamp_flag = (flags & FLAG_WITH_DEFAULT_TIMESTAMP) != 0;
195 let values_have_names_flag = (flags & FLAG_WITH_NAMES_FOR_VALUES) != 0;
196
197 if values_have_names_flag {
198 return Err(RequestDeserializationError::NamedValuesUnsupported);
199 }
200
201 let values = Cow::Owned(if values_flag {
202 SerializedValues::new_from_frame(buf)?
203 } else {
204 SerializedValues::new()
205 });
206
207 let page_size = page_size_flag.then(|| types::read_int(buf)).transpose()?;
208 let paging_state = if paging_state_flag {
209 PagingState::new_from_raw_bytes(types::read_bytes(buf)?)
210 } else {
211 PagingState::start()
212 };
213 let serial_consistency = serial_consistency_flag
214 .then(|| types::read_consistency(buf))
215 .transpose()?
216 .map(
217 |consistency| match SerialConsistency::try_from(consistency) {
218 Ok(serial_consistency) => Ok(serial_consistency),
219 Err(_) => Err(RequestDeserializationError::ExpectedSerialConsistency(
220 consistency,
221 )),
222 },
223 )
224 .transpose()?;
225 let timestamp = if default_timestamp_flag {
226 Some(types::read_long(buf)?)
227 } else {
228 None
229 };
230
231 Ok(Self {
232 consistency,
233 serial_consistency,
234 timestamp,
235 page_size,
236 paging_state,
237 skip_metadata,
238 values,
239 })
240 }
241}
242
243#[derive(Debug, Clone)]
247pub enum PagingStateResponse {
248 HasMorePages {
251 state: PagingState,
253 },
254
255 NoMorePages,
257}
258
259impl PagingStateResponse {
260 #[inline]
263 pub fn finished(&self) -> bool {
264 matches!(*self, Self::NoMorePages)
265 }
266
267 pub(crate) fn new_from_raw_bytes(raw_paging_state: Option<&[u8]>) -> Self {
268 match raw_paging_state {
269 Some(raw_bytes) => Self::HasMorePages {
270 state: PagingState::new_from_raw_bytes(raw_bytes),
271 },
272 None => Self::NoMorePages,
273 }
274 }
275
276 #[inline]
279 pub fn into_paging_control_flow(self) -> ControlFlow<(), PagingState> {
280 match self {
281 Self::HasMorePages {
282 state: next_page_handle,
283 } => ControlFlow::Continue(next_page_handle),
284 Self::NoMorePages => ControlFlow::Break(()),
285 }
286 }
287}
288
/// Paging state of a query: either the starting state (no bytes) or raw bytes
/// held behind an `Arc`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PagingState(Option<Arc<[u8]>>);

impl PagingState {
    /// Returns the starting state, which carries no bytes.
    #[inline]
    pub fn start() -> Self {
        PagingState(None)
    }

    /// Returns the raw bytes of this state, or `None` for the starting state.
    #[inline]
    pub fn as_bytes_slice(&self) -> Option<&Arc<[u8]>> {
        let Self(raw) = self;
        raw.as_ref()
    }

    /// Wraps the given raw bytes in a paging state.
    ///
    /// The result always carries bytes, even when `raw_bytes` is empty.
    #[inline]
    pub fn new_from_raw_bytes(raw_bytes: impl Into<Arc<[u8]>>) -> Self {
        let shared: Arc<[u8]> = raw_bytes.into();
        PagingState(Some(shared))
    }
}

impl Default for PagingState {
    /// Same as [`PagingState::start`].
    fn default() -> Self {
        PagingState(None)
    }
}
327
328#[non_exhaustive]
330#[derive(Error, Debug, Clone)]
331pub enum QuerySerializationError {
332 #[error("Invalid query parameters: {0}")]
334 QueryParametersSerialization(QueryParametersSerializationError),
335
336 #[error("Failed to serialize a statement content: {0}")]
338 StatementStringSerialization(TryFromIntError),
339}
340
341#[non_exhaustive]
343#[derive(Error, Debug, Clone)]
344pub enum QueryParametersSerializationError {
345 #[error("Malformed paging state: {0}")]
347 BadPagingState(#[from] TryFromIntError),
348}