scylla_cql/frame/request/
query.rs1use std::{borrow::Cow, num::TryFromIntError, ops::ControlFlow, sync::Arc};
2
3use crate::frame::{frame_errors::CqlRequestSerializationError, types::SerialConsistency};
4use crate::serialize::row::SerializedValues;
5use bytes::{Buf, BufMut};
6use thiserror::Error;
7
8use crate::{
9 frame::request::{RequestOpcode, SerializableRequest},
10 frame::types,
11};
12
13use super::{DeserializableRequest, RequestDeserializationError};
14
// Bits of the single flags byte that follows the consistency in serialized
// query parameters. Each bit announces that the matching optional field is
// present in the frame (or, for `FLAG_SKIP_METADATA`, carries the boolean itself).

/// Set when bound values are written.
const FLAG_VALUES: u8 = 1 << 0;
/// Mirrors `QueryParameters::skip_metadata`.
const FLAG_SKIP_METADATA: u8 = 1 << 1;
/// Set when a page size is written.
const FLAG_PAGE_SIZE: u8 = 1 << 2;
/// Set when paging state bytes are written.
const FLAG_WITH_PAGING_STATE: u8 = 1 << 3;
/// Set when a serial consistency is written.
const FLAG_WITH_SERIAL_CONSISTENCY: u8 = 1 << 4;
/// Set when a timestamp is written.
const FLAG_WITH_DEFAULT_TIMESTAMP: u8 = 1 << 5;
/// Recognised on input but rejected by deserialization; never set by serialization.
const FLAG_WITH_NAMES_FOR_VALUES: u8 = 1 << 6;
/// Union of every known flag; any other bit in a flags byte is treated as unknown.
const ALL_FLAGS: u8 = FLAG_VALUES
    | FLAG_SKIP_METADATA
    | FLAG_PAGE_SIZE
    | FLAG_WITH_PAGING_STATE
    | FLAG_WITH_SERIAL_CONSISTENCY
    | FLAG_WITH_DEFAULT_TIMESTAMP
    | FLAG_WITH_NAMES_FOR_VALUES;
30
31#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
32pub struct Query<'q> {
33 pub contents: Cow<'q, str>,
34 pub parameters: QueryParameters<'q>,
35}
36
37impl SerializableRequest for Query<'_> {
38 const OPCODE: RequestOpcode = RequestOpcode::Query;
39
40 fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), CqlRequestSerializationError> {
41 types::write_long_string(&self.contents, buf)
42 .map_err(QuerySerializationError::StatementStringSerialization)?;
43 self.parameters
44 .serialize(buf)
45 .map_err(QuerySerializationError::QueryParametersSerialization)?;
46 Ok(())
47 }
48}
49
50impl DeserializableRequest for Query<'_> {
51 fn deserialize(buf: &mut &[u8]) -> Result<Self, RequestDeserializationError> {
52 let contents = Cow::Owned(types::read_long_string(buf)?.to_owned());
53 let parameters = QueryParameters::deserialize(buf)?;
54
55 Ok(Self {
56 contents,
57 parameters,
58 })
59 }
60}
61
62#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
63pub struct QueryParameters<'a> {
64 pub consistency: types::Consistency,
65 pub serial_consistency: Option<types::SerialConsistency>,
66 pub timestamp: Option<i64>,
67 pub page_size: Option<i32>,
68 pub paging_state: PagingState,
69 pub skip_metadata: bool,
70 pub values: Cow<'a, SerializedValues>,
71}
72
73impl Default for QueryParameters<'_> {
74 fn default() -> Self {
75 Self {
76 consistency: Default::default(),
77 serial_consistency: None,
78 timestamp: None,
79 page_size: None,
80 paging_state: PagingState::start(),
81 skip_metadata: false,
82 values: Cow::Borrowed(SerializedValues::EMPTY),
83 }
84 }
85}
86
87impl QueryParameters<'_> {
88 pub fn serialize(
89 &self,
90 buf: &mut impl BufMut,
91 ) -> Result<(), QueryParametersSerializationError> {
92 types::write_consistency(self.consistency, buf);
93
94 let paging_state_bytes = self.paging_state.as_bytes_slice();
95
96 let mut flags = 0;
97 if !self.values.is_empty() {
98 flags |= FLAG_VALUES;
99 }
100
101 if self.skip_metadata {
102 flags |= FLAG_SKIP_METADATA;
103 }
104
105 if self.page_size.is_some() {
106 flags |= FLAG_PAGE_SIZE;
107 }
108
109 if paging_state_bytes.is_some() {
110 flags |= FLAG_WITH_PAGING_STATE;
111 }
112
113 if self.serial_consistency.is_some() {
114 flags |= FLAG_WITH_SERIAL_CONSISTENCY;
115 }
116
117 if self.timestamp.is_some() {
118 flags |= FLAG_WITH_DEFAULT_TIMESTAMP;
119 }
120
121 buf.put_u8(flags);
122
123 if !self.values.is_empty() {
124 self.values.write_to_request(buf);
125 }
126
127 if let Some(page_size) = self.page_size {
128 types::write_int(page_size, buf);
129 }
130
131 if let Some(paging_state_bytes) = paging_state_bytes {
132 types::write_bytes(paging_state_bytes, buf)?;
133 }
134
135 if let Some(serial_consistency) = self.serial_consistency {
136 types::write_serial_consistency(serial_consistency, buf);
137 }
138
139 if let Some(timestamp) = self.timestamp {
140 types::write_long(timestamp, buf);
141 }
142
143 Ok(())
144 }
145}
146
147impl QueryParameters<'_> {
148 pub fn deserialize(buf: &mut &[u8]) -> Result<Self, RequestDeserializationError> {
149 let consistency = types::read_consistency(buf)?;
150
151 let flags = buf.get_u8();
152 let unknown_flags = flags & (!ALL_FLAGS);
153 if unknown_flags != 0 {
154 return Err(RequestDeserializationError::UnknownFlags {
155 flags: unknown_flags,
156 });
157 }
158 let values_flag = (flags & FLAG_VALUES) != 0;
159 let skip_metadata = (flags & FLAG_SKIP_METADATA) != 0;
160 let page_size_flag = (flags & FLAG_PAGE_SIZE) != 0;
161 let paging_state_flag = (flags & FLAG_WITH_PAGING_STATE) != 0;
162 let serial_consistency_flag = (flags & FLAG_WITH_SERIAL_CONSISTENCY) != 0;
163 let default_timestamp_flag = (flags & FLAG_WITH_DEFAULT_TIMESTAMP) != 0;
164 let values_have_names_flag = (flags & FLAG_WITH_NAMES_FOR_VALUES) != 0;
165
166 if values_have_names_flag {
167 return Err(RequestDeserializationError::NamedValuesUnsupported);
168 }
169
170 let values = Cow::Owned(if values_flag {
171 SerializedValues::new_from_frame(buf)?
172 } else {
173 SerializedValues::new()
174 });
175
176 let page_size = page_size_flag.then(|| types::read_int(buf)).transpose()?;
177 let paging_state = if paging_state_flag {
178 PagingState::new_from_raw_bytes(types::read_bytes(buf)?)
179 } else {
180 PagingState::start()
181 };
182 let serial_consistency = serial_consistency_flag
183 .then(|| types::read_consistency(buf))
184 .transpose()?
185 .map(
186 |consistency| match SerialConsistency::try_from(consistency) {
187 Ok(serial_consistency) => Ok(serial_consistency),
188 Err(_) => Err(RequestDeserializationError::ExpectedSerialConsistency(
189 consistency,
190 )),
191 },
192 )
193 .transpose()?;
194 let timestamp = if default_timestamp_flag {
195 Some(types::read_long(buf)?)
196 } else {
197 None
198 };
199
200 Ok(Self {
201 consistency,
202 serial_consistency,
203 timestamp,
204 page_size,
205 paging_state,
206 skip_metadata,
207 values,
208 })
209 }
210}
211
212#[derive(Debug, Clone)]
213pub enum PagingStateResponse {
214 HasMorePages { state: PagingState },
215 NoMorePages,
216}
217
218impl PagingStateResponse {
219 #[inline]
222 pub fn finished(&self) -> bool {
223 matches!(*self, Self::NoMorePages)
224 }
225
226 pub(crate) fn new_from_raw_bytes(raw_paging_state: Option<&[u8]>) -> Self {
227 match raw_paging_state {
228 Some(raw_bytes) => Self::HasMorePages {
229 state: PagingState::new_from_raw_bytes(raw_bytes),
230 },
231 None => Self::NoMorePages,
232 }
233 }
234
235 #[inline]
238 pub fn into_paging_control_flow(self) -> ControlFlow<(), PagingState> {
239 match self {
240 Self::HasMorePages {
241 state: next_page_handle,
242 } => ControlFlow::Continue(next_page_handle),
243 Self::NoMorePages => ControlFlow::Break(()),
244 }
245 }
246}
247
/// Raw paging state bytes, or nothing when paging has not begun.
///
/// The bytes are held behind an [`Arc`], so cloning shares them instead of
/// copying. The default value is the same as [`PagingState::start`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PagingState(Option<Arc<[u8]>>);

impl PagingState {
    /// Returns a state that holds no bytes.
    #[inline]
    pub fn start() -> Self {
        Self(None)
    }

    /// Returns the stored bytes, or `None` for a state created by
    /// [`PagingState::start`].
    #[inline]
    pub fn as_bytes_slice(&self) -> Option<&Arc<[u8]>> {
        let Self(raw) = self;
        raw.as_ref()
    }

    /// Wraps the given raw bytes into a state.
    #[inline]
    pub fn new_from_raw_bytes(raw_bytes: impl Into<Arc<[u8]>>) -> Self {
        let shared: Arc<[u8]> = raw_bytes.into();
        Self(Some(shared))
    }
}
286
287#[non_exhaustive]
289#[derive(Error, Debug, Clone)]
290pub enum QuerySerializationError {
291 #[error("Invalid query parameters: {0}")]
293 QueryParametersSerialization(QueryParametersSerializationError),
294
295 #[error("Failed to serialize a statement content: {0}")]
297 StatementStringSerialization(TryFromIntError),
298}
299
300#[non_exhaustive]
302#[derive(Error, Debug, Clone)]
303pub enum QueryParametersSerializationError {
304 #[error("Malformed paging state: {0}")]
306 BadPagingState(#[from] TryFromIntError),
307}