scylla_cql/frame/request/
query.rs

1//! CQL protocol-level representation of a `QUERY` request.
2
3use std::{borrow::Cow, num::TryFromIntError, ops::ControlFlow, sync::Arc};
4
5use crate::frame::{frame_errors::CqlRequestSerializationError, types::SerialConsistency};
6use crate::serialize::row::SerializedValues;
7use bytes::{Buf, BufMut};
8use thiserror::Error;
9
10use crate::{
11    frame::request::{RequestOpcode, SerializableRequest},
12    frame::types,
13};
14
15use super::{DeserializableRequest, RequestDeserializationError};
16
17// Query flags
18const FLAG_VALUES: u8 = 0x01;
19const FLAG_SKIP_METADATA: u8 = 0x02;
20const FLAG_PAGE_SIZE: u8 = 0x04;
21const FLAG_WITH_PAGING_STATE: u8 = 0x08;
22const FLAG_WITH_SERIAL_CONSISTENCY: u8 = 0x10;
23const FLAG_WITH_DEFAULT_TIMESTAMP: u8 = 0x20;
24const FLAG_WITH_NAMES_FOR_VALUES: u8 = 0x40;
25const ALL_FLAGS: u8 = FLAG_VALUES
26    | FLAG_SKIP_METADATA
27    | FLAG_PAGE_SIZE
28    | FLAG_WITH_PAGING_STATE
29    | FLAG_WITH_SERIAL_CONSISTENCY
30    | FLAG_WITH_DEFAULT_TIMESTAMP
31    | FLAG_WITH_NAMES_FOR_VALUES;
32
33/// CQL protocol-level representation of an `QUERY` request,
34/// used to execute a single unprepared statement.
35#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
36pub struct Query<'q> {
37    /// CQL statement string to execute.
38    pub contents: Cow<'q, str>,
39
40    /// Various parameters controlling the execution of the statement.
41    pub parameters: QueryParameters<'q>,
42}
43
44impl SerializableRequest for Query<'_> {
45    const OPCODE: RequestOpcode = RequestOpcode::Query;
46
47    fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), CqlRequestSerializationError> {
48        types::write_long_string(&self.contents, buf)
49            .map_err(QuerySerializationError::StatementStringSerialization)?;
50        self.parameters
51            .serialize(buf)
52            .map_err(QuerySerializationError::QueryParametersSerialization)?;
53        Ok(())
54    }
55}
56
57impl DeserializableRequest for Query<'_> {
58    fn deserialize(buf: &mut &[u8]) -> Result<Self, RequestDeserializationError> {
59        let contents = Cow::Owned(types::read_long_string(buf)?.to_owned());
60        let parameters = QueryParameters::deserialize(buf)?;
61
62        Ok(Self {
63            contents,
64            parameters,
65        })
66    }
67}
68
69/// Various parameters controlling the execution of the statement.
70#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
71pub struct QueryParameters<'a> {
72    /// Consistency level for the query.
73    pub consistency: types::Consistency,
74
75    /// Serial consistency level for the query, if specified.
76    pub serial_consistency: Option<types::SerialConsistency>,
77
78    /// Client-side-assigned timestamp for the query, if specified.
79    pub timestamp: Option<i64>,
80
81    /// Maximum number of rows to return for the query, if specified.
82    /// If not specified, the server will not page the result, and instead return all rows
83    /// in a single response. This is not recommended for large result sets, as it puts
84    /// a lot of pressure on the server and network, as well as brings high memory usage
85    /// on both client and server sides.
86    pub page_size: Option<i32>,
87
88    /// Paging state for the query, used to resume fetching results from a previous query.
89    /// If empty paging state is provided, the query will start from the beginning.
90    pub paging_state: PagingState,
91
92    /// Whether to skip metadata for the values in the result set.
93    /// This is an optimisation that can be used when the client does not need
94    /// the metadata for the values, because it has already been fetched upon
95    /// preparing the statement.
96    pub skip_metadata: bool,
97
98    /// Values bound to the statements.
99    pub values: Cow<'a, SerializedValues>,
100}
101
102impl Default for QueryParameters<'_> {
103    fn default() -> Self {
104        Self {
105            consistency: Default::default(),
106            serial_consistency: None,
107            timestamp: None,
108            page_size: None,
109            paging_state: PagingState::start(),
110            skip_metadata: false,
111            values: Cow::Borrowed(SerializedValues::EMPTY),
112        }
113    }
114}
115
116impl QueryParameters<'_> {
117    /// Serializes the parameters into the provided buffer.
118    pub fn serialize(
119        &self,
120        buf: &mut impl BufMut,
121    ) -> Result<(), QueryParametersSerializationError> {
122        types::write_consistency(self.consistency, buf);
123
124        let paging_state_bytes = self.paging_state.as_bytes_slice();
125
126        let mut flags = 0;
127        if !self.values.is_empty() {
128            flags |= FLAG_VALUES;
129        }
130
131        if self.skip_metadata {
132            flags |= FLAG_SKIP_METADATA;
133        }
134
135        if self.page_size.is_some() {
136            flags |= FLAG_PAGE_SIZE;
137        }
138
139        if paging_state_bytes.is_some() {
140            flags |= FLAG_WITH_PAGING_STATE;
141        }
142
143        if self.serial_consistency.is_some() {
144            flags |= FLAG_WITH_SERIAL_CONSISTENCY;
145        }
146
147        if self.timestamp.is_some() {
148            flags |= FLAG_WITH_DEFAULT_TIMESTAMP;
149        }
150
151        buf.put_u8(flags);
152
153        if !self.values.is_empty() {
154            self.values.write_to_request(buf);
155        }
156
157        if let Some(page_size) = self.page_size {
158            types::write_int(page_size, buf);
159        }
160
161        if let Some(paging_state_bytes) = paging_state_bytes {
162            types::write_bytes(paging_state_bytes, buf)?;
163        }
164
165        if let Some(serial_consistency) = self.serial_consistency {
166            types::write_serial_consistency(serial_consistency, buf);
167        }
168
169        if let Some(timestamp) = self.timestamp {
170            types::write_long(timestamp, buf);
171        }
172
173        Ok(())
174    }
175}
176
177impl QueryParameters<'_> {
178    /// Deserializes the parameters from the provided buffer.
179    pub fn deserialize(buf: &mut &[u8]) -> Result<Self, RequestDeserializationError> {
180        let consistency = types::read_consistency(buf)?;
181
182        let flags = buf.get_u8();
183        let unknown_flags = flags & (!ALL_FLAGS);
184        if unknown_flags != 0 {
185            return Err(RequestDeserializationError::UnknownFlags {
186                flags: unknown_flags,
187            });
188        }
189        let values_flag = (flags & FLAG_VALUES) != 0;
190        let skip_metadata = (flags & FLAG_SKIP_METADATA) != 0;
191        let page_size_flag = (flags & FLAG_PAGE_SIZE) != 0;
192        let paging_state_flag = (flags & FLAG_WITH_PAGING_STATE) != 0;
193        let serial_consistency_flag = (flags & FLAG_WITH_SERIAL_CONSISTENCY) != 0;
194        let default_timestamp_flag = (flags & FLAG_WITH_DEFAULT_TIMESTAMP) != 0;
195        let values_have_names_flag = (flags & FLAG_WITH_NAMES_FOR_VALUES) != 0;
196
197        if values_have_names_flag {
198            return Err(RequestDeserializationError::NamedValuesUnsupported);
199        }
200
201        let values = Cow::Owned(if values_flag {
202            SerializedValues::new_from_frame(buf)?
203        } else {
204            SerializedValues::new()
205        });
206
207        let page_size = page_size_flag.then(|| types::read_int(buf)).transpose()?;
208        let paging_state = if paging_state_flag {
209            PagingState::new_from_raw_bytes(types::read_bytes(buf)?)
210        } else {
211            PagingState::start()
212        };
213        let serial_consistency = serial_consistency_flag
214            .then(|| types::read_consistency(buf))
215            .transpose()?
216            .map(
217                |consistency| match SerialConsistency::try_from(consistency) {
218                    Ok(serial_consistency) => Ok(serial_consistency),
219                    Err(_) => Err(RequestDeserializationError::ExpectedSerialConsistency(
220                        consistency,
221                    )),
222                },
223            )
224            .transpose()?;
225        let timestamp = if default_timestamp_flag {
226            Some(types::read_long(buf)?)
227        } else {
228            None
229        };
230
231        Ok(Self {
232            consistency,
233            serial_consistency,
234            timestamp,
235            page_size,
236            paging_state,
237            skip_metadata,
238            values,
239        })
240    }
241}
242
243/// A response containing the paging state of a paged query,
244/// i.e. whether there are more pages to fetch or not, and if so,
245/// what is the state to use for resuming the query from the next page.
246#[derive(Debug, Clone)]
247pub enum PagingStateResponse {
248    /// Indicates that there are more pages to fetch, and provides the
249    /// [PagingState] to use for resuming the query from the next page.
250    HasMorePages {
251        /// The paging state to use for resuming the query from the next page.
252        state: PagingState,
253    },
254
255    /// Indicates that there are no more pages to fetch, and the query has finished.
256    NoMorePages,
257}
258
259impl PagingStateResponse {
260    /// Determines if the query has finished or it should be resumed with given
261    /// [PagingState] in order to fetch next pages.
262    #[inline]
263    pub fn finished(&self) -> bool {
264        matches!(*self, Self::NoMorePages)
265    }
266
267    pub(crate) fn new_from_raw_bytes(raw_paging_state: Option<&[u8]>) -> Self {
268        match raw_paging_state {
269            Some(raw_bytes) => Self::HasMorePages {
270                state: PagingState::new_from_raw_bytes(raw_bytes),
271            },
272            None => Self::NoMorePages,
273        }
274    }
275
276    /// Converts the response into [ControlFlow], signalling whether the query has finished
277    /// or it should be resumed with given [PagingState] in order to fetch next pages.
278    #[inline]
279    pub fn into_paging_control_flow(self) -> ControlFlow<(), PagingState> {
280        match self {
281            Self::HasMorePages {
282                state: next_page_handle,
283            } => ControlFlow::Continue(next_page_handle),
284            Self::NoMorePages => ControlFlow::Break(()),
285        }
286    }
287}
288
289/// The state of a paged query, i.e. where to resume fetching result rows
290/// upon next request.
291///
292/// Cheaply clonable.
293#[derive(Debug, Clone, PartialEq, Eq)]
294pub struct PagingState(Option<Arc<[u8]>>);
295
296impl PagingState {
297    /// A start state - the state of a not-yet-started paged query.
298    #[inline]
299    pub fn start() -> Self {
300        Self(None)
301    }
302
303    /// Returns the inner representation of [PagingState].
304    /// One can use this to store paging state for a longer time,
305    /// and later restore it using [Self::new_from_raw_bytes].
306    /// In case None is returned, this signifies
307    /// [PagingState::start()] being underneath.
308    #[inline]
309    pub fn as_bytes_slice(&self) -> Option<&Arc<[u8]>> {
310        self.0.as_ref()
311    }
312
313    /// Creates PagingState from its inner representation.
314    /// One can use this to restore paging state after longer time,
315    /// having previously stored it using [Self::as_bytes_slice].
316    #[inline]
317    pub fn new_from_raw_bytes(raw_bytes: impl Into<Arc<[u8]>>) -> Self {
318        Self(Some(raw_bytes.into()))
319    }
320}
321
322impl Default for PagingState {
323    fn default() -> Self {
324        Self::start()
325    }
326}
327
328/// An error type returned when serialization of QUERY request fails.
329#[non_exhaustive]
330#[derive(Error, Debug, Clone)]
331pub enum QuerySerializationError {
332    /// Failed to serialize query parameters.
333    #[error("Invalid query parameters: {0}")]
334    QueryParametersSerialization(QueryParametersSerializationError),
335
336    /// Failed to serialize the CQL statement string.
337    #[error("Failed to serialize a statement content: {0}")]
338    StatementStringSerialization(TryFromIntError),
339}
340
341/// An error type returned when serialization of query parameters fails.
342#[non_exhaustive]
343#[derive(Error, Debug, Clone)]
344pub enum QueryParametersSerializationError {
345    /// Failed to serialize paging state.
346    #[error("Malformed paging state: {0}")]
347    BadPagingState(#[from] TryFromIntError),
348}