scylla_cql/frame/request/
query.rs

1use std::{borrow::Cow, num::TryFromIntError, ops::ControlFlow, sync::Arc};
2
3use crate::frame::{frame_errors::CqlRequestSerializationError, types::SerialConsistency};
4use crate::serialize::row::SerializedValues;
5use bytes::{Buf, BufMut};
6use thiserror::Error;
7
8use crate::{
9    frame::request::{RequestOpcode, SerializableRequest},
10    frame::types,
11};
12
13use super::{DeserializableRequest, RequestDeserializationError};
14
// Bits of the single flags byte that follows the consistency in query
// parameters. A set bit announces that the matching optional section is
// present in the frame.

// Bound values are present.
const FLAG_VALUES: u8 = 1 << 0;
// Mirrors `QueryParameters::skip_metadata`; carries no payload of its own.
const FLAG_SKIP_METADATA: u8 = 1 << 1;
// A page size is present.
const FLAG_PAGE_SIZE: u8 = 1 << 2;
// A paging state is present.
const FLAG_WITH_PAGING_STATE: u8 = 1 << 3;
// A serial consistency is present.
const FLAG_WITH_SERIAL_CONSISTENCY: u8 = 1 << 4;
// A timestamp is present.
const FLAG_WITH_DEFAULT_TIMESTAMP: u8 = 1 << 5;
// Values carry names. Never emitted by the serializer in this file and
// rejected as unsupported by the deserializer.
const FLAG_WITH_NAMES_FOR_VALUES: u8 = 1 << 6;
// Union of every known flag; any bit outside of it is reported as unknown
// during deserialization.
const ALL_FLAGS: u8 = FLAG_VALUES
    | FLAG_SKIP_METADATA
    | FLAG_PAGE_SIZE
    | FLAG_WITH_PAGING_STATE
    | FLAG_WITH_SERIAL_CONSISTENCY
    | FLAG_WITH_DEFAULT_TIMESTAMP
    | FLAG_WITH_NAMES_FOR_VALUES;
30
31#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
32pub struct Query<'q> {
33    pub contents: Cow<'q, str>,
34    pub parameters: QueryParameters<'q>,
35}
36
37impl SerializableRequest for Query<'_> {
38    const OPCODE: RequestOpcode = RequestOpcode::Query;
39
40    fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), CqlRequestSerializationError> {
41        types::write_long_string(&self.contents, buf)
42            .map_err(QuerySerializationError::StatementStringSerialization)?;
43        self.parameters
44            .serialize(buf)
45            .map_err(QuerySerializationError::QueryParametersSerialization)?;
46        Ok(())
47    }
48}
49
50impl DeserializableRequest for Query<'_> {
51    fn deserialize(buf: &mut &[u8]) -> Result<Self, RequestDeserializationError> {
52        let contents = Cow::Owned(types::read_long_string(buf)?.to_owned());
53        let parameters = QueryParameters::deserialize(buf)?;
54
55        Ok(Self {
56            contents,
57            parameters,
58        })
59    }
60}
61
62#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
63pub struct QueryParameters<'a> {
64    pub consistency: types::Consistency,
65    pub serial_consistency: Option<types::SerialConsistency>,
66    pub timestamp: Option<i64>,
67    pub page_size: Option<i32>,
68    pub paging_state: PagingState,
69    pub skip_metadata: bool,
70    pub values: Cow<'a, SerializedValues>,
71}
72
73impl Default for QueryParameters<'_> {
74    fn default() -> Self {
75        Self {
76            consistency: Default::default(),
77            serial_consistency: None,
78            timestamp: None,
79            page_size: None,
80            paging_state: PagingState::start(),
81            skip_metadata: false,
82            values: Cow::Borrowed(SerializedValues::EMPTY),
83        }
84    }
85}
86
87impl QueryParameters<'_> {
88    pub fn serialize(
89        &self,
90        buf: &mut impl BufMut,
91    ) -> Result<(), QueryParametersSerializationError> {
92        types::write_consistency(self.consistency, buf);
93
94        let paging_state_bytes = self.paging_state.as_bytes_slice();
95
96        let mut flags = 0;
97        if !self.values.is_empty() {
98            flags |= FLAG_VALUES;
99        }
100
101        if self.skip_metadata {
102            flags |= FLAG_SKIP_METADATA;
103        }
104
105        if self.page_size.is_some() {
106            flags |= FLAG_PAGE_SIZE;
107        }
108
109        if paging_state_bytes.is_some() {
110            flags |= FLAG_WITH_PAGING_STATE;
111        }
112
113        if self.serial_consistency.is_some() {
114            flags |= FLAG_WITH_SERIAL_CONSISTENCY;
115        }
116
117        if self.timestamp.is_some() {
118            flags |= FLAG_WITH_DEFAULT_TIMESTAMP;
119        }
120
121        buf.put_u8(flags);
122
123        if !self.values.is_empty() {
124            self.values.write_to_request(buf);
125        }
126
127        if let Some(page_size) = self.page_size {
128            types::write_int(page_size, buf);
129        }
130
131        if let Some(paging_state_bytes) = paging_state_bytes {
132            types::write_bytes(paging_state_bytes, buf)?;
133        }
134
135        if let Some(serial_consistency) = self.serial_consistency {
136            types::write_serial_consistency(serial_consistency, buf);
137        }
138
139        if let Some(timestamp) = self.timestamp {
140            types::write_long(timestamp, buf);
141        }
142
143        Ok(())
144    }
145}
146
147impl QueryParameters<'_> {
148    pub fn deserialize(buf: &mut &[u8]) -> Result<Self, RequestDeserializationError> {
149        let consistency = types::read_consistency(buf)?;
150
151        let flags = buf.get_u8();
152        let unknown_flags = flags & (!ALL_FLAGS);
153        if unknown_flags != 0 {
154            return Err(RequestDeserializationError::UnknownFlags {
155                flags: unknown_flags,
156            });
157        }
158        let values_flag = (flags & FLAG_VALUES) != 0;
159        let skip_metadata = (flags & FLAG_SKIP_METADATA) != 0;
160        let page_size_flag = (flags & FLAG_PAGE_SIZE) != 0;
161        let paging_state_flag = (flags & FLAG_WITH_PAGING_STATE) != 0;
162        let serial_consistency_flag = (flags & FLAG_WITH_SERIAL_CONSISTENCY) != 0;
163        let default_timestamp_flag = (flags & FLAG_WITH_DEFAULT_TIMESTAMP) != 0;
164        let values_have_names_flag = (flags & FLAG_WITH_NAMES_FOR_VALUES) != 0;
165
166        if values_have_names_flag {
167            return Err(RequestDeserializationError::NamedValuesUnsupported);
168        }
169
170        let values = Cow::Owned(if values_flag {
171            SerializedValues::new_from_frame(buf)?
172        } else {
173            SerializedValues::new()
174        });
175
176        let page_size = page_size_flag.then(|| types::read_int(buf)).transpose()?;
177        let paging_state = if paging_state_flag {
178            PagingState::new_from_raw_bytes(types::read_bytes(buf)?)
179        } else {
180            PagingState::start()
181        };
182        let serial_consistency = serial_consistency_flag
183            .then(|| types::read_consistency(buf))
184            .transpose()?
185            .map(
186                |consistency| match SerialConsistency::try_from(consistency) {
187                    Ok(serial_consistency) => Ok(serial_consistency),
188                    Err(_) => Err(RequestDeserializationError::ExpectedSerialConsistency(
189                        consistency,
190                    )),
191                },
192            )
193            .transpose()?;
194        let timestamp = if default_timestamp_flag {
195            Some(types::read_long(buf)?)
196        } else {
197            None
198        };
199
200        Ok(Self {
201            consistency,
202            serial_consistency,
203            timestamp,
204            page_size,
205            paging_state,
206            skip_metadata,
207            values,
208        })
209    }
210}
211
212#[derive(Debug, Clone)]
213pub enum PagingStateResponse {
214    HasMorePages { state: PagingState },
215    NoMorePages,
216}
217
218impl PagingStateResponse {
219    /// Determines if the query has finished or it should be resumed with given
220    /// [PagingState] in order to fetch next pages.
221    #[inline]
222    pub fn finished(&self) -> bool {
223        matches!(*self, Self::NoMorePages)
224    }
225
226    pub(crate) fn new_from_raw_bytes(raw_paging_state: Option<&[u8]>) -> Self {
227        match raw_paging_state {
228            Some(raw_bytes) => Self::HasMorePages {
229                state: PagingState::new_from_raw_bytes(raw_bytes),
230            },
231            None => Self::NoMorePages,
232        }
233    }
234
235    /// Converts the response into [ControlFlow], signalling whether the query has finished
236    /// or it should be resumed with given [PagingState] in order to fetch next pages.
237    #[inline]
238    pub fn into_paging_control_flow(self) -> ControlFlow<(), PagingState> {
239        match self {
240            Self::HasMorePages {
241                state: next_page_handle,
242            } => ControlFlow::Continue(next_page_handle),
243            Self::NoMorePages => ControlFlow::Break(()),
244        }
245    }
246}
247
/// Position within a paged query: where fetching of result rows should
/// resume on the next request.
///
/// Cloning is cheap, because the raw bytes live behind an [Arc].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PagingState(Option<Arc<[u8]>>);

impl PagingState {
    /// The state of a paged query that has not been started yet.
    #[inline]
    pub fn start() -> Self {
        PagingState(None)
    }

    /// Exposes the inner representation of the [PagingState].
    ///
    /// Useful for storing the paging state for a longer time and restoring
    /// it later through [Self::new_from_raw_bytes]. `None` means this is the
    /// [PagingState::start()] state.
    #[inline]
    pub fn as_bytes_slice(&self) -> Option<&Arc<[u8]>> {
        match &self.0 {
            Some(raw) => Some(raw),
            None => None,
        }
    }

    /// Rebuilds a [PagingState] from its inner representation, e.g. one
    /// previously obtained through [Self::as_bytes_slice].
    #[inline]
    pub fn new_from_raw_bytes(raw_bytes: impl Into<Arc<[u8]>>) -> Self {
        let shared: Arc<[u8]> = raw_bytes.into();
        PagingState(Some(shared))
    }
}

impl Default for PagingState {
    /// Same as [PagingState::start()].
    fn default() -> Self {
        PagingState(None)
    }
}
286
287/// An error type returned when serialization of QUERY request fails.
288#[non_exhaustive]
289#[derive(Error, Debug, Clone)]
290pub enum QuerySerializationError {
291    /// Failed to serialize query parameters.
292    #[error("Invalid query parameters: {0}")]
293    QueryParametersSerialization(QueryParametersSerializationError),
294
295    /// Failed to serialize the CQL statement string.
296    #[error("Failed to serialize a statement content: {0}")]
297    StatementStringSerialization(TryFromIntError),
298}
299
300/// An error type returned when serialization of query parameters fails.
301#[non_exhaustive]
302#[derive(Error, Debug, Clone)]
303pub enum QueryParametersSerializationError {
304    /// Failed to serialize paging state.
305    #[error("Malformed paging state: {0}")]
306    BadPagingState(#[from] TryFromIntError),
307}