1use bytes::Bytes;
2
3use crate::frame::response::result::{
4 ColumnSpec, DeserializedMetadataAndRawRows, ResultMetadata, ResultMetadataHolder,
5};
6
7use super::row::{mk_deser_err, BuiltinDeserializationErrorKind, ColumnIterator, DeserializeRow};
8use super::{DeserializationError, FrameSlice, TypeCheckError};
9use std::marker::PhantomData;
10
11#[derive(Debug)]
13pub struct RawRowIterator<'frame, 'metadata> {
14 specs: &'metadata [ColumnSpec<'metadata>],
15 remaining: usize,
16 slice: FrameSlice<'frame>,
17}
18
19impl<'frame, 'metadata> RawRowIterator<'frame, 'metadata> {
20 #[inline]
26 pub fn new(
27 remaining: usize,
28 specs: &'metadata [ColumnSpec<'metadata>],
29 slice: FrameSlice<'frame>,
30 ) -> Self {
31 Self {
32 specs,
33 remaining,
34 slice,
35 }
36 }
37
38 #[inline]
40 pub fn specs(&self) -> &'metadata [ColumnSpec<'metadata>] {
41 self.specs
42 }
43
44 #[inline]
47 pub fn rows_remaining(&self) -> usize {
48 self.remaining
49 }
50}
51
52impl<'frame, 'metadata> Iterator for RawRowIterator<'frame, 'metadata> {
53 type Item = Result<ColumnIterator<'frame, 'metadata>, DeserializationError>;
54
55 #[inline]
56 fn next(&mut self) -> Option<Self::Item> {
57 self.remaining = self.remaining.checked_sub(1)?;
58
59 let iter = ColumnIterator::new(self.specs, self.slice);
60
61 for (column_index, spec) in self.specs.iter().enumerate() {
63 if let Err(err) = self.slice.read_cql_bytes() {
64 return Some(Err(mk_deser_err::<Self>(
65 BuiltinDeserializationErrorKind::RawColumnDeserializationFailed {
66 column_index,
67 column_name: spec.name().to_owned(),
68 err: DeserializationError::new(err),
69 },
70 )));
71 }
72 }
73
74 Some(Ok(iter))
75 }
76
77 #[inline]
78 fn size_hint(&self) -> (usize, Option<usize>) {
79 (self.remaining, Some(self.remaining))
83 }
84}
85
86#[derive(Debug)]
89pub struct TypedRowIterator<'frame, 'metadata, R> {
90 inner: RawRowIterator<'frame, 'metadata>,
91 _phantom: PhantomData<R>,
92}
93
94impl<'frame, 'metadata, R> TypedRowIterator<'frame, 'metadata, R>
95where
96 R: DeserializeRow<'frame, 'metadata>,
97{
98 #[inline]
102 pub fn new(raw: RawRowIterator<'frame, 'metadata>) -> Result<Self, TypeCheckError> {
103 R::type_check(raw.specs())?;
104 Ok(Self {
105 inner: raw,
106 _phantom: PhantomData,
107 })
108 }
109
110 #[inline]
112 pub fn specs(&self) -> &'metadata [ColumnSpec<'metadata>] {
113 self.inner.specs()
114 }
115
116 #[inline]
119 pub fn rows_remaining(&self) -> usize {
120 self.inner.rows_remaining()
121 }
122}
123
124impl<'frame, 'metadata, R> Iterator for TypedRowIterator<'frame, 'metadata, R>
125where
126 R: DeserializeRow<'frame, 'metadata>,
127{
128 type Item = Result<R, DeserializationError>;
129
130 #[inline]
131 fn next(&mut self) -> Option<Self::Item> {
132 self.inner
133 .next()
134 .map(|raw| raw.and_then(|raw| R::deserialize(raw)))
135 }
136
137 #[inline]
138 fn size_hint(&self) -> (usize, Option<usize>) {
139 self.inner.size_hint()
140 }
141}
142
143#[derive(Debug)]
152pub struct RawRowLendingIterator {
153 metadata: ResultMetadataHolder,
154 remaining: usize,
155 at: usize,
156 raw_rows: Bytes,
157}
158
159impl RawRowLendingIterator {
160 #[inline]
162 pub fn new(raw_rows: DeserializedMetadataAndRawRows) -> Self {
163 let (metadata, rows_count, raw_rows) = raw_rows.into_inner();
164 Self {
165 metadata,
166 remaining: rows_count,
167 at: 0,
168 raw_rows,
169 }
170 }
171
172 #[inline]
178 #[allow(clippy::should_implement_trait)] pub fn next(&mut self) -> Option<Result<ColumnIterator, DeserializationError>> {
180 self.remaining = self.remaining.checked_sub(1)?;
181
182 let mut remaining_frame = FrameSlice::new(&self.raw_rows);
184 *remaining_frame.as_slice_mut() = &remaining_frame.as_slice()[self.at..];
186 let iter = ColumnIterator::new(self.metadata.inner().col_specs(), remaining_frame);
190
191 for (column_index, spec) in self.metadata.inner().col_specs().iter().enumerate() {
193 let remaining_frame_len_before_column_read = remaining_frame.as_slice().len();
194 if let Err(err) = remaining_frame.read_cql_bytes() {
195 return Some(Err(mk_deser_err::<Self>(
196 BuiltinDeserializationErrorKind::RawColumnDeserializationFailed {
197 column_index,
198 column_name: spec.name().to_owned(),
199 err: DeserializationError::new(err),
200 },
201 )));
202 } else {
203 let remaining_frame_len_after_column_read = remaining_frame.as_slice().len();
204 self.at +=
205 remaining_frame_len_before_column_read - remaining_frame_len_after_column_read;
206 }
207 }
208
209 Some(Ok(iter))
210 }
211
212 #[inline]
213 pub fn size_hint(&self) -> (usize, Option<usize>) {
214 (self.remaining, Some(self.remaining))
218 }
219
220 #[inline]
223 pub fn metadata(&self) -> &ResultMetadata<'_> {
224 self.metadata.inner()
225 }
226
227 #[inline]
230 pub fn rows_remaining(&self) -> usize {
231 self.remaining
232 }
233}
234
235#[cfg(test)]
236mod tests {
237
238 use bytes::Bytes;
239 use std::ops::Deref;
240
241 use crate::frame::response::result::{
242 ColumnSpec, ColumnType, DeserializedMetadataAndRawRows, NativeType, ResultMetadata,
243 };
244
245 use super::super::tests::{serialize_cells, spec, CELL1, CELL2};
246 use super::{
247 ColumnIterator, DeserializationError, FrameSlice, RawRowIterator, RawRowLendingIterator,
248 TypedRowIterator,
249 };
250
251 trait LendingIterator {
252 type Item<'borrow>
253 where
254 Self: 'borrow;
255 fn lend_next(&mut self) -> Option<Result<Self::Item<'_>, DeserializationError>>;
256 }
257
258 #[allow(clippy::needless_lifetimes)]
261 impl<'frame, 'metadata> LendingIterator for RawRowIterator<'frame, 'metadata> {
262 type Item<'borrow>
263 = ColumnIterator<'borrow, 'borrow>
264 where
265 Self: 'borrow;
266
267 fn lend_next(&mut self) -> Option<Result<ColumnIterator<'_, '_>, DeserializationError>> {
268 self.next()
269 }
270 }
271
272 impl LendingIterator for RawRowLendingIterator {
273 type Item<'borrow> = ColumnIterator<'borrow, 'borrow>;
274
275 fn lend_next(&mut self) -> Option<Result<ColumnIterator<'_, '_>, DeserializationError>> {
276 self.next()
277 }
278 }
279
280 #[test]
281 fn test_row_iterators_basic_parse() {
282 static SPECS: &[ColumnSpec<'static>] = &[
295 spec("b1", ColumnType::Native(NativeType::Blob)),
296 spec("b2", ColumnType::Native(NativeType::Blob)),
297 ];
298 lazy_static::lazy_static! {
299 static ref RAW_DATA: Bytes = serialize_cells([Some(CELL1), Some(CELL2), Some(CELL2), Some(CELL1)]);
300 }
301 let raw_data = RAW_DATA.deref();
302 let specs = SPECS;
303
304 let row_iter = RawRowIterator::new(2, specs, FrameSlice::new(raw_data));
305 let lending_row_iter =
306 RawRowLendingIterator::new(DeserializedMetadataAndRawRows::new_for_test(
307 ResultMetadata::new_for_test(specs.len(), specs.to_vec()),
308 2,
309 raw_data.clone(),
310 ));
311 check(row_iter);
312 check(lending_row_iter);
313
314 fn check<I>(mut iter: I)
315 where
316 I: for<'item> LendingIterator<Item<'item> = ColumnIterator<'item, 'item>>,
317 {
318 let mut row1 = iter.lend_next().unwrap().unwrap();
319 let c11 = row1.next().unwrap().unwrap();
320 assert_eq!(c11.slice.unwrap().as_slice(), CELL1);
321 let c12 = row1.next().unwrap().unwrap();
322 assert_eq!(c12.slice.unwrap().as_slice(), CELL2);
323 assert!(row1.next().is_none());
324
325 let mut row2 = iter.lend_next().unwrap().unwrap();
326 let c21 = row2.next().unwrap().unwrap();
327 assert_eq!(c21.slice.unwrap().as_slice(), CELL2);
328 let c22 = row2.next().unwrap().unwrap();
329 assert_eq!(c22.slice.unwrap().as_slice(), CELL1);
330 assert!(row2.next().is_none());
331
332 assert!(iter.lend_next().is_none());
333 }
334 }
335
336 #[test]
337 fn test_row_iterators_too_few_rows() {
338 static SPECS: &[ColumnSpec<'static>] = &[
339 spec("b1", ColumnType::Native(NativeType::Blob)),
340 spec("b2", ColumnType::Native(NativeType::Blob)),
341 ];
342 lazy_static::lazy_static! {
343 static ref RAW_DATA: Bytes = serialize_cells([Some(CELL1), Some(CELL2)]);
344 }
345 let raw_data = RAW_DATA.deref();
346 let specs = SPECS;
347
348 let row_iter = RawRowIterator::new(2, specs, FrameSlice::new(raw_data));
349 let lending_row_iter =
350 RawRowLendingIterator::new(DeserializedMetadataAndRawRows::new_for_test(
351 ResultMetadata::new_for_test(specs.len(), specs.to_vec()),
352 2,
353 raw_data.clone(),
354 ));
355 check(row_iter);
356 check(lending_row_iter);
357
358 fn check<I>(mut iter: I)
359 where
360 I: for<'item> LendingIterator<Item<'item> = ColumnIterator<'item, 'item>>,
361 {
362 iter.lend_next().unwrap().unwrap();
363 iter.lend_next().unwrap().unwrap_err();
364 }
365 }
366
367 #[test]
368 fn test_typed_row_iterator_basic_parse() {
369 let raw_data = serialize_cells([Some(CELL1), Some(CELL2), Some(CELL2), Some(CELL1)]);
370 let specs = [
371 spec("b1", ColumnType::Native(NativeType::Blob)),
372 spec("b2", ColumnType::Native(NativeType::Blob)),
373 ];
374 let iter = RawRowIterator::new(2, &specs, FrameSlice::new(&raw_data));
375 let mut iter = TypedRowIterator::<'_, '_, (&[u8], Vec<u8>)>::new(iter).unwrap();
376
377 let (c11, c12) = iter.next().unwrap().unwrap();
378 assert_eq!(c11, CELL1);
379 assert_eq!(c12, CELL2);
380
381 let (c21, c22) = iter.next().unwrap().unwrap();
382 assert_eq!(c21, CELL2);
383 assert_eq!(c22, CELL1);
384
385 assert!(iter.next().is_none());
386 }
387
388 #[test]
389 fn test_typed_row_iterator_wrong_type() {
390 let raw_data = Bytes::new();
391 let specs = [
392 spec("b1", ColumnType::Native(NativeType::Blob)),
393 spec("b2", ColumnType::Native(NativeType::Blob)),
394 ];
395 let iter = RawRowIterator::new(0, &specs, FrameSlice::new(&raw_data));
396 assert!(TypedRowIterator::<'_, '_, (i32, i64)>::new(iter).is_err());
397 }
398}