1use crate::deserialize::result::{RawRowIterator, TypedRowIterator};
2use crate::deserialize::row::DeserializeRow;
3use crate::deserialize::{FrameSlice, TypeCheckError};
4use crate::frame::frame_errors::{
5 ColumnSpecParseError, ColumnSpecParseErrorKind, CqlResultParseError, CqlTypeParseError,
6 LowLevelDeserializationError, PreparedMetadataParseError, PreparedParseError,
7 RawRowsAndPagingStateResponseParseError, ResultMetadataAndRowsCountParseError,
8 ResultMetadataParseError, SchemaChangeEventParseError, SetKeyspaceParseError,
9 TableSpecParseError,
10};
11use crate::frame::request::query::PagingStateResponse;
12use crate::frame::response::event::SchemaChangeEvent;
13use crate::frame::types;
14use bytes::{Buf, Bytes};
15use std::borrow::Cow;
16use std::fmt::Debug;
17use std::sync::Arc;
18use std::{result::Result as StdResult, str};
19
20#[derive(Debug)]
21pub struct SetKeyspace {
22 pub keyspace_name: String,
23}
24
25#[derive(Debug)]
26pub struct Prepared {
27 pub id: Bytes,
28 pub prepared_metadata: PreparedMetadata,
29 pub result_metadata: ResultMetadata<'static>,
30}
31
32#[derive(Debug)]
33pub struct SchemaChange {
34 pub event: SchemaChangeEvent,
35}
36
37#[derive(Debug, Clone, PartialEq, Eq, Hash)]
38pub struct TableSpec<'a> {
39 ks_name: Cow<'a, str>,
40 table_name: Cow<'a, str>,
41}
42
43#[derive(Clone, Debug, PartialEq, Eq)]
53#[non_exhaustive]
54pub enum ColumnType<'frame> {
55 Native(NativeType),
57
58 Collection {
61 frozen: bool,
62 typ: CollectionType<'frame>,
63 },
64
65 Vector {
69 typ: Box<ColumnType<'frame>>,
70 dimensions: u16,
71 },
72
73 UserDefinedType {
75 frozen: bool,
76 definition: Arc<UserDefinedType<'frame>>,
77 },
78
79 Tuple(Vec<ColumnType<'frame>>),
82}
83
84#[derive(Clone, Debug, PartialEq, Eq)]
86#[non_exhaustive]
87pub enum NativeType {
88 Ascii,
89 Boolean,
90 Blob,
91 Counter,
92 Date,
93 Decimal,
94 Double,
95 Duration,
96 Float,
97 Int,
98 BigInt,
99 Text,
100 Timestamp,
101 Inet,
102 SmallInt,
103 TinyInt,
104 Time,
105 Timeuuid,
106 Uuid,
107 Varint,
108}
109
110#[derive(Clone, Debug, PartialEq, Eq)]
115#[non_exhaustive]
116pub enum CollectionType<'frame> {
117 List(Box<ColumnType<'frame>>),
118 Map(Box<ColumnType<'frame>>, Box<ColumnType<'frame>>),
119 Set(Box<ColumnType<'frame>>),
120}
121
122#[derive(Clone, Debug, PartialEq, Eq)]
124pub struct UserDefinedType<'frame> {
125 pub name: Cow<'frame, str>,
126 pub keyspace: Cow<'frame, str>,
127 pub field_types: Vec<(Cow<'frame, str>, ColumnType<'frame>)>,
128}
129
130impl ColumnType<'_> {
131 pub fn into_owned(self) -> ColumnType<'static> {
132 match self {
133 ColumnType::Native(b) => ColumnType::Native(b),
134 ColumnType::Collection { frozen, typ: t } => ColumnType::Collection {
135 frozen,
136 typ: t.into_owned(),
137 },
138 ColumnType::Vector {
139 typ: type_,
140 dimensions,
141 } => ColumnType::Vector {
142 typ: Box::new(type_.into_owned()),
143 dimensions,
144 },
145 ColumnType::UserDefinedType {
146 frozen,
147 definition: udt,
148 } => {
149 let udt = Arc::try_unwrap(udt).unwrap_or_else(|e| e.as_ref().clone());
150 ColumnType::UserDefinedType {
151 frozen,
152 definition: Arc::new(UserDefinedType {
153 name: udt.name.into_owned().into(),
154 keyspace: udt.keyspace.into_owned().into(),
155 field_types: udt
156 .field_types
157 .into_iter()
158 .map(|(cow, column_type)| {
159 (cow.into_owned().into(), column_type.into_owned())
160 })
161 .collect(),
162 }),
163 }
164 }
165 ColumnType::Tuple(vec) => {
166 ColumnType::Tuple(vec.into_iter().map(ColumnType::into_owned).collect())
167 }
168 }
169 }
170}
171
172impl CollectionType<'_> {
173 fn into_owned(self) -> CollectionType<'static> {
174 match self {
175 CollectionType::List(elem_type) => {
176 CollectionType::List(Box::new(elem_type.into_owned()))
177 }
178 CollectionType::Map(key, value) => {
179 CollectionType::Map(Box::new(key.into_owned()), Box::new(value.into_owned()))
180 }
181 CollectionType::Set(elem_type) => CollectionType::Set(Box::new(elem_type.into_owned())),
182 }
183 }
184}
185
186impl<'a> TableSpec<'a> {
187 pub const fn borrowed(ks: &'a str, table: &'a str) -> Self {
188 Self {
189 ks_name: Cow::Borrowed(ks),
190 table_name: Cow::Borrowed(table),
191 }
192 }
193
194 pub fn ks_name(&'a self) -> &'a str {
195 self.ks_name.as_ref()
196 }
197
198 pub fn table_name(&'a self) -> &'a str {
199 self.table_name.as_ref()
200 }
201
202 pub fn into_owned(self) -> TableSpec<'static> {
203 TableSpec::owned(self.ks_name.into_owned(), self.table_name.into_owned())
204 }
205
206 pub fn to_owned(&self) -> TableSpec<'static> {
207 TableSpec::owned(self.ks_name().to_owned(), self.table_name().to_owned())
208 }
209}
210
211impl TableSpec<'static> {
212 pub fn owned(ks_name: String, table_name: String) -> Self {
213 Self {
214 ks_name: Cow::Owned(ks_name),
215 table_name: Cow::Owned(table_name),
216 }
217 }
218}
219
220impl ColumnType<'_> {
221 pub(crate) fn supports_special_empty_value(&self) -> bool {
230 #[allow(clippy::match_like_matches_macro)]
231 match self {
232 ColumnType::Native(NativeType::Counter)
233 | ColumnType::Native(NativeType::Duration)
234 | ColumnType::Collection { .. }
235 | ColumnType::UserDefinedType { .. } => false,
236
237 _ => true,
238 }
239 }
240}
241
242#[derive(Debug, Clone, PartialEq, Eq)]
243pub struct ColumnSpec<'frame> {
244 pub(crate) table_spec: TableSpec<'frame>,
245 pub(crate) name: Cow<'frame, str>,
246 pub(crate) typ: ColumnType<'frame>,
247}
248
249impl ColumnSpec<'static> {
250 #[inline]
251 pub fn owned(name: String, typ: ColumnType<'static>, table_spec: TableSpec<'static>) -> Self {
252 Self {
253 table_spec,
254 name: Cow::Owned(name),
255 typ,
256 }
257 }
258}
259
260impl<'frame> ColumnSpec<'frame> {
261 #[inline]
262 pub const fn borrowed(
263 name: &'frame str,
264 typ: ColumnType<'frame>,
265 table_spec: TableSpec<'frame>,
266 ) -> Self {
267 Self {
268 table_spec,
269 name: Cow::Borrowed(name),
270 typ,
271 }
272 }
273
274 #[inline]
275 pub fn table_spec(&self) -> &TableSpec<'frame> {
276 &self.table_spec
277 }
278
279 #[inline]
280 pub fn name(&self) -> &str {
281 &self.name
282 }
283
284 #[inline]
285 pub fn typ(&self) -> &ColumnType<'frame> {
286 &self.typ
287 }
288}
289
290#[derive(Debug, Clone)]
291pub struct ResultMetadata<'a> {
292 col_count: usize,
293 col_specs: Vec<ColumnSpec<'a>>,
294}
295
296impl<'a> ResultMetadata<'a> {
297 #[inline]
298 pub fn col_count(&self) -> usize {
299 self.col_count
300 }
301
302 #[inline]
303 pub fn col_specs(&self) -> &[ColumnSpec<'a>] {
304 &self.col_specs
305 }
306
307 #[inline]
310 pub fn mock_empty() -> Self {
311 Self {
312 col_count: 0,
313 col_specs: Vec::new(),
314 }
315 }
316}
317
318#[derive(Debug)]
323pub enum ResultMetadataHolder {
324 SelfBorrowed(SelfBorrowedMetadataContainer),
325 SharedCached(Arc<ResultMetadata<'static>>),
326}
327
328impl ResultMetadataHolder {
329 #[inline]
335 pub fn inner(&self) -> &ResultMetadata<'_> {
336 match self {
337 ResultMetadataHolder::SelfBorrowed(c) => c.metadata(),
338 ResultMetadataHolder::SharedCached(s) => s,
339 }
340 }
341
342 #[inline]
344 pub fn mock_empty() -> Self {
345 Self::SelfBorrowed(SelfBorrowedMetadataContainer::mock_empty())
346 }
347}
348
349#[derive(Debug, Copy, Clone)]
350pub struct PartitionKeyIndex {
351 pub index: u16,
353 pub sequence: u16,
355}
356
357#[derive(Debug, Clone)]
358pub struct PreparedMetadata {
359 pub flags: i32,
360 pub col_count: usize,
361 pub pk_indexes: Vec<PartitionKeyIndex>,
364 pub col_specs: Vec<ColumnSpec<'static>>,
365}
366
367#[derive(Debug, Clone)]
372pub struct RawMetadataAndRawRows {
373 col_count: usize,
375 global_tables_spec: bool,
376 no_metadata: bool,
377
378 raw_metadata_and_rows: Bytes,
380
381 cached_metadata: Option<Arc<ResultMetadata<'static>>>,
383}
384
385impl RawMetadataAndRawRows {
386 #[inline]
390 pub fn mock_empty() -> Self {
391 static EMPTY_METADATA_ZERO_ROWS: &[u8] = &0_i32.to_be_bytes();
394 let raw_metadata_and_rows = Bytes::from_static(EMPTY_METADATA_ZERO_ROWS);
395
396 Self {
397 col_count: 0,
398 global_tables_spec: false,
399 no_metadata: false,
400 raw_metadata_and_rows,
401 cached_metadata: None,
402 }
403 }
404
405 #[inline]
407 pub fn metadata_and_rows_bytes_size(&self) -> usize {
408 self.raw_metadata_and_rows.len()
409 }
410}
411
412mod self_borrowed_metadata {
413 use std::ops::Deref;
414
415 use bytes::Bytes;
416 use yoke::{Yoke, Yokeable};
417
418 use super::ResultMetadata;
419
420 #[derive(Debug, Clone)]
424 struct BytesWrapper {
425 inner: Bytes,
426 }
427
428 impl Deref for BytesWrapper {
429 type Target = [u8];
430
431 fn deref(&self) -> &Self::Target {
432 &self.inner
433 }
434 }
435
436 unsafe impl stable_deref_trait::StableDeref for BytesWrapper {}
440
441 unsafe impl yoke::CloneableCart for BytesWrapper {}
448
449 #[derive(Debug, Clone, Yokeable)]
452 struct ResultMetadataWrapper<'frame>(ResultMetadata<'frame>);
453
454 #[derive(Debug, Clone)]
461 pub struct SelfBorrowedMetadataContainer {
462 metadata_and_raw_rows: Yoke<ResultMetadataWrapper<'static>, BytesWrapper>,
463 }
464
465 impl SelfBorrowedMetadataContainer {
466 pub fn mock_empty() -> Self {
468 Self {
469 metadata_and_raw_rows: Yoke::attach_to_cart(
470 BytesWrapper {
471 inner: Bytes::new(),
472 },
473 |_| ResultMetadataWrapper(ResultMetadata::mock_empty()),
474 ),
475 }
476 }
477
478 pub fn metadata(&self) -> &ResultMetadata<'_> {
480 &self.metadata_and_raw_rows.get().0
481 }
482
483 pub(super) fn make_deserialized_metadata<F, ErrorT>(
486 frame: Bytes,
487 deserializer: F,
488 ) -> Result<(Self, Bytes), ErrorT>
489 where
490 F: for<'frame> FnOnce(&mut &'frame [u8]) -> Result<ResultMetadata<'frame>, ErrorT>,
492 {
493 let deserialized_metadata_and_raw_rows: Yoke<
494 (ResultMetadataWrapper<'static>, &'static [u8]),
495 BytesWrapper,
496 > = Yoke::try_attach_to_cart(BytesWrapper { inner: frame }, |mut slice| {
497 let metadata = deserializer(&mut slice)?;
498 let row_count_and_raw_rows = slice;
499 Ok((ResultMetadataWrapper(metadata), row_count_and_raw_rows))
500 })?;
501
502 let (_metadata, raw_rows) = deserialized_metadata_and_raw_rows.get();
503 let raw_rows_with_count = deserialized_metadata_and_raw_rows
504 .backing_cart()
505 .inner
506 .slice_ref(raw_rows);
507
508 Ok((
509 Self {
510 metadata_and_raw_rows: deserialized_metadata_and_raw_rows
511 .map_project(|(metadata, _), _| metadata),
512 },
513 raw_rows_with_count,
514 ))
515 }
516 }
517}
518pub use self_borrowed_metadata::SelfBorrowedMetadataContainer;
519
520#[derive(Debug)]
524pub struct DeserializedMetadataAndRawRows {
525 metadata: ResultMetadataHolder,
526 rows_count: usize,
527 raw_rows: Bytes,
528}
529
530impl DeserializedMetadataAndRawRows {
531 #[inline]
534 pub fn metadata(&self) -> &ResultMetadata<'_> {
535 self.metadata.inner()
536 }
537
538 #[inline]
541 pub fn into_metadata(self) -> ResultMetadataHolder {
542 self.metadata
543 }
544
545 #[inline]
547 pub fn rows_count(&self) -> usize {
548 self.rows_count
549 }
550
551 #[inline]
553 pub fn rows_bytes_size(&self) -> usize {
554 self.raw_rows.len()
555 }
556
557 #[inline]
560 pub fn mock_empty() -> Self {
561 Self {
562 metadata: ResultMetadataHolder::SelfBorrowed(
563 SelfBorrowedMetadataContainer::mock_empty(),
564 ),
565 rows_count: 0,
566 raw_rows: Bytes::new(),
567 }
568 }
569
570 pub(crate) fn into_inner(self) -> (ResultMetadataHolder, usize, Bytes) {
571 (self.metadata, self.rows_count, self.raw_rows)
572 }
573
574 #[inline]
579 pub fn rows_iter<'frame, 'metadata, R: DeserializeRow<'frame, 'metadata>>(
580 &'frame self,
581 ) -> StdResult<TypedRowIterator<'frame, 'metadata, R>, TypeCheckError>
582 where
583 'frame: 'metadata,
584 {
585 let frame_slice = FrameSlice::new(&self.raw_rows);
586 let raw = RawRowIterator::new(
587 self.rows_count,
588 self.metadata.inner().col_specs(),
589 frame_slice,
590 );
591 TypedRowIterator::new(raw)
592 }
593}
594
595#[derive(Debug)]
596pub enum Result {
597 Void,
598 Rows((RawMetadataAndRawRows, PagingStateResponse)),
599 SetKeyspace(SetKeyspace),
600 Prepared(Prepared),
601 SchemaChange(SchemaChange),
602}
603
604fn deser_type_generic<'frame, 'result, StrT: Into<Cow<'result, str>>>(
605 buf: &mut &'frame [u8],
606 read_string: fn(&mut &'frame [u8]) -> StdResult<StrT, LowLevelDeserializationError>,
607) -> StdResult<ColumnType<'result>, CqlTypeParseError> {
608 use ColumnType::*;
609 use NativeType::*;
610 let id =
611 types::read_short(buf).map_err(|err| CqlTypeParseError::TypeIdParseError(err.into()))?;
612 Ok(match id {
613 0x0000 => {
614 let type_str =
621 types::read_string(buf).map_err(CqlTypeParseError::CustomTypeNameParseError)?;
622 match type_str {
623 "org.apache.cassandra.db.marshal.DurationType" => Native(Duration),
624 _ => {
625 return Err(CqlTypeParseError::CustomTypeUnsupported(
626 type_str.to_owned(),
627 ))
628 }
629 }
630 }
631 0x0001 => Native(Ascii),
632 0x0002 => Native(BigInt),
633 0x0003 => Native(Blob),
634 0x0004 => Native(Boolean),
635 0x0005 => Native(Counter),
636 0x0006 => Native(Decimal),
637 0x0007 => Native(Double),
638 0x0008 => Native(Float),
639 0x0009 => Native(Int),
640 0x000B => Native(Timestamp),
641 0x000C => Native(Uuid),
642 0x000D => Native(Text),
643 0x000E => Native(Varint),
644 0x000F => Native(Timeuuid),
645 0x0010 => Native(Inet),
646 0x0011 => Native(Date),
647 0x0012 => Native(Time),
648 0x0013 => Native(SmallInt),
649 0x0014 => Native(TinyInt),
650 0x0015 => Native(Duration),
651 0x0020 => Collection {
652 frozen: false,
653 typ: CollectionType::List(Box::new(deser_type_generic(buf, read_string)?)),
654 },
655 0x0021 => Collection {
656 frozen: false,
657 typ: CollectionType::Map(
658 Box::new(deser_type_generic(buf, read_string)?),
659 Box::new(deser_type_generic(buf, read_string)?),
660 ),
661 },
662 0x0022 => Collection {
663 frozen: false,
664 typ: CollectionType::Set(Box::new(deser_type_generic(buf, read_string)?)),
665 },
666 0x0030 => {
667 let keyspace_name =
668 read_string(buf).map_err(CqlTypeParseError::UdtKeyspaceNameParseError)?;
669 let type_name = read_string(buf).map_err(CqlTypeParseError::UdtNameParseError)?;
670 let fields_size: usize = types::read_short(buf)
671 .map_err(|err| CqlTypeParseError::UdtFieldsCountParseError(err.into()))?
672 .into();
673
674 let mut field_types: Vec<(Cow<'result, str>, ColumnType)> =
675 Vec::with_capacity(fields_size);
676
677 for _ in 0..fields_size {
678 let field_name =
679 read_string(buf).map_err(CqlTypeParseError::UdtFieldNameParseError)?;
680 let field_type = deser_type_generic(buf, read_string)?;
681
682 field_types.push((field_name.into(), field_type));
683 }
684
685 UserDefinedType {
686 frozen: false,
687 definition: Arc::new(self::UserDefinedType {
688 name: type_name.into(),
689 keyspace: keyspace_name.into(),
690 field_types,
691 }),
692 }
693 }
694 0x0031 => {
695 let len: usize = types::read_short(buf)
696 .map_err(|err| CqlTypeParseError::TupleLengthParseError(err.into()))?
697 .into();
698 let mut types = Vec::with_capacity(len);
699 for _ in 0..len {
700 types.push(deser_type_generic(buf, read_string)?);
701 }
702 Tuple(types)
703 }
704 id => {
705 return Err(CqlTypeParseError::TypeNotImplemented(id));
706 }
707 })
708}
709
710fn deser_type_borrowed<'frame>(
711 buf: &mut &'frame [u8],
712) -> StdResult<ColumnType<'frame>, CqlTypeParseError> {
713 deser_type_generic(buf, |buf| types::read_string(buf))
714}
715
716fn deser_type_owned(buf: &mut &[u8]) -> StdResult<ColumnType<'static>, CqlTypeParseError> {
717 deser_type_generic(buf, |buf| types::read_string(buf).map(ToOwned::to_owned))
718}
719
720fn deser_table_spec<'frame>(
726 buf: &mut &'frame [u8],
727) -> StdResult<TableSpec<'frame>, TableSpecParseError> {
728 let ks_name = types::read_string(buf).map_err(TableSpecParseError::MalformedKeyspaceName)?;
729 let table_name = types::read_string(buf).map_err(TableSpecParseError::MalformedTableName)?;
730 Ok(TableSpec::borrowed(ks_name, table_name))
731}
732
733fn mk_col_spec_parse_error(
734 col_idx: usize,
735 err: impl Into<ColumnSpecParseErrorKind>,
736) -> ColumnSpecParseError {
737 ColumnSpecParseError {
738 column_index: col_idx,
739 kind: err.into(),
740 }
741}
742
743fn deser_col_specs_generic<'frame, 'result>(
744 buf: &mut &'frame [u8],
745 global_table_spec: Option<TableSpec<'frame>>,
746 col_count: usize,
747 make_col_spec: fn(&'frame str, ColumnType<'result>, TableSpec<'frame>) -> ColumnSpec<'result>,
748 deser_type: fn(&mut &'frame [u8]) -> StdResult<ColumnType<'result>, CqlTypeParseError>,
749) -> StdResult<Vec<ColumnSpec<'result>>, ColumnSpecParseError> {
750 let mut col_specs = Vec::with_capacity(col_count);
751 for col_idx in 0..col_count {
752 let table_spec = match global_table_spec {
753 Some(ref known_spec) => known_spec.clone(),
755
756 None => deser_table_spec(buf).map_err(|err| mk_col_spec_parse_error(col_idx, err))?,
758 };
759
760 let name = types::read_string(buf).map_err(|err| mk_col_spec_parse_error(col_idx, err))?;
761 let typ = deser_type(buf).map_err(|err| mk_col_spec_parse_error(col_idx, err))?;
762 let col_spec = make_col_spec(name, typ, table_spec);
763 col_specs.push(col_spec);
764 }
765 Ok(col_specs)
766}
767
768fn deser_col_specs_borrowed<'frame>(
774 buf: &mut &'frame [u8],
775 global_table_spec: Option<TableSpec<'frame>>,
776 col_count: usize,
777) -> StdResult<Vec<ColumnSpec<'frame>>, ColumnSpecParseError> {
778 deser_col_specs_generic(
779 buf,
780 global_table_spec,
781 col_count,
782 ColumnSpec::borrowed,
783 deser_type_borrowed,
784 )
785}
786
787fn deser_col_specs_owned<'frame>(
793 buf: &mut &'frame [u8],
794 global_table_spec: Option<TableSpec<'frame>>,
795 col_count: usize,
796) -> StdResult<Vec<ColumnSpec<'static>>, ColumnSpecParseError> {
797 let result: StdResult<Vec<ColumnSpec<'static>>, ColumnSpecParseError> = deser_col_specs_generic(
798 buf,
799 global_table_spec,
800 col_count,
801 |name: &str, typ, table_spec: TableSpec| {
802 ColumnSpec::owned(name.to_owned(), typ, table_spec.into_owned())
803 },
804 deser_type_owned,
805 );
806
807 result
808}
809
810fn deser_result_metadata(
811 buf: &mut &[u8],
812) -> StdResult<(ResultMetadata<'static>, PagingStateResponse), ResultMetadataParseError> {
813 let flags = types::read_int(buf)
814 .map_err(|err| ResultMetadataParseError::FlagsParseError(err.into()))?;
815 let global_tables_spec = flags & 0x0001 != 0;
816 let has_more_pages = flags & 0x0002 != 0;
817 let no_metadata = flags & 0x0004 != 0;
818
819 let col_count =
820 types::read_int_length(buf).map_err(ResultMetadataParseError::ColumnCountParseError)?;
821
822 let raw_paging_state = has_more_pages
823 .then(|| types::read_bytes(buf).map_err(ResultMetadataParseError::PagingStateParseError))
824 .transpose()?;
825
826 let paging_state = PagingStateResponse::new_from_raw_bytes(raw_paging_state);
827
828 let col_specs = if no_metadata {
829 vec![]
830 } else {
831 let global_table_spec = global_tables_spec
832 .then(|| deser_table_spec(buf))
833 .transpose()?;
834
835 deser_col_specs_owned(buf, global_table_spec, col_count)?
836 };
837
838 let metadata = ResultMetadata {
839 col_count,
840 col_specs,
841 };
842 Ok((metadata, paging_state))
843}
844
845impl RawMetadataAndRawRows {
846 fn deserialize(
849 frame: &mut FrameSlice,
850 cached_metadata: Option<Arc<ResultMetadata<'static>>>,
851 ) -> StdResult<(Self, PagingStateResponse), RawRowsAndPagingStateResponseParseError> {
852 let flags = types::read_int(frame.as_slice_mut())
853 .map_err(|err| RawRowsAndPagingStateResponseParseError::FlagsParseError(err.into()))?;
854 let global_tables_spec = flags & 0x0001 != 0;
855 let has_more_pages = flags & 0x0002 != 0;
856 let no_metadata = flags & 0x0004 != 0;
857
858 let col_count = types::read_int_length(frame.as_slice_mut())
859 .map_err(RawRowsAndPagingStateResponseParseError::ColumnCountParseError)?;
860
861 let raw_paging_state = has_more_pages
862 .then(|| {
863 types::read_bytes(frame.as_slice_mut())
864 .map_err(RawRowsAndPagingStateResponseParseError::PagingStateParseError)
865 })
866 .transpose()?;
867
868 let paging_state = PagingStateResponse::new_from_raw_bytes(raw_paging_state);
869
870 let raw_rows = Self {
871 col_count,
872 global_tables_spec,
873 no_metadata,
874 raw_metadata_and_rows: frame.to_bytes(),
875 cached_metadata,
876 };
877
878 Ok((raw_rows, paging_state))
879 }
880}
881
882impl RawMetadataAndRawRows {
883 fn metadata_deserializer(
893 col_count: usize,
894 global_tables_spec: bool,
895 ) -> impl for<'frame> FnOnce(
896 &mut &'frame [u8],
897 ) -> StdResult<ResultMetadata<'frame>, ResultMetadataParseError> {
898 move |buf| {
899 let server_metadata = {
900 let global_table_spec = global_tables_spec
901 .then(|| deser_table_spec(buf))
902 .transpose()?;
903
904 let col_specs = deser_col_specs_borrowed(buf, global_table_spec, col_count)?;
905
906 ResultMetadata {
907 col_count,
908 col_specs,
909 }
910 };
911 Ok(server_metadata)
912 }
913 }
914
915 pub fn deserialize_metadata(
920 self,
921 ) -> StdResult<DeserializedMetadataAndRawRows, ResultMetadataAndRowsCountParseError> {
922 let (metadata_deserialized, row_count_and_raw_rows) = match self.cached_metadata {
923 Some(cached) if self.no_metadata => {
924 (
927 ResultMetadataHolder::SharedCached(cached),
928 self.raw_metadata_and_rows,
929 )
930 }
931 None if self.no_metadata => {
932 (
937 ResultMetadataHolder::mock_empty(),
938 self.raw_metadata_and_rows,
939 )
940 }
941 Some(_) | None => {
942 let (metadata_container, raw_rows_with_count) =
952 self_borrowed_metadata::SelfBorrowedMetadataContainer::make_deserialized_metadata(
953 self.raw_metadata_and_rows,
954 Self::metadata_deserializer(self.col_count, self.global_tables_spec),
955 )?;
956 (
957 ResultMetadataHolder::SelfBorrowed(metadata_container),
958 raw_rows_with_count,
959 )
960 }
961 };
962
963 let mut frame_slice = FrameSlice::new(&row_count_and_raw_rows);
964
965 let rows_count: usize = types::read_int_length(frame_slice.as_slice_mut())
966 .map_err(ResultMetadataAndRowsCountParseError::RowsCountParseError)?;
967
968 Ok(DeserializedMetadataAndRawRows {
969 metadata: metadata_deserialized,
970 rows_count,
971 raw_rows: frame_slice.to_bytes(),
972 })
973 }
974}
975
976fn deser_prepared_metadata(
977 buf: &mut &[u8],
978) -> StdResult<PreparedMetadata, PreparedMetadataParseError> {
979 let flags = types::read_int(buf)
980 .map_err(|err| PreparedMetadataParseError::FlagsParseError(err.into()))?;
981 let global_tables_spec = flags & 0x0001 != 0;
982
983 let col_count =
984 types::read_int_length(buf).map_err(PreparedMetadataParseError::ColumnCountParseError)?;
985
986 let pk_count: usize =
987 types::read_int_length(buf).map_err(PreparedMetadataParseError::PkCountParseError)?;
988
989 let mut pk_indexes = Vec::with_capacity(pk_count);
990 for i in 0..pk_count {
991 pk_indexes.push(PartitionKeyIndex {
992 index: types::read_short(buf)
993 .map_err(|err| PreparedMetadataParseError::PkIndexParseError(err.into()))?
994 as u16,
995 sequence: i as u16,
996 });
997 }
998 pk_indexes.sort_unstable_by_key(|pki| pki.index);
999
1000 let global_table_spec = global_tables_spec
1001 .then(|| deser_table_spec(buf))
1002 .transpose()?;
1003
1004 let col_specs = deser_col_specs_owned(buf, global_table_spec, col_count)?;
1005
1006 Ok(PreparedMetadata {
1007 flags,
1008 col_count,
1009 pk_indexes,
1010 col_specs,
1011 })
1012}
1013
1014fn deser_rows(
1015 buf_bytes: Bytes,
1016 cached_metadata: Option<&Arc<ResultMetadata<'static>>>,
1017) -> StdResult<(RawMetadataAndRawRows, PagingStateResponse), RawRowsAndPagingStateResponseParseError>
1018{
1019 let mut frame_slice = FrameSlice::new(&buf_bytes);
1020 RawMetadataAndRawRows::deserialize(&mut frame_slice, cached_metadata.cloned())
1021}
1022
1023fn deser_set_keyspace(buf: &mut &[u8]) -> StdResult<SetKeyspace, SetKeyspaceParseError> {
1024 let keyspace_name = types::read_string(buf)?.to_string();
1025
1026 Ok(SetKeyspace { keyspace_name })
1027}
1028
1029fn deser_prepared(buf: &mut &[u8]) -> StdResult<Prepared, PreparedParseError> {
1030 let id_len = types::read_short(buf)
1031 .map_err(|err| PreparedParseError::IdLengthParseError(err.into()))?
1032 as usize;
1033 let id: Bytes = buf[0..id_len].to_owned().into();
1034 buf.advance(id_len);
1035 let prepared_metadata =
1036 deser_prepared_metadata(buf).map_err(PreparedParseError::PreparedMetadataParseError)?;
1037 let (result_metadata, paging_state_response) =
1038 deser_result_metadata(buf).map_err(PreparedParseError::ResultMetadataParseError)?;
1039 if let PagingStateResponse::HasMorePages { state } = paging_state_response {
1040 return Err(PreparedParseError::NonZeroPagingState(
1041 state
1042 .as_bytes_slice()
1043 .cloned()
1044 .unwrap_or_else(|| Arc::from([])),
1045 ));
1046 }
1047
1048 Ok(Prepared {
1049 id,
1050 prepared_metadata,
1051 result_metadata,
1052 })
1053}
1054
1055fn deser_schema_change(buf: &mut &[u8]) -> StdResult<SchemaChange, SchemaChangeEventParseError> {
1056 Ok(SchemaChange {
1057 event: SchemaChangeEvent::deserialize(buf)?,
1058 })
1059}
1060
1061pub fn deserialize(
1062 buf_bytes: Bytes,
1063 cached_metadata: Option<&Arc<ResultMetadata<'static>>>,
1064) -> StdResult<Result, CqlResultParseError> {
1065 let buf = &mut &*buf_bytes;
1066 use self::Result::*;
1067 Ok(
1068 match types::read_int(buf)
1069 .map_err(|err| CqlResultParseError::ResultIdParseError(err.into()))?
1070 {
1071 0x0001 => Void,
1072 0x0002 => Rows(deser_rows(buf_bytes.slice_ref(buf), cached_metadata)?),
1073 0x0003 => SetKeyspace(deser_set_keyspace(buf)?),
1074 0x0004 => Prepared(deser_prepared(buf)?),
1075 0x0005 => SchemaChange(deser_schema_change(buf)?),
1076 id => return Err(CqlResultParseError::UnknownResultId(id)),
1077 },
1078 )
1079}
1080
1081#[doc(hidden)]
1086mod test_utils {
1087 use std::num::TryFromIntError;
1088
1089 use bytes::{BufMut, BytesMut};
1090
1091 use super::*;
1092
1093 impl TableSpec<'_> {
1094 pub(crate) fn serialize(&self, buf: &mut impl BufMut) -> StdResult<(), TryFromIntError> {
1095 types::write_string(&self.ks_name, buf)?;
1096 types::write_string(&self.table_name, buf)?;
1097
1098 Ok(())
1099 }
1100 }
1101
1102 impl ColumnType<'_> {
1103 fn id(&self) -> u16 {
1104 use NativeType::*;
1105 match self {
1106 Self::Native(Ascii) => 0x0001,
1107 Self::Native(BigInt) => 0x0002,
1108 Self::Native(Blob) => 0x0003,
1109 Self::Native(Boolean) => 0x0004,
1110 Self::Native(Counter) => 0x0005,
1111 Self::Native(Decimal) => 0x0006,
1112 Self::Native(Double) => 0x0007,
1113 Self::Native(Float) => 0x0008,
1114 Self::Native(Int) => 0x0009,
1115 Self::Native(Timestamp) => 0x000B,
1116 Self::Native(Uuid) => 0x000C,
1117 Self::Native(Text) => 0x000D,
1118 Self::Native(Varint) => 0x000E,
1119 Self::Native(Timeuuid) => 0x000F,
1120 Self::Native(Inet) => 0x0010,
1121 Self::Native(Date) => 0x0011,
1122 Self::Native(Time) => 0x0012,
1123 Self::Native(SmallInt) => 0x0013,
1124 Self::Native(TinyInt) => 0x0014,
1125 Self::Native(Duration) => 0x0015,
1126 Self::Collection {
1127 typ: CollectionType::List(_),
1128 ..
1129 } => 0x0020,
1130 Self::Collection {
1131 typ: CollectionType::Map(_, _),
1132 ..
1133 } => 0x0021,
1134 Self::Collection {
1135 typ: CollectionType::Set(_),
1136 ..
1137 } => 0x0022,
1138 Self::Vector { .. } => {
1139 unimplemented!();
1140 }
1141 Self::UserDefinedType { .. } => 0x0030,
1142 Self::Tuple(_) => 0x0031,
1143 }
1144 }
1145
1146 pub(crate) fn serialize(&self, buf: &mut impl BufMut) -> StdResult<(), TryFromIntError> {
1148 let id = self.id();
1149 types::write_short(id, buf);
1150
1151 match self {
1152 ColumnType::Native(_) => (),
1154
1155 ColumnType::Collection {
1156 typ: CollectionType::List(elem_type),
1157 ..
1158 }
1159 | ColumnType::Collection {
1160 typ: CollectionType::Set(elem_type),
1161 ..
1162 } => {
1163 elem_type.serialize(buf)?;
1164 }
1165 ColumnType::Collection {
1166 typ: CollectionType::Map(key_type, value_type),
1167 ..
1168 } => {
1169 key_type.serialize(buf)?;
1170 value_type.serialize(buf)?;
1171 }
1172 ColumnType::Tuple(types) => {
1173 types::write_short_length(types.len(), buf)?;
1174 for typ in types.iter() {
1175 typ.serialize(buf)?;
1176 }
1177 }
1178 ColumnType::Vector { .. } => {
1179 unimplemented!()
1180 }
1181 ColumnType::UserDefinedType {
1182 definition: udt, ..
1183 } => {
1184 types::write_string(&udt.keyspace, buf)?;
1185 types::write_string(&udt.name, buf)?;
1186 types::write_short_length(udt.field_types.len(), buf)?;
1187 for (field_name, field_type) in udt.field_types.iter() {
1188 types::write_string(field_name, buf)?;
1189 field_type.serialize(buf)?;
1190 }
1191 }
1192 }
1193
1194 Ok(())
1195 }
1196 }
1197
1198 impl<'a> ResultMetadata<'a> {
1199 #[inline]
1200 #[doc(hidden)]
1201 pub fn new_for_test(col_count: usize, col_specs: Vec<ColumnSpec<'a>>) -> Self {
1202 Self {
1203 col_count,
1204 col_specs,
1205 }
1206 }
1207
1208 pub(crate) fn serialize(
1209 &self,
1210 buf: &mut impl BufMut,
1211 no_metadata: bool,
1212 global_tables_spec: bool,
1213 ) -> StdResult<(), TryFromIntError> {
1214 let global_table_spec = global_tables_spec
1215 .then(|| self.col_specs.first().map(|col_spec| col_spec.table_spec()))
1216 .flatten();
1217
1218 let mut flags = 0;
1219 if global_table_spec.is_some() {
1220 flags |= 0x0001;
1221 }
1222 if no_metadata {
1223 flags |= 0x0004;
1224 }
1225 types::write_int(flags, buf);
1226
1227 types::write_int_length(self.col_count, buf)?;
1228
1229 if !no_metadata {
1232 if let Some(spec) = global_table_spec {
1233 spec.serialize(buf)?;
1234 }
1235
1236 for col_spec in self.col_specs() {
1237 if global_table_spec.is_none() {
1238 col_spec.table_spec().serialize(buf)?;
1239 }
1240
1241 types::write_string(col_spec.name(), buf)?;
1242 col_spec.typ().serialize(buf)?;
1243 }
1244 }
1245
1246 Ok(())
1247 }
1248 }
1249
1250 impl RawMetadataAndRawRows {
1251 #[doc(hidden)]
1252 #[inline]
1253 pub fn new_for_test(
1254 cached_metadata: Option<Arc<ResultMetadata<'static>>>,
1255 metadata: Option<ResultMetadata>,
1256 global_tables_spec: bool,
1257 rows_count: usize,
1258 raw_rows: &[u8],
1259 ) -> StdResult<Self, TryFromIntError> {
1260 let no_metadata = metadata.is_none();
1261 let empty_metadata = ResultMetadata::mock_empty();
1262 let used_metadata = metadata
1263 .as_ref()
1264 .or(cached_metadata.as_deref())
1265 .unwrap_or(&empty_metadata);
1266
1267 let raw_result_rows = {
1268 let mut buf = BytesMut::new();
1269 used_metadata.serialize(&mut buf, global_tables_spec, no_metadata)?;
1270 types::write_int_length(rows_count, &mut buf)?;
1271 buf.extend_from_slice(raw_rows);
1272
1273 buf.freeze()
1274 };
1275
1276 let (raw_rows, _paging_state_response) =
1277 Self::deserialize(&mut FrameSlice::new(&raw_result_rows), cached_metadata).expect(
1278 "Ill-formed serialized metadata for tests - likely bug in serialization code",
1279 );
1280
1281 Ok(raw_rows)
1282 }
1283 }
1284
1285 impl DeserializedMetadataAndRawRows {
1286 #[inline]
1287 #[cfg(test)]
1288 pub(crate) fn new_for_test(
1289 metadata: ResultMetadata<'static>,
1290 rows_count: usize,
1291 raw_rows: Bytes,
1292 ) -> Self {
1293 Self {
1294 metadata: ResultMetadataHolder::SharedCached(Arc::new(metadata)),
1295 rows_count,
1296 raw_rows,
1297 }
1298 }
1299 }
1300}