1use crate::deserialize::result::{RawRowIterator, TypedRowIterator};
4use crate::deserialize::row::DeserializeRow;
5use crate::deserialize::{FrameSlice, TypeCheckError};
6use crate::frame::frame_errors::CustomTypeParseError;
7use crate::frame::frame_errors::{
8 ColumnSpecParseError, ColumnSpecParseErrorKind, CqlResultParseError, CqlTypeParseError,
9 LowLevelDeserializationError, PreparedMetadataParseError, PreparedParseError,
10 RawRowsAndPagingStateResponseParseError, ResultMetadataAndRowsCountParseError,
11 ResultMetadataParseError, SchemaChangeEventParseError, SetKeyspaceParseError,
12 TableSpecParseError,
13};
14use crate::frame::request::query::PagingStateResponse;
15use crate::frame::response::event::SchemaChangeEvent;
16use crate::frame::types;
17use bytes::{Buf, Bytes};
18use std::borrow::Cow;
19use std::fmt::Debug;
20use std::sync::Arc;
21use std::{result::Result as StdResult, str};
22
23#[derive(Debug)]
25pub struct SetKeyspace {
26 pub keyspace_name: String,
29}
30
31#[derive(Debug)]
34pub struct Prepared {
35 pub id: Bytes,
37 pub prepared_metadata: PreparedMetadata,
40 pub result_metadata: ResultMetadata<'static>,
43}
44
45#[derive(Debug)]
49pub struct SchemaChange {
50 pub event: SchemaChangeEvent,
52}
53
54#[derive(Debug, Clone, PartialEq, Eq, Hash)]
58pub struct TableSpec<'a> {
59 ks_name: Cow<'a, str>,
60 table_name: Cow<'a, str>,
61}
62
63#[derive(Clone, Debug, PartialEq, Eq)]
73#[non_exhaustive]
74pub enum ColumnType<'frame> {
75 Native(NativeType),
77
78 Collection {
81 frozen: bool,
86 typ: CollectionType<'frame>,
88 },
89
90 Vector {
94 typ: Box<ColumnType<'frame>>,
96 dimensions: u16,
98 },
99
100 UserDefinedType {
102 frozen: bool,
108 definition: Arc<UserDefinedType<'frame>>,
110 },
111
112 Tuple(Vec<ColumnType<'frame>>),
115}
116
117#[derive(Clone, Debug, PartialEq, Eq)]
119#[non_exhaustive]
120pub enum NativeType {
121 Ascii,
123 Boolean,
125 Blob,
127 Counter,
129 Date,
131 Decimal,
133 Double,
135 Duration,
137 Float,
139 Int,
141 BigInt,
143 Text,
145 Timestamp,
147 Inet,
149 SmallInt,
151 TinyInt,
153 Time,
155 Timeuuid,
157 Uuid,
159 Varint,
161}
162
163impl NativeType {
164 pub(crate) fn type_size(&self) -> Option<usize> {
170 match self {
171 NativeType::Ascii => None,
172 NativeType::Boolean => Some(1),
173 NativeType::Blob => None,
174 NativeType::Counter => None,
175 NativeType::Date => None,
176 NativeType::Decimal => None,
177 NativeType::Double => Some(8),
178 NativeType::Duration => None,
179 NativeType::Float => Some(4),
180 NativeType::Int => Some(4),
181 NativeType::BigInt => Some(8),
182 NativeType::Text => None,
183 NativeType::Timestamp => Some(8),
184 NativeType::Inet => None,
185 NativeType::SmallInt => None,
186 NativeType::TinyInt => None,
187 NativeType::Time => None,
188 NativeType::Timeuuid => Some(16),
189 NativeType::Uuid => Some(16),
190 NativeType::Varint => None,
191 }
192 }
193}
194
195#[derive(Clone, Debug, PartialEq, Eq)]
200#[non_exhaustive]
201pub enum CollectionType<'frame> {
202 List(Box<ColumnType<'frame>>),
204 Map(Box<ColumnType<'frame>>, Box<ColumnType<'frame>>),
207 Set(Box<ColumnType<'frame>>),
209}
210
211#[derive(Clone, Debug, PartialEq, Eq)]
214pub struct UserDefinedType<'frame> {
215 pub name: Cow<'frame, str>,
217 pub keyspace: Cow<'frame, str>,
219 pub field_types: Vec<(Cow<'frame, str>, ColumnType<'frame>)>,
221}
222
223impl ColumnType<'_> {
224 pub fn into_owned(self) -> ColumnType<'static> {
229 match self {
230 ColumnType::Native(b) => ColumnType::Native(b),
231 ColumnType::Collection { frozen, typ: t } => ColumnType::Collection {
232 frozen,
233 typ: t.into_owned(),
234 },
235 ColumnType::Vector {
236 typ: type_,
237 dimensions,
238 } => ColumnType::Vector {
239 typ: Box::new(type_.into_owned()),
240 dimensions,
241 },
242 ColumnType::UserDefinedType {
243 frozen,
244 definition: udt,
245 } => {
246 let udt = Arc::try_unwrap(udt).unwrap_or_else(|e| e.as_ref().clone());
247 ColumnType::UserDefinedType {
248 frozen,
249 definition: Arc::new(UserDefinedType {
250 name: udt.name.into_owned().into(),
251 keyspace: udt.keyspace.into_owned().into(),
252 field_types: udt
253 .field_types
254 .into_iter()
255 .map(|(cow, column_type)| {
256 (cow.into_owned().into(), column_type.into_owned())
257 })
258 .collect(),
259 }),
260 }
261 }
262 ColumnType::Tuple(vec) => {
263 ColumnType::Tuple(vec.into_iter().map(ColumnType::into_owned).collect())
264 }
265 }
266 }
267}
268
269impl CollectionType<'_> {
270 fn into_owned(self) -> CollectionType<'static> {
275 match self {
276 CollectionType::List(elem_type) => {
277 CollectionType::List(Box::new(elem_type.into_owned()))
278 }
279 CollectionType::Map(key, value) => {
280 CollectionType::Map(Box::new(key.into_owned()), Box::new(value.into_owned()))
281 }
282 CollectionType::Set(elem_type) => CollectionType::Set(Box::new(elem_type.into_owned())),
283 }
284 }
285}
286
287impl<'a> TableSpec<'a> {
288 pub const fn borrowed(ks: &'a str, table: &'a str) -> Self {
290 Self {
291 ks_name: Cow::Borrowed(ks),
292 table_name: Cow::Borrowed(table),
293 }
294 }
295
296 pub fn ks_name(&'a self) -> &'a str {
298 self.ks_name.as_ref()
299 }
300
301 pub fn table_name(&'a self) -> &'a str {
303 self.table_name.as_ref()
304 }
305
306 pub fn into_owned(self) -> TableSpec<'static> {
310 TableSpec::owned(self.ks_name.into_owned(), self.table_name.into_owned())
311 }
312
313 pub fn to_owned(&self) -> TableSpec<'static> {
317 TableSpec::owned(self.ks_name().to_owned(), self.table_name().to_owned())
318 }
319}
320
321impl TableSpec<'static> {
322 pub fn owned(ks_name: String, table_name: String) -> Self {
324 Self {
325 ks_name: Cow::Owned(ks_name),
326 table_name: Cow::Owned(table_name),
327 }
328 }
329}
330
331impl ColumnType<'_> {
332 pub(crate) fn supports_special_empty_value(&self) -> bool {
341 #[expect(clippy::match_like_matches_macro)]
342 match self {
343 ColumnType::Native(NativeType::Counter)
344 | ColumnType::Native(NativeType::Duration)
345 | ColumnType::Collection { .. }
346 | ColumnType::UserDefinedType { .. } => false,
347
348 _ => true,
349 }
350 }
351}
352
353#[derive(Debug, Clone, PartialEq, Eq)]
357pub struct ColumnSpec<'frame> {
358 pub(crate) table_spec: TableSpec<'frame>,
359 pub(crate) name: Cow<'frame, str>,
360 pub(crate) typ: ColumnType<'frame>,
361}
362
363impl ColumnSpec<'static> {
364 #[inline]
366 pub fn owned(name: String, typ: ColumnType<'static>, table_spec: TableSpec<'static>) -> Self {
367 Self {
368 table_spec,
369 name: Cow::Owned(name),
370 typ,
371 }
372 }
373}
374
375impl<'frame> ColumnSpec<'frame> {
376 #[inline]
378 pub const fn borrowed(
379 name: &'frame str,
380 typ: ColumnType<'frame>,
381 table_spec: TableSpec<'frame>,
382 ) -> Self {
383 Self {
384 table_spec,
385 name: Cow::Borrowed(name),
386 typ,
387 }
388 }
389
390 #[inline]
392 pub fn table_spec(&self) -> &TableSpec<'frame> {
393 &self.table_spec
394 }
395
396 #[inline]
398 pub fn name(&self) -> &str {
399 &self.name
400 }
401
402 #[inline]
404 pub fn typ(&self) -> &ColumnType<'frame> {
405 &self.typ
406 }
407}
408
409#[derive(Debug, Clone)]
413pub struct ResultMetadata<'a> {
414 col_count: usize,
415 col_specs: Vec<ColumnSpec<'a>>,
416}
417
418impl<'a> ResultMetadata<'a> {
419 #[inline]
421 pub fn col_count(&self) -> usize {
422 self.col_count
423 }
424
425 #[inline]
427 pub fn col_specs(&self) -> &[ColumnSpec<'a>] {
428 &self.col_specs
429 }
430
431 #[inline]
436 pub fn mock_empty() -> Self {
437 Self {
438 col_count: 0,
439 col_specs: Vec::new(),
440 }
441 }
442}
443
444#[derive(Debug)]
449pub enum ResultMetadataHolder {
450 SelfBorrowed(SelfBorrowedMetadataContainer),
452 SharedCached(Arc<ResultMetadata<'static>>),
454}
455
456impl ResultMetadataHolder {
457 #[inline]
463 pub fn inner(&self) -> &ResultMetadata<'_> {
464 match self {
465 ResultMetadataHolder::SelfBorrowed(c) => c.metadata(),
466 ResultMetadataHolder::SharedCached(s) => s,
467 }
468 }
469
470 #[inline]
472 pub fn mock_empty() -> Self {
473 Self::SelfBorrowed(SelfBorrowedMetadataContainer::mock_empty())
474 }
475}
476
477#[derive(Debug, Copy, Clone)]
485pub struct PartitionKeyIndex {
486 pub index: u16,
488 pub sequence: u16,
490}
491
492#[derive(Debug, Clone)]
494pub struct PreparedMetadata {
495 pub flags: i32,
497 pub col_count: usize,
499 pub pk_indexes: Vec<PartitionKeyIndex>,
502 pub col_specs: Vec<ColumnSpec<'static>>,
504}
505
506#[derive(Debug, Clone)]
511pub struct RawMetadataAndRawRows {
512 col_count: usize,
514 global_tables_spec: bool,
515 no_metadata: bool,
516
517 raw_metadata_and_rows: Bytes,
519
520 cached_metadata: Option<Arc<ResultMetadata<'static>>>,
522}
523
524impl RawMetadataAndRawRows {
525 #[inline]
529 pub fn mock_empty() -> Self {
530 static EMPTY_METADATA_ZERO_ROWS: &[u8] = &0_i32.to_be_bytes();
533 let raw_metadata_and_rows = Bytes::from_static(EMPTY_METADATA_ZERO_ROWS);
534
535 Self {
536 col_count: 0,
537 global_tables_spec: false,
538 no_metadata: false,
539 raw_metadata_and_rows,
540 cached_metadata: None,
541 }
542 }
543
544 #[inline]
546 pub fn metadata_and_rows_bytes_size(&self) -> usize {
547 self.raw_metadata_and_rows.len()
548 }
549}
550
551mod self_borrowed_metadata {
552 use std::ops::Deref;
553
554 use bytes::Bytes;
555 use yoke::{Yoke, Yokeable};
556
557 use super::ResultMetadata;
558
559 #[derive(Debug, Clone)]
563 struct BytesWrapper {
564 inner: Bytes,
565 }
566
567 impl Deref for BytesWrapper {
568 type Target = [u8];
569
570 fn deref(&self) -> &Self::Target {
571 &self.inner
572 }
573 }
574
575 unsafe impl stable_deref_trait::StableDeref for BytesWrapper {}
579
580 unsafe impl yoke::CloneableCart for BytesWrapper {}
587
588 #[derive(Debug, Clone, Yokeable)]
591 struct ResultMetadataWrapper<'frame>(ResultMetadata<'frame>);
592
593 #[derive(Debug, Clone)]
600 pub struct SelfBorrowedMetadataContainer {
601 metadata_and_raw_rows: Yoke<ResultMetadataWrapper<'static>, BytesWrapper>,
602 }
603
604 impl SelfBorrowedMetadataContainer {
605 pub fn mock_empty() -> Self {
607 Self {
608 metadata_and_raw_rows: Yoke::attach_to_cart(
609 BytesWrapper {
610 inner: Bytes::new(),
611 },
612 |_| ResultMetadataWrapper(ResultMetadata::mock_empty()),
613 ),
614 }
615 }
616
617 pub fn metadata(&self) -> &ResultMetadata<'_> {
619 &self.metadata_and_raw_rows.get().0
620 }
621
622 pub(super) fn make_deserialized_metadata<F, ErrorT>(
625 frame: Bytes,
626 deserializer: F,
627 ) -> Result<(Self, Bytes), ErrorT>
628 where
629 F: for<'frame> FnOnce(&mut &'frame [u8]) -> Result<ResultMetadata<'frame>, ErrorT>,
631 {
632 let deserialized_metadata_and_raw_rows: Yoke<
633 (ResultMetadataWrapper<'static>, &'static [u8]),
634 BytesWrapper,
635 > = Yoke::try_attach_to_cart(BytesWrapper { inner: frame }, |mut slice| {
636 let metadata = deserializer(&mut slice)?;
637 let row_count_and_raw_rows = slice;
638 Ok((ResultMetadataWrapper(metadata), row_count_and_raw_rows))
639 })?;
640
641 let (_metadata, raw_rows) = deserialized_metadata_and_raw_rows.get();
642 let raw_rows_with_count = deserialized_metadata_and_raw_rows
643 .backing_cart()
644 .inner
645 .slice_ref(raw_rows);
646
647 Ok((
648 Self {
649 metadata_and_raw_rows: deserialized_metadata_and_raw_rows
650 .map_project(|(metadata, _), _| metadata),
651 },
652 raw_rows_with_count,
653 ))
654 }
655 }
656}
657pub use self_borrowed_metadata::SelfBorrowedMetadataContainer;
658
659use super::custom_type_parser::CustomTypeParser;
660
661#[derive(Debug)]
665pub struct DeserializedMetadataAndRawRows {
666 metadata: ResultMetadataHolder,
667 rows_count: usize,
668 raw_rows: Bytes,
669}
670
671impl DeserializedMetadataAndRawRows {
672 #[inline]
675 pub fn metadata(&self) -> &ResultMetadata<'_> {
676 self.metadata.inner()
677 }
678
679 #[inline]
682 pub fn into_metadata(self) -> ResultMetadataHolder {
683 self.metadata
684 }
685
686 #[inline]
688 pub fn rows_count(&self) -> usize {
689 self.rows_count
690 }
691
692 #[inline]
694 pub fn rows_bytes_size(&self) -> usize {
695 self.raw_rows.len()
696 }
697
698 #[inline]
702 pub fn mock_empty() -> Self {
703 Self {
704 metadata: ResultMetadataHolder::SelfBorrowed(
705 SelfBorrowedMetadataContainer::mock_empty(),
706 ),
707 rows_count: 0,
708 raw_rows: Bytes::new(),
709 }
710 }
711
712 pub(crate) fn into_inner(self) -> (ResultMetadataHolder, usize, Bytes) {
713 (self.metadata, self.rows_count, self.raw_rows)
714 }
715
716 #[inline]
721 pub fn rows_iter<'frame, 'metadata, R: DeserializeRow<'frame, 'metadata>>(
722 &'frame self,
723 ) -> StdResult<TypedRowIterator<'frame, 'metadata, R>, TypeCheckError>
724 where
725 'frame: 'metadata,
726 {
727 let frame_slice = FrameSlice::new(&self.raw_rows);
728 let raw = RawRowIterator::new(
729 self.rows_count,
730 self.metadata.inner().col_specs(),
731 frame_slice,
732 );
733 TypedRowIterator::new(raw)
734 }
735}
736
737#[derive(Debug)]
739pub enum Result {
740 Void,
742 Rows((RawMetadataAndRawRows, PagingStateResponse)),
744 SetKeyspace(SetKeyspace),
747 Prepared(Prepared),
750 SchemaChange(SchemaChange),
753}
754
755fn deser_type_generic<'frame, 'result, StrT: Into<Cow<'result, str>>>(
756 buf: &mut &'frame [u8],
757 read_string: fn(&mut &'frame [u8]) -> StdResult<StrT, LowLevelDeserializationError>,
758 read_custom_type: fn(&'frame str) -> StdResult<ColumnType<'result>, CustomTypeParseError>,
759) -> StdResult<ColumnType<'result>, CqlTypeParseError> {
760 use ColumnType::*;
761 use NativeType::*;
762 let id =
763 types::read_short(buf).map_err(|err| CqlTypeParseError::TypeIdParseError(err.into()))?;
764 Ok(match id {
765 0x0000 => {
766 let type_str: &'frame str =
767 types::read_string(buf).map_err(CqlTypeParseError::CustomTypeNameParseError)?;
768 read_custom_type(type_str).map_err(CqlTypeParseError::CustomTypeParseError)?
769 }
770 0x0001 => Native(Ascii),
771 0x0002 => Native(BigInt),
772 0x0003 => Native(Blob),
773 0x0004 => Native(Boolean),
774 0x0005 => Native(Counter),
775 0x0006 => Native(Decimal),
776 0x0007 => Native(Double),
777 0x0008 => Native(Float),
778 0x0009 => Native(Int),
779 0x000B => Native(Timestamp),
780 0x000C => Native(Uuid),
781 0x000D => Native(Text),
782 0x000E => Native(Varint),
783 0x000F => Native(Timeuuid),
784 0x0010 => Native(Inet),
785 0x0011 => Native(Date),
786 0x0012 => Native(Time),
787 0x0013 => Native(SmallInt),
788 0x0014 => Native(TinyInt),
789 0x0015 => Native(Duration),
790 0x0020 => Collection {
791 frozen: false,
792 typ: CollectionType::List(Box::new(deser_type_generic(
793 buf,
794 read_string,
795 read_custom_type,
796 )?)),
797 },
798 0x0021 => Collection {
799 frozen: false,
800 typ: CollectionType::Map(
801 Box::new(deser_type_generic(buf, read_string, read_custom_type)?),
802 Box::new(deser_type_generic(buf, read_string, read_custom_type)?),
803 ),
804 },
805 0x0022 => Collection {
806 frozen: false,
807 typ: CollectionType::Set(Box::new(deser_type_generic(
808 buf,
809 read_string,
810 read_custom_type,
811 )?)),
812 },
813 0x0030 => {
814 let keyspace_name =
815 read_string(buf).map_err(CqlTypeParseError::UdtKeyspaceNameParseError)?;
816 let type_name = read_string(buf).map_err(CqlTypeParseError::UdtNameParseError)?;
817 let fields_size: usize = types::read_short(buf)
818 .map_err(|err| CqlTypeParseError::UdtFieldsCountParseError(err.into()))?
819 .into();
820
821 let mut field_types: Vec<(Cow<'result, str>, ColumnType)> =
822 Vec::with_capacity(fields_size);
823
824 for _ in 0..fields_size {
825 let field_name =
826 read_string(buf).map_err(CqlTypeParseError::UdtFieldNameParseError)?;
827 let field_type = deser_type_generic(buf, read_string, read_custom_type)?;
828
829 field_types.push((field_name.into(), field_type));
830 }
831
832 UserDefinedType {
833 frozen: false,
834 definition: Arc::new(self::UserDefinedType {
835 name: type_name.into(),
836 keyspace: keyspace_name.into(),
837 field_types,
838 }),
839 }
840 }
841 0x0031 => {
842 let len: usize = types::read_short(buf)
843 .map_err(|err| CqlTypeParseError::TupleLengthParseError(err.into()))?
844 .into();
845 let mut types = Vec::with_capacity(len);
846 for _ in 0..len {
847 types.push(deser_type_generic(buf, read_string, read_custom_type)?);
848 }
849 Tuple(types)
850 }
851 id => {
852 return Err(CqlTypeParseError::TypeNotImplemented(id));
853 }
854 })
855}
856
857fn deser_type_borrowed<'frame>(
858 buf: &mut &'frame [u8],
859) -> StdResult<ColumnType<'frame>, CqlTypeParseError> {
860 deser_type_generic(buf, |buf| types::read_string(buf), CustomTypeParser::parse)
861}
862
863fn deser_type_owned(buf: &mut &[u8]) -> StdResult<ColumnType<'static>, CqlTypeParseError> {
864 deser_type_generic(
865 buf,
866 |buf| types::read_string(buf).map(ToOwned::to_owned),
867 |type_str| CustomTypeParser::parse(type_str).map(|t| t.into_owned()),
868 )
869}
870
871fn deser_table_spec<'frame>(
877 buf: &mut &'frame [u8],
878) -> StdResult<TableSpec<'frame>, TableSpecParseError> {
879 let ks_name = types::read_string(buf).map_err(TableSpecParseError::MalformedKeyspaceName)?;
880 let table_name = types::read_string(buf).map_err(TableSpecParseError::MalformedTableName)?;
881 Ok(TableSpec::borrowed(ks_name, table_name))
882}
883
884fn mk_col_spec_parse_error(
885 col_idx: usize,
886 err: impl Into<ColumnSpecParseErrorKind>,
887) -> ColumnSpecParseError {
888 ColumnSpecParseError {
889 column_index: col_idx,
890 kind: err.into(),
891 }
892}
893
894fn deser_col_specs_generic<'frame, 'result>(
895 buf: &mut &'frame [u8],
896 global_table_spec: Option<TableSpec<'frame>>,
897 col_count: usize,
898 make_col_spec: fn(&'frame str, ColumnType<'result>, TableSpec<'frame>) -> ColumnSpec<'result>,
899 deser_type: fn(&mut &'frame [u8]) -> StdResult<ColumnType<'result>, CqlTypeParseError>,
900) -> StdResult<Vec<ColumnSpec<'result>>, ColumnSpecParseError> {
901 let mut col_specs = Vec::with_capacity(col_count);
902 for col_idx in 0..col_count {
903 let table_spec = match global_table_spec {
904 Some(ref known_spec) => known_spec.clone(),
906
907 None => deser_table_spec(buf).map_err(|err| mk_col_spec_parse_error(col_idx, err))?,
909 };
910
911 let name = types::read_string(buf).map_err(|err| mk_col_spec_parse_error(col_idx, err))?;
912 let typ = deser_type(buf).map_err(|err| mk_col_spec_parse_error(col_idx, err))?;
913 let col_spec = make_col_spec(name, typ, table_spec);
914 col_specs.push(col_spec);
915 }
916 Ok(col_specs)
917}
918
919fn deser_col_specs_borrowed<'frame>(
925 buf: &mut &'frame [u8],
926 global_table_spec: Option<TableSpec<'frame>>,
927 col_count: usize,
928) -> StdResult<Vec<ColumnSpec<'frame>>, ColumnSpecParseError> {
929 deser_col_specs_generic(
930 buf,
931 global_table_spec,
932 col_count,
933 ColumnSpec::borrowed,
934 deser_type_borrowed,
935 )
936}
937
938fn deser_col_specs_owned<'frame>(
944 buf: &mut &'frame [u8],
945 global_table_spec: Option<TableSpec<'frame>>,
946 col_count: usize,
947) -> StdResult<Vec<ColumnSpec<'static>>, ColumnSpecParseError> {
948 let result: StdResult<Vec<ColumnSpec<'static>>, ColumnSpecParseError> = deser_col_specs_generic(
949 buf,
950 global_table_spec,
951 col_count,
952 |name: &str, typ, table_spec: TableSpec| {
953 ColumnSpec::owned(name.to_owned(), typ, table_spec.into_owned())
954 },
955 deser_type_owned,
956 );
957
958 result
959}
960
961fn deser_result_metadata(
962 buf: &mut &[u8],
963) -> StdResult<(ResultMetadata<'static>, PagingStateResponse), ResultMetadataParseError> {
964 let flags = types::read_int(buf)
965 .map_err(|err| ResultMetadataParseError::FlagsParseError(err.into()))?;
966 let global_tables_spec = flags & 0x0001 != 0;
967 let has_more_pages = flags & 0x0002 != 0;
968 let no_metadata = flags & 0x0004 != 0;
969
970 let col_count =
971 types::read_int_length(buf).map_err(ResultMetadataParseError::ColumnCountParseError)?;
972
973 let raw_paging_state = has_more_pages
974 .then(|| types::read_bytes(buf).map_err(ResultMetadataParseError::PagingStateParseError))
975 .transpose()?;
976
977 let paging_state = PagingStateResponse::new_from_raw_bytes(raw_paging_state);
978
979 let col_specs = if no_metadata {
980 vec![]
981 } else {
982 let global_table_spec = global_tables_spec
983 .then(|| deser_table_spec(buf))
984 .transpose()?;
985
986 deser_col_specs_owned(buf, global_table_spec, col_count)?
987 };
988
989 let metadata = ResultMetadata {
990 col_count,
991 col_specs,
992 };
993 Ok((metadata, paging_state))
994}
995
996impl RawMetadataAndRawRows {
997 fn deserialize(
1000 frame: &mut FrameSlice,
1001 cached_metadata: Option<Arc<ResultMetadata<'static>>>,
1002 ) -> StdResult<(Self, PagingStateResponse), RawRowsAndPagingStateResponseParseError> {
1003 let flags = types::read_int(frame.as_slice_mut())
1004 .map_err(|err| RawRowsAndPagingStateResponseParseError::FlagsParseError(err.into()))?;
1005 let global_tables_spec = flags & 0x0001 != 0;
1006 let has_more_pages = flags & 0x0002 != 0;
1007 let no_metadata = flags & 0x0004 != 0;
1008
1009 let col_count = types::read_int_length(frame.as_slice_mut())
1010 .map_err(RawRowsAndPagingStateResponseParseError::ColumnCountParseError)?;
1011
1012 let raw_paging_state = has_more_pages
1013 .then(|| {
1014 types::read_bytes(frame.as_slice_mut())
1015 .map_err(RawRowsAndPagingStateResponseParseError::PagingStateParseError)
1016 })
1017 .transpose()?;
1018
1019 let paging_state = PagingStateResponse::new_from_raw_bytes(raw_paging_state);
1020
1021 let raw_rows = Self {
1022 col_count,
1023 global_tables_spec,
1024 no_metadata,
1025 raw_metadata_and_rows: frame.to_bytes(),
1026 cached_metadata,
1027 };
1028
1029 Ok((raw_rows, paging_state))
1030 }
1031}
1032
1033impl RawMetadataAndRawRows {
1034 fn metadata_deserializer(
1044 col_count: usize,
1045 global_tables_spec: bool,
1046 ) -> impl for<'frame> FnOnce(
1047 &mut &'frame [u8],
1048 ) -> StdResult<ResultMetadata<'frame>, ResultMetadataParseError> {
1049 move |buf| {
1050 let server_metadata = {
1051 let global_table_spec = global_tables_spec
1052 .then(|| deser_table_spec(buf))
1053 .transpose()?;
1054
1055 let col_specs = deser_col_specs_borrowed(buf, global_table_spec, col_count)?;
1056
1057 ResultMetadata {
1058 col_count,
1059 col_specs,
1060 }
1061 };
1062 Ok(server_metadata)
1063 }
1064 }
1065
1066 pub fn deserialize_metadata(
1071 self,
1072 ) -> StdResult<DeserializedMetadataAndRawRows, ResultMetadataAndRowsCountParseError> {
1073 let (metadata_deserialized, row_count_and_raw_rows) = match self.cached_metadata {
1074 Some(cached) if self.no_metadata => {
1075 (
1078 ResultMetadataHolder::SharedCached(cached),
1079 self.raw_metadata_and_rows,
1080 )
1081 }
1082 None if self.no_metadata => {
1083 (
1088 ResultMetadataHolder::mock_empty(),
1089 self.raw_metadata_and_rows,
1090 )
1091 }
1092 Some(_) | None => {
1093 let (metadata_container, raw_rows_with_count) =
1103 self_borrowed_metadata::SelfBorrowedMetadataContainer::make_deserialized_metadata(
1104 self.raw_metadata_and_rows,
1105 Self::metadata_deserializer(self.col_count, self.global_tables_spec),
1106 )?;
1107 (
1108 ResultMetadataHolder::SelfBorrowed(metadata_container),
1109 raw_rows_with_count,
1110 )
1111 }
1112 };
1113
1114 let mut frame_slice = FrameSlice::new(&row_count_and_raw_rows);
1115
1116 let rows_count: usize = types::read_int_length(frame_slice.as_slice_mut())
1117 .map_err(ResultMetadataAndRowsCountParseError::RowsCountParseError)?;
1118
1119 Ok(DeserializedMetadataAndRawRows {
1120 metadata: metadata_deserialized,
1121 rows_count,
1122 raw_rows: frame_slice.to_bytes(),
1123 })
1124 }
1125}
1126
1127fn deser_prepared_metadata(
1128 buf: &mut &[u8],
1129) -> StdResult<PreparedMetadata, PreparedMetadataParseError> {
1130 let flags = types::read_int(buf)
1131 .map_err(|err| PreparedMetadataParseError::FlagsParseError(err.into()))?;
1132 let global_tables_spec = flags & 0x0001 != 0;
1133
1134 let col_count =
1135 types::read_int_length(buf).map_err(PreparedMetadataParseError::ColumnCountParseError)?;
1136
1137 let pk_count: usize =
1138 types::read_int_length(buf).map_err(PreparedMetadataParseError::PkCountParseError)?;
1139
1140 let mut pk_indexes = Vec::with_capacity(pk_count);
1141 for i in 0..pk_count {
1142 pk_indexes.push(PartitionKeyIndex {
1143 index: types::read_short(buf)
1144 .map_err(|err| PreparedMetadataParseError::PkIndexParseError(err.into()))?
1145 as u16,
1146 sequence: i as u16,
1147 });
1148 }
1149 pk_indexes.sort_unstable_by_key(|pki| pki.index);
1150
1151 let global_table_spec = global_tables_spec
1152 .then(|| deser_table_spec(buf))
1153 .transpose()?;
1154
1155 let col_specs = deser_col_specs_owned(buf, global_table_spec, col_count)?;
1156
1157 Ok(PreparedMetadata {
1158 flags,
1159 col_count,
1160 pk_indexes,
1161 col_specs,
1162 })
1163}
1164
1165fn deser_rows(
1166 buf_bytes: Bytes,
1167 cached_metadata: Option<&Arc<ResultMetadata<'static>>>,
1168) -> StdResult<(RawMetadataAndRawRows, PagingStateResponse), RawRowsAndPagingStateResponseParseError>
1169{
1170 let mut frame_slice = FrameSlice::new(&buf_bytes);
1171 RawMetadataAndRawRows::deserialize(&mut frame_slice, cached_metadata.cloned())
1172}
1173
1174fn deser_set_keyspace(buf: &mut &[u8]) -> StdResult<SetKeyspace, SetKeyspaceParseError> {
1175 let keyspace_name = types::read_string(buf)?.to_string();
1176
1177 Ok(SetKeyspace { keyspace_name })
1178}
1179
1180fn deser_prepared(buf: &mut &[u8]) -> StdResult<Prepared, PreparedParseError> {
1181 let id_len = types::read_short(buf)
1182 .map_err(|err| PreparedParseError::IdLengthParseError(err.into()))?
1183 as usize;
1184 let id: Bytes = buf[0..id_len].to_owned().into();
1185 buf.advance(id_len);
1186 let prepared_metadata =
1187 deser_prepared_metadata(buf).map_err(PreparedParseError::PreparedMetadataParseError)?;
1188 let (result_metadata, paging_state_response) =
1189 deser_result_metadata(buf).map_err(PreparedParseError::ResultMetadataParseError)?;
1190 if let PagingStateResponse::HasMorePages { state } = paging_state_response {
1191 return Err(PreparedParseError::NonZeroPagingState(
1192 state
1193 .as_bytes_slice()
1194 .cloned()
1195 .unwrap_or_else(|| Arc::from([])),
1196 ));
1197 }
1198
1199 Ok(Prepared {
1200 id,
1201 prepared_metadata,
1202 result_metadata,
1203 })
1204}
1205
1206fn deser_schema_change(buf: &mut &[u8]) -> StdResult<SchemaChange, SchemaChangeEventParseError> {
1207 Ok(SchemaChange {
1208 event: SchemaChangeEvent::deserialize(buf)?,
1209 })
1210}
1211
1212pub fn deserialize(
1216 buf_bytes: Bytes,
1217 cached_metadata: Option<&Arc<ResultMetadata<'static>>>,
1218) -> StdResult<Result, CqlResultParseError> {
1219 let buf = &mut &*buf_bytes;
1220 use self::Result::*;
1221 Ok(
1222 match types::read_int(buf)
1223 .map_err(|err| CqlResultParseError::ResultIdParseError(err.into()))?
1224 {
1225 0x0001 => Void,
1226 0x0002 => Rows(deser_rows(buf_bytes.slice_ref(buf), cached_metadata)?),
1227 0x0003 => SetKeyspace(deser_set_keyspace(buf)?),
1228 0x0004 => Prepared(deser_prepared(buf)?),
1229 0x0005 => SchemaChange(deser_schema_change(buf)?),
1230 id => return Err(CqlResultParseError::UnknownResultId(id)),
1231 },
1232 )
1233}
1234
1235#[doc(hidden)]
1240mod test_utils {
1241 use std::num::TryFromIntError;
1242
1243 use bytes::{BufMut, BytesMut};
1244
1245 use super::*;
1246
1247 impl TableSpec<'_> {
1248 pub(crate) fn serialize(&self, buf: &mut impl BufMut) -> StdResult<(), TryFromIntError> {
1249 types::write_string(&self.ks_name, buf)?;
1250 types::write_string(&self.table_name, buf)?;
1251
1252 Ok(())
1253 }
1254 }
1255
1256 impl ColumnType<'_> {
1257 fn id(&self) -> u16 {
1258 use NativeType::*;
1259 match self {
1260 Self::Native(Ascii) => 0x0001,
1261 Self::Native(BigInt) => 0x0002,
1262 Self::Native(Blob) => 0x0003,
1263 Self::Native(Boolean) => 0x0004,
1264 Self::Native(Counter) => 0x0005,
1265 Self::Native(Decimal) => 0x0006,
1266 Self::Native(Double) => 0x0007,
1267 Self::Native(Float) => 0x0008,
1268 Self::Native(Int) => 0x0009,
1269 Self::Native(Timestamp) => 0x000B,
1270 Self::Native(Uuid) => 0x000C,
1271 Self::Native(Text) => 0x000D,
1272 Self::Native(Varint) => 0x000E,
1273 Self::Native(Timeuuid) => 0x000F,
1274 Self::Native(Inet) => 0x0010,
1275 Self::Native(Date) => 0x0011,
1276 Self::Native(Time) => 0x0012,
1277 Self::Native(SmallInt) => 0x0013,
1278 Self::Native(TinyInt) => 0x0014,
1279 Self::Native(Duration) => 0x0015,
1280 Self::Collection {
1281 typ: CollectionType::List(_),
1282 ..
1283 } => 0x0020,
1284 Self::Collection {
1285 typ: CollectionType::Map(_, _),
1286 ..
1287 } => 0x0021,
1288 Self::Collection {
1289 typ: CollectionType::Set(_),
1290 ..
1291 } => 0x0022,
1292 Self::Vector { .. } => {
1293 unimplemented!();
1294 }
1295 Self::UserDefinedType { .. } => 0x0030,
1296 Self::Tuple(_) => 0x0031,
1297 }
1298 }
1299
1300 pub(crate) fn type_size(&self) -> Option<usize> {
1302 match self {
1303 ColumnType::Native(n) => n.type_size(),
1304 ColumnType::Tuple(_) => None,
1305 ColumnType::Collection { .. } => None,
1306 ColumnType::Vector { typ, dimensions } => {
1307 typ.type_size().map(|size| size * usize::from(*dimensions))
1308 }
1309 ColumnType::UserDefinedType { .. } => None,
1310 }
1311 }
1312
1313 pub(crate) fn serialize(&self, buf: &mut impl BufMut) -> StdResult<(), TryFromIntError> {
1315 let id = self.id();
1316 types::write_short(id, buf);
1317
1318 match self {
1319 ColumnType::Native(_) => (),
1321
1322 ColumnType::Collection {
1323 typ: CollectionType::List(elem_type),
1324 ..
1325 }
1326 | ColumnType::Collection {
1327 typ: CollectionType::Set(elem_type),
1328 ..
1329 } => {
1330 elem_type.serialize(buf)?;
1331 }
1332 ColumnType::Collection {
1333 typ: CollectionType::Map(key_type, value_type),
1334 ..
1335 } => {
1336 key_type.serialize(buf)?;
1337 value_type.serialize(buf)?;
1338 }
1339 ColumnType::Tuple(types) => {
1340 types::write_short_length(types.len(), buf)?;
1341 for typ in types.iter() {
1342 typ.serialize(buf)?;
1343 }
1344 }
1345 ColumnType::Vector { .. } => {
1346 unimplemented!()
1347 }
1348 ColumnType::UserDefinedType {
1349 definition: udt, ..
1350 } => {
1351 types::write_string(&udt.keyspace, buf)?;
1352 types::write_string(&udt.name, buf)?;
1353 types::write_short_length(udt.field_types.len(), buf)?;
1354 for (field_name, field_type) in udt.field_types.iter() {
1355 types::write_string(field_name, buf)?;
1356 field_type.serialize(buf)?;
1357 }
1358 }
1359 }
1360
1361 Ok(())
1362 }
1363 }
1364
1365 impl<'a> ResultMetadata<'a> {
1366 #[inline]
1367 #[doc(hidden)]
1368 pub fn new_for_test(col_count: usize, col_specs: Vec<ColumnSpec<'a>>) -> Self {
1369 Self {
1370 col_count,
1371 col_specs,
1372 }
1373 }
1374
1375 pub(crate) fn serialize(
1376 &self,
1377 buf: &mut impl BufMut,
1378 no_metadata: bool,
1379 global_tables_spec: bool,
1380 ) -> StdResult<(), TryFromIntError> {
1381 let global_table_spec = global_tables_spec
1382 .then(|| self.col_specs.first().map(|col_spec| col_spec.table_spec()))
1383 .flatten();
1384
1385 let mut flags = 0;
1386 if global_table_spec.is_some() {
1387 flags |= 0x0001;
1388 }
1389 if no_metadata {
1390 flags |= 0x0004;
1391 }
1392 types::write_int(flags, buf);
1393
1394 types::write_int_length(self.col_count, buf)?;
1395
1396 if !no_metadata {
1399 if let Some(spec) = global_table_spec {
1400 spec.serialize(buf)?;
1401 }
1402
1403 for col_spec in self.col_specs() {
1404 if global_table_spec.is_none() {
1405 col_spec.table_spec().serialize(buf)?;
1406 }
1407
1408 types::write_string(col_spec.name(), buf)?;
1409 col_spec.typ().serialize(buf)?;
1410 }
1411 }
1412
1413 Ok(())
1414 }
1415 }
1416
1417 impl RawMetadataAndRawRows {
1418 #[doc(hidden)]
1419 #[inline]
1420 pub fn new_for_test(
1421 cached_metadata: Option<Arc<ResultMetadata<'static>>>,
1422 metadata: Option<ResultMetadata>,
1423 global_tables_spec: bool,
1424 rows_count: usize,
1425 raw_rows: &[u8],
1426 ) -> StdResult<Self, TryFromIntError> {
1427 let no_metadata = metadata.is_none();
1428 let empty_metadata = ResultMetadata::mock_empty();
1429 let used_metadata = metadata
1430 .as_ref()
1431 .or(cached_metadata.as_deref())
1432 .unwrap_or(&empty_metadata);
1433
1434 let raw_result_rows = {
1435 let mut buf = BytesMut::new();
1436 used_metadata.serialize(&mut buf, global_tables_spec, no_metadata)?;
1437 types::write_int_length(rows_count, &mut buf)?;
1438 buf.extend_from_slice(raw_rows);
1439
1440 buf.freeze()
1441 };
1442
1443 let (raw_rows, _paging_state_response) =
1444 Self::deserialize(&mut FrameSlice::new(&raw_result_rows), cached_metadata).expect(
1445 "Ill-formed serialized metadata for tests - likely bug in serialization code",
1446 );
1447
1448 Ok(raw_rows)
1449 }
1450 }
1451
1452 impl DeserializedMetadataAndRawRows {
1453 #[inline]
1454 #[cfg(test)]
1455 pub(crate) fn new_for_test(
1456 metadata: ResultMetadata<'static>,
1457 rows_count: usize,
1458 raw_rows: Bytes,
1459 ) -> Self {
1460 Self {
1461 metadata: ResultMetadataHolder::SharedCached(Arc::new(metadata)),
1462 rows_count,
1463 raw_rows,
1464 }
1465 }
1466 }
1467}