use bytes::Bytes;
use crate::frame::response::result::{
ColumnSpec, DeserializedMetadataAndRawRows, ResultMetadata, ResultMetadataHolder,
};
use super::row::{mk_deser_err, BuiltinDeserializationErrorKind, ColumnIterator, DeserializeRow};
use super::{DeserializationError, FrameSlice, TypeCheckError};
use std::marker::PhantomData;
/// Iterator over the rows stored in a [`FrameSlice`], yielding each row as a
/// [`ColumnIterator`] rather than as a typed value.
///
/// The serialized rows are borrowed for `'frame` and the column
/// specifications for `'metadata`.
#[derive(Debug)]
pub struct RawRowIterator<'frame, 'metadata> {
/// Column specifications shared by every row; one cell is skipped per spec
/// when advancing past a row.
specs: &'metadata [ColumnSpec<'metadata>],
/// Number of rows that have not been yielded yet.
remaining: usize,
/// Not-yet-consumed part of the rows; advanced on every call to `next()`.
slice: FrameSlice<'frame>,
}
impl<'frame, 'metadata> RawRowIterator<'frame, 'metadata> {
#[inline]
pub fn new(
remaining: usize,
specs: &'metadata [ColumnSpec<'metadata>],
slice: FrameSlice<'frame>,
) -> Self {
Self {
specs,
remaining,
slice,
}
}
#[inline]
pub fn specs(&self) -> &'metadata [ColumnSpec<'metadata>] {
self.specs
}
#[inline]
pub fn rows_remaining(&self) -> usize {
self.remaining
}
}
impl<'frame, 'metadata> Iterator for RawRowIterator<'frame, 'metadata> {
    type Item = Result<ColumnIterator<'frame, 'metadata>, DeserializationError>;

    /// Yields the next row as a [`ColumnIterator`] positioned at the row's
    /// first cell, then advances the internal slice past that row.
    ///
    /// Returns `None` once the row counter reaches zero. Returns
    /// `Some(Err(_))` if any cell of the row cannot be skipped; the error
    /// carries the index and name of the offending column.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        // `checked_sub` gives `None` at zero, which ends the iteration.
        let rows_left = self.remaining.checked_sub(1)?;
        self.remaining = rows_left;

        // Snapshot the current position for the caller before skipping the row.
        let row = ColumnIterator::new(self.specs, self.slice);

        // Skip one cell per column; stop at the first cell that fails to parse.
        let specs = self.specs;
        let frame = &mut self.slice;
        let failure = specs.iter().enumerate().find_map(|(column_index, spec)| {
            let err = frame.read_cql_bytes().err()?;
            Some(mk_deser_err::<Self>(
                BuiltinDeserializationErrorKind::RawColumnDeserializationFailed {
                    column_index,
                    column_name: spec.name().to_owned(),
                    err: DeserializationError::new(err),
                },
            ))
        });

        Some(match failure {
            Some(err) => Err(err),
            None => Ok(row),
        })
    }

    /// The number of remaining rows is known exactly, so both bounds agree.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        let rows_left = self.remaining;
        (rows_left, Some(rows_left))
    }
}
/// Iterator that wraps a [`RawRowIterator`] and deserializes every row into
/// the row type `R`.
#[derive(Debug)]
pub struct TypedRowIterator<'frame, 'metadata, R> {
/// Underlying iterator producing the raw rows.
inner: RawRowIterator<'frame, 'metadata>,
/// Records the target row type `R` without storing a value of it.
_phantom: PhantomData<R>,
}
impl<'frame, 'metadata, R> TypedRowIterator<'frame, 'metadata, R>
where
    R: DeserializeRow<'frame, 'metadata>,
{
    /// Wraps `raw` into a typed iterator after type-checking `R` against the
    /// column specifications of `raw`.
    ///
    /// # Errors
    ///
    /// Returns the [`TypeCheckError`] produced by `R::type_check` if the
    /// check fails; in that case no iterator is created.
    #[inline]
    pub fn new(raw: RawRowIterator<'frame, 'metadata>) -> Result<Self, TypeCheckError> {
        let specs = raw.specs();
        R::type_check(specs)?;

        let typed = Self {
            _phantom: PhantomData,
            inner: raw,
        };
        Ok(typed)
    }

    /// Returns the column specifications of the underlying raw iterator.
    #[inline]
    pub fn specs(&self) -> &'metadata [ColumnSpec<'metadata>] {
        self.inner.specs()
    }

    /// Returns how many rows the underlying raw iterator still has to yield.
    #[inline]
    pub fn rows_remaining(&self) -> usize {
        self.inner.rows_remaining()
    }
}
impl<'frame, 'metadata, R> Iterator for TypedRowIterator<'frame, 'metadata, R>
where
    R: DeserializeRow<'frame, 'metadata>,
{
    type Item = Result<R, DeserializationError>;

    /// Fetches the next raw row and deserializes it into `R`.
    ///
    /// An error from the raw iterator is passed through unchanged, without
    /// attempting deserialization.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        match self.inner.next()? {
            Ok(columns) => Some(R::deserialize(columns)),
            Err(err) => Some(Err(err)),
        }
    }

    /// Delegates to the raw iterator: one typed item per raw row.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}
/// Row iterator that owns both the result metadata and the serialized rows,
/// and hands out rows that borrow from the iterator itself.
#[derive(Debug)]
pub struct RawRowLendingIterator {
/// Owner of the result metadata; its column specs describe every row.
metadata: ResultMetadataHolder,
/// Number of rows that have not been yielded yet.
remaining: usize,
/// Byte offset into `raw_rows` at which the next row starts.
at: usize,
/// Serialized rows, owned by the iterator.
raw_rows: Bytes,
}
impl RawRowLendingIterator {
    /// Takes ownership of the metadata and raw rows in `raw_rows` and
    /// positions the iterator at the first row.
    #[inline]
    pub fn new(raw_rows: DeserializedMetadataAndRawRows) -> Self {
        let (metadata, remaining, raw_rows) = raw_rows.into_inner();
        Self {
            metadata,
            remaining,
            raw_rows,
            at: 0,
        }
    }

    /// Yields the next row as a [`ColumnIterator`] borrowing from this
    /// iterator, then advances the stored offset past that row.
    ///
    /// Returns `None` once the row counter reaches zero. Returns
    /// `Some(Err(_))` if a cell of the row cannot be skipped; the error
    /// carries the index and name of the offending column. The offset has
    /// then been advanced only past the cells that were read successfully.
    // The method is deliberately named `next`, but the returned row borrows
    // from `self`, which `Iterator::next` cannot express; hence the allow.
    #[inline]
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<Result<ColumnIterator, DeserializationError>> {
        // `checked_sub` gives `None` at zero, which ends the iteration.
        self.remaining = self.remaining.checked_sub(1)?;

        // Rebuild a frame slice over the owned bytes on every call, then
        // narrow it to the suffix that starts at the current row.
        let mut remaining_frame = FrameSlice::new(&self.raw_rows);
        *remaining_frame.as_slice_mut() = &remaining_frame.as_slice()[self.at..];

        // Snapshot the current position for the caller before skipping the row.
        let iter = ColumnIterator::new(self.metadata.inner().col_specs(), remaining_frame);

        // Skip one cell per column, tracking how many bytes each cell consumed.
        for (column_index, spec) in self.metadata.inner().col_specs().iter().enumerate() {
            let len_before = remaining_frame.as_slice().len();
            match remaining_frame.read_cql_bytes() {
                Ok(_) => {
                    let len_after = remaining_frame.as_slice().len();
                    self.at += len_before - len_after;
                }
                Err(err) => {
                    return Some(Err(mk_deser_err::<Self>(
                        BuiltinDeserializationErrorKind::RawColumnDeserializationFailed {
                            column_index,
                            column_name: spec.name().to_owned(),
                            err: DeserializationError::new(err),
                        },
                    )));
                }
            }
        }

        Some(Ok(iter))
    }

    /// The number of remaining rows is known exactly, so both bounds agree.
    #[inline]
    pub fn size_hint(&self) -> (usize, Option<usize>) {
        let rows_left = self.remaining;
        (rows_left, Some(rows_left))
    }

    /// Returns the result metadata owned by this iterator.
    #[inline]
    pub fn metadata(&self) -> &ResultMetadata<'_> {
        self.metadata.inner()
    }

    /// Returns how many rows are still to be yielded.
    #[inline]
    pub fn rows_remaining(&self) -> usize {
        self.remaining
    }
}
// Unit tests for the raw, lending and typed row iterators.
#[cfg(test)]
mod tests {
use bytes::Bytes;
use std::ops::Deref;
use crate::frame::response::result::{
ColumnSpec, ColumnType, DeserializedMetadataAndRawRows, ResultMetadata,
};
use super::super::tests::{serialize_cells, spec, CELL1, CELL2};
use super::{
ColumnIterator, DeserializationError, FrameSlice, RawRowIterator, RawRowLendingIterator,
TypedRowIterator,
};
// Test-only abstraction that lets one generic `check` function drive both
// `RawRowIterator` and `RawRowLendingIterator`: the yielded item may borrow
// from the iterator for the duration of `'borrow`.
trait LendingIterator {
type Item<'borrow>
where
Self: 'borrow;
fn lend_next(&mut self) -> Option<Result<Self::Item<'_>, DeserializationError>>;
}
// `RawRowIterator` is an ordinary `Iterator`; its items are exposed here with
// both lifetimes set to the borrow of the iterator.
#[allow(clippy::needless_lifetimes)]
impl<'frame, 'metadata> LendingIterator for RawRowIterator<'frame, 'metadata> {
type Item<'borrow>
= ColumnIterator<'borrow, 'borrow>
where
Self: 'borrow;
fn lend_next(&mut self) -> Option<Result<ColumnIterator<'_, '_>, DeserializationError>> {
self.next()
}
}
// `RawRowLendingIterator` forwards to its inherent `next` method.
impl LendingIterator for RawRowLendingIterator {
type Item<'borrow> = ColumnIterator<'borrow, 'borrow>;
fn lend_next(&mut self) -> Option<Result<ColumnIterator<'_, '_>, DeserializationError>> {
self.next()
}
}
// Two rows of two blob columns each: both iterator flavours must yield the
// cells in serialized order and then report exhaustion.
#[test]
fn test_row_iterators_basic_parse() {
static SPECS: &[ColumnSpec<'static>] =
&[spec("b1", ColumnType::Blob), spec("b2", ColumnType::Blob)];
lazy_static::lazy_static! {
static ref RAW_DATA: Bytes = serialize_cells([Some(CELL1), Some(CELL2), Some(CELL2), Some(CELL1)]);
}
let raw_data = RAW_DATA.deref();
let specs = SPECS;
let row_iter = RawRowIterator::new(2, specs, FrameSlice::new(raw_data));
let lending_row_iter =
RawRowLendingIterator::new(DeserializedMetadataAndRawRows::new_for_test(
ResultMetadata::new_for_test(specs.len(), specs.to_vec()),
2,
raw_data.clone(),
));
check(row_iter);
check(lending_row_iter);
// Shared assertions, generic over both iterator flavours.
fn check<I>(mut iter: I)
where
I: for<'item> LendingIterator<Item<'item> = ColumnIterator<'item, 'item>>,
{
// First row: CELL1, CELL2.
let mut row1 = iter.lend_next().unwrap().unwrap();
let c11 = row1.next().unwrap().unwrap();
assert_eq!(c11.slice.unwrap().as_slice(), CELL1);
let c12 = row1.next().unwrap().unwrap();
assert_eq!(c12.slice.unwrap().as_slice(), CELL2);
assert!(row1.next().is_none());
// Second row: CELL2, CELL1.
let mut row2 = iter.lend_next().unwrap().unwrap();
let c21 = row2.next().unwrap().unwrap();
assert_eq!(c21.slice.unwrap().as_slice(), CELL2);
let c22 = row2.next().unwrap().unwrap();
assert_eq!(c22.slice.unwrap().as_slice(), CELL1);
assert!(row2.next().is_none());
// No third row.
assert!(iter.lend_next().is_none());
}
}
// The iterators are told to expect two rows, but only two cells (one row of
// two columns) are serialized: the first row succeeds and the second must
// be reported as an error.
#[test]
fn test_row_iterators_too_few_rows() {
static SPECS: &[ColumnSpec<'static>] =
&[spec("b1", ColumnType::Blob), spec("b2", ColumnType::Blob)];
lazy_static::lazy_static! {
static ref RAW_DATA: Bytes = serialize_cells([Some(CELL1), Some(CELL2)]);
}
let raw_data = RAW_DATA.deref();
let specs = SPECS;
let row_iter = RawRowIterator::new(2, specs, FrameSlice::new(raw_data));
let lending_row_iter =
RawRowLendingIterator::new(DeserializedMetadataAndRawRows::new_for_test(
ResultMetadata::new_for_test(specs.len(), specs.to_vec()),
2,
raw_data.clone(),
));
check(row_iter);
check(lending_row_iter);
// Shared assertions, generic over both iterator flavours.
fn check<I>(mut iter: I)
where
I: for<'item> LendingIterator<Item<'item> = ColumnIterator<'item, 'item>>,
{
iter.lend_next().unwrap().unwrap();
iter.lend_next().unwrap().unwrap_err();
}
}
// Two rows of two blob columns deserialized as `(&[u8], Vec<u8>)` tuples.
#[test]
fn test_typed_row_iterator_basic_parse() {
let raw_data = serialize_cells([Some(CELL1), Some(CELL2), Some(CELL2), Some(CELL1)]);
let specs = [spec("b1", ColumnType::Blob), spec("b2", ColumnType::Blob)];
let iter = RawRowIterator::new(2, &specs, FrameSlice::new(&raw_data));
let mut iter = TypedRowIterator::<'_, '_, (&[u8], Vec<u8>)>::new(iter).unwrap();
let (c11, c12) = iter.next().unwrap().unwrap();
assert_eq!(c11, CELL1);
assert_eq!(c12, CELL2);
let (c21, c22) = iter.next().unwrap().unwrap();
assert_eq!(c21, CELL2);
assert_eq!(c22, CELL1);
assert!(iter.next().is_none());
}
// Requesting `(i32, i64)` rows for two blob columns must be rejected when the
// typed iterator is constructed, before any row data is read (there is none).
#[test]
fn test_typed_row_iterator_wrong_type() {
let raw_data = Bytes::new();
let specs = [spec("b1", ColumnType::Blob), spec("b2", ColumnType::Blob)];
let iter = RawRowIterator::new(0, &specs, FrameSlice::new(&raw_data));
assert!(TypedRowIterator::<'_, '_, (i32, i64)>::new(iter).is_err());
}
}