scylla_cql/frame/response/
result.rs

1use crate::deserialize::result::{RawRowIterator, TypedRowIterator};
2use crate::deserialize::row::DeserializeRow;
3use crate::deserialize::{FrameSlice, TypeCheckError};
4use crate::frame::frame_errors::{
5    ColumnSpecParseError, ColumnSpecParseErrorKind, CqlResultParseError, CqlTypeParseError,
6    LowLevelDeserializationError, PreparedMetadataParseError, PreparedParseError,
7    RawRowsAndPagingStateResponseParseError, ResultMetadataAndRowsCountParseError,
8    ResultMetadataParseError, SchemaChangeEventParseError, SetKeyspaceParseError,
9    TableSpecParseError,
10};
11use crate::frame::request::query::PagingStateResponse;
12use crate::frame::response::event::SchemaChangeEvent;
13use crate::frame::types;
14use bytes::{Buf, Bytes};
15use std::borrow::Cow;
16use std::fmt::Debug;
17use std::sync::Arc;
18use std::{result::Result as StdResult, str};
19
/// Payload carried by the [`Result::SetKeyspace`] variant.
#[derive(Debug)]
pub struct SetKeyspace {
    /// Name of the keyspace, stored as an owned string.
    pub keyspace_name: String,
}
24
/// Payload carried by the [`Result::Prepared`] variant.
#[derive(Debug)]
pub struct Prepared {
    /// Raw identifier bytes.
    // NOTE(review): presumably the server-assigned id of the prepared statement;
    // confirm against the code that parses this struct.
    pub id: Bytes,
    /// Metadata of the prepared statement itself (see [`PreparedMetadata`]).
    pub prepared_metadata: PreparedMetadata,
    /// Metadata of the result set, in fully owned (`'static`) form.
    pub result_metadata: ResultMetadata<'static>,
}
31
/// Payload carried by the [`Result::SchemaChange`] variant.
#[derive(Debug)]
pub struct SchemaChange {
    /// The schema change event wrapped by this result.
    pub event: SchemaChangeEvent,
}
36
/// A keyspace name paired with a table name.
///
/// Both names are stored as [`Cow`], so a `TableSpec` can either borrow them
/// (see [`TableSpec::borrowed`]) or own them (see [`TableSpec::owned`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableSpec<'a> {
    // Keyspace name; exposed through `ks_name()`.
    ks_name: Cow<'a, str>,
    // Table name; exposed through `table_name()`.
    table_name: Cow<'a, str>,
}
42
/// A type of:
/// - a column in schema metadata
/// - a bind marker in a prepared statement
/// - a column in a query result set
///
/// Some of the variants contain a `frozen` flag. This flag is only used
/// in schema metadata. For prepared statement bind markers and query result
/// types those fields will always be set to `false` (even if the DB column
/// corresponding to given marker / result type is frozen).
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ColumnType<'frame> {
    /// Types that are "simple" (non-recursive).
    Native(NativeType),

    /// Collection types: Map, Set, and List. Those are composite types with
    /// dynamic size but constant predefined element types.
    Collection {
        /// Frozen flag; see the enum-level documentation for when it is meaningful.
        frozen: bool,
        /// Which collection this is, together with its element type(s).
        typ: CollectionType<'frame>,
    },

    /// A composite list-like type that has a defined size and all its elements
    /// are of the same type. Intuitively, it can be viewed as a list with constant
    /// predefined size, or as a tuple which has all elements of the same type.
    Vector {
        /// Type shared by all elements of the vector.
        typ: Box<ColumnType<'frame>>,
        /// Number of elements in the vector.
        dimensions: u16,
    },

    /// A C-struct-like type defined by the user.
    UserDefinedType {
        /// Frozen flag; see the enum-level documentation for when it is meaningful.
        frozen: bool,
        /// Shared definition (name, keyspace and fields) of the user-defined type.
        definition: Arc<UserDefinedType<'frame>>,
    },

    /// A composite type with a defined size and elements of possibly different,
    /// but predefined, types.
    Tuple(Vec<ColumnType<'frame>>),
}
83
/// The [ColumnType] variants that are "simple" (non-recursive).
///
/// Each variant stands for one type that carries no nested type information.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum NativeType {
    Ascii,
    Boolean,
    Blob,
    Counter,
    Date,
    Decimal,
    Double,
    Duration,
    Float,
    Int,
    BigInt,
    Text,
    Timestamp,
    Inet,
    SmallInt,
    TinyInt,
    Time,
    Timeuuid,
    Uuid,
    Varint,
}
109
/// Collection variants of [ColumnType]. A collection is a composite type that
/// has dynamic size, so it is possible to add and remove values to/from it.
///
/// Tuple and vector are not collections because they have predefined size.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum CollectionType<'frame> {
    /// A list; the payload is the element type.
    List(Box<ColumnType<'frame>>),
    /// A map; the payload is the key type followed by the value type.
    Map(Box<ColumnType<'frame>>, Box<ColumnType<'frame>>),
    /// A set; the payload is the element type.
    Set(Box<ColumnType<'frame>>),
}
121
/// Definition of a user-defined type
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UserDefinedType<'frame> {
    /// Name of the user-defined type.
    pub name: Cow<'frame, str>,
    /// Name of the keyspace associated with the type.
    pub keyspace: Cow<'frame, str>,
    /// Fields of the type, as `(field name, field type)` pairs.
    pub field_types: Vec<(Cow<'frame, str>, ColumnType<'frame>)>,
}
129
130impl ColumnType<'_> {
131    pub fn into_owned(self) -> ColumnType<'static> {
132        match self {
133            ColumnType::Native(b) => ColumnType::Native(b),
134            ColumnType::Collection { frozen, typ: t } => ColumnType::Collection {
135                frozen,
136                typ: t.into_owned(),
137            },
138            ColumnType::Vector {
139                typ: type_,
140                dimensions,
141            } => ColumnType::Vector {
142                typ: Box::new(type_.into_owned()),
143                dimensions,
144            },
145            ColumnType::UserDefinedType {
146                frozen,
147                definition: udt,
148            } => {
149                let udt = Arc::try_unwrap(udt).unwrap_or_else(|e| e.as_ref().clone());
150                ColumnType::UserDefinedType {
151                    frozen,
152                    definition: Arc::new(UserDefinedType {
153                        name: udt.name.into_owned().into(),
154                        keyspace: udt.keyspace.into_owned().into(),
155                        field_types: udt
156                            .field_types
157                            .into_iter()
158                            .map(|(cow, column_type)| {
159                                (cow.into_owned().into(), column_type.into_owned())
160                            })
161                            .collect(),
162                    }),
163                }
164            }
165            ColumnType::Tuple(vec) => {
166                ColumnType::Tuple(vec.into_iter().map(ColumnType::into_owned).collect())
167            }
168        }
169    }
170}
171
172impl CollectionType<'_> {
173    fn into_owned(self) -> CollectionType<'static> {
174        match self {
175            CollectionType::List(elem_type) => {
176                CollectionType::List(Box::new(elem_type.into_owned()))
177            }
178            CollectionType::Map(key, value) => {
179                CollectionType::Map(Box::new(key.into_owned()), Box::new(value.into_owned()))
180            }
181            CollectionType::Set(elem_type) => CollectionType::Set(Box::new(elem_type.into_owned())),
182        }
183    }
184}
185
186impl<'a> TableSpec<'a> {
187    pub const fn borrowed(ks: &'a str, table: &'a str) -> Self {
188        Self {
189            ks_name: Cow::Borrowed(ks),
190            table_name: Cow::Borrowed(table),
191        }
192    }
193
194    pub fn ks_name(&'a self) -> &'a str {
195        self.ks_name.as_ref()
196    }
197
198    pub fn table_name(&'a self) -> &'a str {
199        self.table_name.as_ref()
200    }
201
202    pub fn into_owned(self) -> TableSpec<'static> {
203        TableSpec::owned(self.ks_name.into_owned(), self.table_name.into_owned())
204    }
205
206    pub fn to_owned(&self) -> TableSpec<'static> {
207        TableSpec::owned(self.ks_name().to_owned(), self.table_name().to_owned())
208    }
209}
210
211impl TableSpec<'static> {
212    pub fn owned(ks_name: String, table_name: String) -> Self {
213        Self {
214            ks_name: Cow::Owned(ks_name),
215            table_name: Cow::Owned(table_name),
216        }
217    }
218}
219
220impl ColumnType<'_> {
221    // Returns true if the type allows a special, empty value in addition to its
222    // natural representation. For example, bigint represents a 32-bit integer,
223    // but it can also hold a 0-bit empty value.
224    //
225    // It looks like Cassandra 4.1.3 rejects empty values for some more types than
226    // Scylla: date, time, smallint and tinyint. We will only check against
227    // Scylla's set of types supported for empty values as it's smaller;
228    // with Cassandra, some rejects will just have to be rejected on the db side.
229    pub(crate) fn supports_special_empty_value(&self) -> bool {
230        #[allow(clippy::match_like_matches_macro)]
231        match self {
232            ColumnType::Native(NativeType::Counter)
233            | ColumnType::Native(NativeType::Duration)
234            | ColumnType::Collection { .. }
235            | ColumnType::UserDefinedType { .. } => false,
236
237            _ => true,
238        }
239    }
240}
241
/// Specification of a single column: the table it belongs to, its name and its type.
///
/// The lifetime parameter allows the name and the nested specs to be borrowed
/// (see [`ColumnSpec::borrowed`]) or owned (see [`ColumnSpec::owned`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnSpec<'frame> {
    // Keyspace and table of the column; exposed through `table_spec()`.
    pub(crate) table_spec: TableSpec<'frame>,
    // Column name; exposed through `name()`.
    pub(crate) name: Cow<'frame, str>,
    // Column type; exposed through `typ()`.
    pub(crate) typ: ColumnType<'frame>,
}
248
249impl ColumnSpec<'static> {
250    #[inline]
251    pub fn owned(name: String, typ: ColumnType<'static>, table_spec: TableSpec<'static>) -> Self {
252        Self {
253            table_spec,
254            name: Cow::Owned(name),
255            typ,
256        }
257    }
258}
259
260impl<'frame> ColumnSpec<'frame> {
261    #[inline]
262    pub const fn borrowed(
263        name: &'frame str,
264        typ: ColumnType<'frame>,
265        table_spec: TableSpec<'frame>,
266    ) -> Self {
267        Self {
268            table_spec,
269            name: Cow::Borrowed(name),
270            typ,
271        }
272    }
273
274    #[inline]
275    pub fn table_spec(&self) -> &TableSpec<'frame> {
276        &self.table_spec
277    }
278
279    #[inline]
280    pub fn name(&self) -> &str {
281        &self.name
282    }
283
284    #[inline]
285    pub fn typ(&self) -> &ColumnType<'frame> {
286        &self.typ
287    }
288}
289
/// Metadata of a result set: the column count and the per-column specs.
#[derive(Debug, Clone)]
pub struct ResultMetadata<'a> {
    // Column count as read from the frame. It is stored separately from
    // `col_specs` because `deser_result_metadata` leaves `col_specs` empty when
    // the "no metadata" flag is set, while still recording the count.
    col_count: usize,
    // Column specs, one entry per deserialized column.
    col_specs: Vec<ColumnSpec<'a>>,
}
295
296impl<'a> ResultMetadata<'a> {
297    #[inline]
298    pub fn col_count(&self) -> usize {
299        self.col_count
300    }
301
302    #[inline]
303    pub fn col_specs(&self) -> &[ColumnSpec<'a>] {
304        &self.col_specs
305    }
306
307    // Preferred to implementing Default, because users shouldn't be encouraged to create
308    // empty ResultMetadata.
309    #[inline]
310    pub fn mock_empty() -> Self {
311        Self {
312            col_count: 0,
313            col_specs: Vec::new(),
314        }
315    }
316}
317
/// Versatile container for [ResultMetadata]. Allows 2 types of ownership
/// of `ResultMetadata`:
/// 1. owning it in a borrowed form, self-borrowed from the RESULT:Rows frame;
/// 2. sharing ownership of metadata cached in PreparedStatement.
#[derive(Debug)]
pub enum ResultMetadataHolder {
    /// Metadata that borrows from bytes kept alive by the container itself.
    SelfBorrowed(SelfBorrowedMetadataContainer),
    /// Fully owned metadata shared through an [`Arc`].
    SharedCached(Arc<ResultMetadata<'static>>),
}
327
328impl ResultMetadataHolder {
329    /// Returns reference to the stored [ResultMetadata].
330    ///
331    /// Note that [ResultMetadataHolder] cannot implement [Deref](std::ops::Deref),
332    /// because `Deref` does not permit that `Deref::Target`'s lifetime depend on
333    /// lifetime of `&self`.
334    #[inline]
335    pub fn inner(&self) -> &ResultMetadata<'_> {
336        match self {
337            ResultMetadataHolder::SelfBorrowed(c) => c.metadata(),
338            ResultMetadataHolder::SharedCached(s) => s,
339        }
340    }
341
342    /// Creates an empty [ResultMetadataHolder].
343    #[inline]
344    pub fn mock_empty() -> Self {
345        Self::SelfBorrowed(SelfBorrowedMetadataContainer::mock_empty())
346    }
347}
348
/// Describes one partition key component by a pair of positions
/// (see [`PreparedMetadata::pk_indexes`]).
#[derive(Debug, Copy, Clone)]
pub struct PartitionKeyIndex {
    /// index in the serialized values
    pub index: u16,
    /// sequence number in partition key
    pub sequence: u16,
}
356
/// Metadata stored in [`Prepared::prepared_metadata`].
// NOTE(review): presumably this describes the bind markers of a prepared
// statement; confirm against the code that deserializes it.
#[derive(Debug, Clone)]
pub struct PreparedMetadata {
    /// Raw flags, kept as the integer they were provided as.
    pub flags: i32,
    /// Number of columns.
    pub col_count: usize,
    /// pk_indexes are sorted by `index` and can be reordered in partition key order
    /// using `sequence` field
    pub pk_indexes: Vec<PartitionKeyIndex>,
    /// Column specs, in fully owned (`'static`) form.
    pub col_specs: Vec<ColumnSpec<'static>>,
}
366
/// RESULT:Rows response, in partially serialized form.
///
/// Flags and paging state are deserialized, remaining part of metadata
/// as well as rows remain serialized.
#[derive(Debug, Clone)]
pub struct RawMetadataAndRawRows {
    // Already deserialized part of metadata:
    // - the column count read right after the flags,
    col_count: usize,
    // - flag bit 0x0001,
    global_tables_spec: bool,
    // - flag bit 0x0004.
    no_metadata: bool,

    /// The remaining part of the RESULT frame.
    raw_metadata_and_rows: Bytes,

    /// Metadata cached in PreparedStatement, if present.
    cached_metadata: Option<Arc<ResultMetadata<'static>>>,
}
384
385impl RawMetadataAndRawRows {
386    /// Creates an empty [RawMetadataAndRawRows].
387    // Preferred to implementing Default, because users shouldn't be encouraged to create
388    // empty RawMetadataAndRawRows.
389    #[inline]
390    pub fn mock_empty() -> Self {
391        // Minimal correct `raw_metadata_and_rows` looks like this:
392        // Empty metadata (0 bytes), rows_count=0 (i32 big endian), empty rows (0 bytes).
393        static EMPTY_METADATA_ZERO_ROWS: &[u8] = &0_i32.to_be_bytes();
394        let raw_metadata_and_rows = Bytes::from_static(EMPTY_METADATA_ZERO_ROWS);
395
396        Self {
397            col_count: 0,
398            global_tables_spec: false,
399            no_metadata: false,
400            raw_metadata_and_rows,
401            cached_metadata: None,
402        }
403    }
404
405    /// Returns the serialized size of the raw metadata + raw rows.
406    #[inline]
407    pub fn metadata_and_rows_bytes_size(&self) -> usize {
408        self.raw_metadata_and_rows.len()
409    }
410}
411
// Private module that hides the Yoke-based machinery needed to store
// `ResultMetadata` borrowed from the very bytes it is stored next to.
// Only `SelfBorrowedMetadataContainer` is public.
mod self_borrowed_metadata {
    use std::ops::Deref;

    use bytes::Bytes;
    use yoke::{Yoke, Yokeable};

    use super::ResultMetadata;

    // A trivial wrapper over Bytes, introduced to circumvent the orphan rule.
    // (neither `bytes` nor `stable_deref_trait` crate wants to implement
    //  `StableDeref` for `Bytes`, so we need a wrapper for that)
    #[derive(Debug, Clone)]
    struct BytesWrapper {
        inner: Bytes,
    }

    // Dereferences to the byte slice of the wrapped `Bytes`.
    impl Deref for BytesWrapper {
        type Target = [u8];

        fn deref(&self) -> &Self::Target {
            &self.inner
        }
    }

    // SAFETY:
    // StableDeref requires that a type dereferences to a stable address, even when moved.
    // `Bytes` satisfies this requirement, because it dereferences to its heap allocation.
    unsafe impl stable_deref_trait::StableDeref for BytesWrapper {}

    // SAFETY:
    // Citing `CloneableCart`'s docstring:
    // > Safety
    // > This trait is safe to implement on StableDeref types which, once Cloned, point to the same underlying data and retain ownership.
    //
    // `Bytes` satisfies this requirement.
    unsafe impl yoke::CloneableCart for BytesWrapper {}

    // A trivial wrapper over [ResultMetadata], introduced to keep ResultMetadata free of Yoke.
    // This way Yoke does not appear in any public types/APIs.
    #[derive(Debug, Clone, Yokeable)]
    struct ResultMetadataWrapper<'frame>(ResultMetadata<'frame>);

    /// A container that can be considered an `Arc<ResultMetadata>` with an additional capability
    /// of containing metadata in a borrowed form.
    ///
    /// The borrow comes from the `Bytes` that this container holds internally. Therefore,
    /// the held `ResultMetadata`'s lifetime is covariant with the lifetime of this container
    /// itself.
    #[derive(Debug, Clone)]
    pub struct SelfBorrowedMetadataContainer {
        // The cart is the frame bytes; the yoked value is the metadata borrowing from them.
        metadata_and_raw_rows: Yoke<ResultMetadataWrapper<'static>, BytesWrapper>,
    }

    impl SelfBorrowedMetadataContainer {
        /// Creates an empty [SelfBorrowedMetadataContainer].
        ///
        /// The container is backed by empty `Bytes` and holds
        /// [`ResultMetadata::mock_empty`].
        pub fn mock_empty() -> Self {
            Self {
                metadata_and_raw_rows: Yoke::attach_to_cart(
                    BytesWrapper {
                        inner: Bytes::new(),
                    },
                    |_| ResultMetadataWrapper(ResultMetadata::mock_empty()),
                ),
            }
        }

        /// Returns a reference to the contained [ResultMetadata].
        pub fn metadata(&self) -> &ResultMetadata<'_> {
            &self.metadata_and_raw_rows.get().0
        }

        // Returns Self (deserialized metadata) and the rest of the bytes,
        // which contain rows count and then rows themselves.
        //
        // `frame` becomes the backing storage of the returned container.
        // `deserializer` receives a slice over `frame` and is expected to
        // advance it past the metadata; whatever it leaves unread is returned
        // as the second tuple element. Errors from `deserializer` are
        // propagated unchanged.
        pub(super) fn make_deserialized_metadata<F, ErrorT>(
            frame: Bytes,
            deserializer: F,
        ) -> Result<(Self, Bytes), ErrorT>
        where
            // This constraint is modelled after `Yoke::try_attach_to_cart`.
            F: for<'frame> FnOnce(&mut &'frame [u8]) -> Result<ResultMetadata<'frame>, ErrorT>,
        {
            // First yoke both the metadata and the unread remainder to the frame,
            // so that the remainder can be located inside the cart afterwards.
            let deserialized_metadata_and_raw_rows: Yoke<
                (ResultMetadataWrapper<'static>, &'static [u8]),
                BytesWrapper,
            > = Yoke::try_attach_to_cart(BytesWrapper { inner: frame }, |mut slice| {
                let metadata = deserializer(&mut slice)?;
                let row_count_and_raw_rows = slice;
                Ok((ResultMetadataWrapper(metadata), row_count_and_raw_rows))
            })?;

            // Turn the borrowed remainder into a `Bytes` handle over the same region
            // of the backing `Bytes`.
            let (_metadata, raw_rows) = deserialized_metadata_and_raw_rows.get();
            let raw_rows_with_count = deserialized_metadata_and_raw_rows
                .backing_cart()
                .inner
                .slice_ref(raw_rows);

            Ok((
                Self {
                    // Keep only the metadata in the yoke; the remainder is returned separately.
                    metadata_and_raw_rows: deserialized_metadata_and_raw_rows
                        .map_project(|(metadata, _), _| metadata),
                },
                raw_rows_with_count,
            ))
        }
    }
}
518pub use self_borrowed_metadata::SelfBorrowedMetadataContainer;
519
/// RESULT:Rows response, in partially serialized form.
///
/// Paging state and metadata are deserialized, rows remain serialized.
#[derive(Debug)]
pub struct DeserializedMetadataAndRawRows {
    // Deserialized metadata, either self-borrowed or shared from a cache.
    metadata: ResultMetadataHolder,
    // Number of rows; exposed through `rows_count()`.
    rows_count: usize,
    // Rows, still in serialized form.
    raw_rows: Bytes,
}
529
530impl DeserializedMetadataAndRawRows {
531    /// Returns the metadata associated with this response
532    /// (table and column specifications).
533    #[inline]
534    pub fn metadata(&self) -> &ResultMetadata<'_> {
535        self.metadata.inner()
536    }
537
538    /// Consumes the `DeserializedMetadataAndRawRows` and returns metadata
539    /// associated with the response (or cached metadata, if used in its stead).
540    #[inline]
541    pub fn into_metadata(self) -> ResultMetadataHolder {
542        self.metadata
543    }
544
545    /// Returns the number of rows that the RESULT:Rows contain.
546    #[inline]
547    pub fn rows_count(&self) -> usize {
548        self.rows_count
549    }
550
551    /// Returns the serialized size of the raw rows.
552    #[inline]
553    pub fn rows_bytes_size(&self) -> usize {
554        self.raw_rows.len()
555    }
556
557    // Preferred to implementing Default, because users shouldn't be encouraged to create
558    // empty DeserializedMetadataAndRawRows.
559    #[inline]
560    pub fn mock_empty() -> Self {
561        Self {
562            metadata: ResultMetadataHolder::SelfBorrowed(
563                SelfBorrowedMetadataContainer::mock_empty(),
564            ),
565            rows_count: 0,
566            raw_rows: Bytes::new(),
567        }
568    }
569
570    pub(crate) fn into_inner(self) -> (ResultMetadataHolder, usize, Bytes) {
571        (self.metadata, self.rows_count, self.raw_rows)
572    }
573
574    /// Creates a typed iterator over the rows that lazily deserializes
575    /// rows in the result.
576    ///
577    /// Returns Err if the schema of returned result doesn't match R.
578    #[inline]
579    pub fn rows_iter<'frame, 'metadata, R: DeserializeRow<'frame, 'metadata>>(
580        &'frame self,
581    ) -> StdResult<TypedRowIterator<'frame, 'metadata, R>, TypeCheckError>
582    where
583        'frame: 'metadata,
584    {
585        let frame_slice = FrameSlice::new(&self.raw_rows);
586        let raw = RawRowIterator::new(
587            self.rows_count,
588            self.metadata.inner().col_specs(),
589            frame_slice,
590        );
591        TypedRowIterator::new(raw)
592    }
593}
594
/// A result, with one variant per supported kind.
// NOTE(review): presumably this mirrors the kinds of the CQL RESULT response;
// confirm against the deserialization code that builds it.
#[derive(Debug)]
pub enum Result {
    /// A result that carries no payload.
    Void,
    /// Rows in partially serialized form, together with the paging state response.
    Rows((RawMetadataAndRawRows, PagingStateResponse)),
    /// See [`SetKeyspace`].
    SetKeyspace(SetKeyspace),
    /// See [`Prepared`].
    Prepared(Prepared),
    /// See [`SchemaChange`].
    SchemaChange(SchemaChange),
}
603
604fn deser_type_generic<'frame, 'result, StrT: Into<Cow<'result, str>>>(
605    buf: &mut &'frame [u8],
606    read_string: fn(&mut &'frame [u8]) -> StdResult<StrT, LowLevelDeserializationError>,
607) -> StdResult<ColumnType<'result>, CqlTypeParseError> {
608    use ColumnType::*;
609    use NativeType::*;
610    let id =
611        types::read_short(buf).map_err(|err| CqlTypeParseError::TypeIdParseError(err.into()))?;
612    Ok(match id {
613        0x0000 => {
614            // We use types::read_string instead of read_string argument here on purpose.
615            // Chances are the underlying string is `...DurationType`, in which case
616            // we don't need to allocate it at all. Only for Custom types
617            // (which we don't support anyway) do we need to allocate.
618            // OTOH, the provided `read_string` function deserializes borrowed OR owned string;
619            // here we want to always deserialize borrowed string.
620            let type_str =
621                types::read_string(buf).map_err(CqlTypeParseError::CustomTypeNameParseError)?;
622            match type_str {
623                "org.apache.cassandra.db.marshal.DurationType" => Native(Duration),
624                _ => {
625                    return Err(CqlTypeParseError::CustomTypeUnsupported(
626                        type_str.to_owned(),
627                    ))
628                }
629            }
630        }
631        0x0001 => Native(Ascii),
632        0x0002 => Native(BigInt),
633        0x0003 => Native(Blob),
634        0x0004 => Native(Boolean),
635        0x0005 => Native(Counter),
636        0x0006 => Native(Decimal),
637        0x0007 => Native(Double),
638        0x0008 => Native(Float),
639        0x0009 => Native(Int),
640        0x000B => Native(Timestamp),
641        0x000C => Native(Uuid),
642        0x000D => Native(Text),
643        0x000E => Native(Varint),
644        0x000F => Native(Timeuuid),
645        0x0010 => Native(Inet),
646        0x0011 => Native(Date),
647        0x0012 => Native(Time),
648        0x0013 => Native(SmallInt),
649        0x0014 => Native(TinyInt),
650        0x0015 => Native(Duration),
651        0x0020 => Collection {
652            frozen: false,
653            typ: CollectionType::List(Box::new(deser_type_generic(buf, read_string)?)),
654        },
655        0x0021 => Collection {
656            frozen: false,
657            typ: CollectionType::Map(
658                Box::new(deser_type_generic(buf, read_string)?),
659                Box::new(deser_type_generic(buf, read_string)?),
660            ),
661        },
662        0x0022 => Collection {
663            frozen: false,
664            typ: CollectionType::Set(Box::new(deser_type_generic(buf, read_string)?)),
665        },
666        0x0030 => {
667            let keyspace_name =
668                read_string(buf).map_err(CqlTypeParseError::UdtKeyspaceNameParseError)?;
669            let type_name = read_string(buf).map_err(CqlTypeParseError::UdtNameParseError)?;
670            let fields_size: usize = types::read_short(buf)
671                .map_err(|err| CqlTypeParseError::UdtFieldsCountParseError(err.into()))?
672                .into();
673
674            let mut field_types: Vec<(Cow<'result, str>, ColumnType)> =
675                Vec::with_capacity(fields_size);
676
677            for _ in 0..fields_size {
678                let field_name =
679                    read_string(buf).map_err(CqlTypeParseError::UdtFieldNameParseError)?;
680                let field_type = deser_type_generic(buf, read_string)?;
681
682                field_types.push((field_name.into(), field_type));
683            }
684
685            UserDefinedType {
686                frozen: false,
687                definition: Arc::new(self::UserDefinedType {
688                    name: type_name.into(),
689                    keyspace: keyspace_name.into(),
690                    field_types,
691                }),
692            }
693        }
694        0x0031 => {
695            let len: usize = types::read_short(buf)
696                .map_err(|err| CqlTypeParseError::TupleLengthParseError(err.into()))?
697                .into();
698            let mut types = Vec::with_capacity(len);
699            for _ in 0..len {
700                types.push(deser_type_generic(buf, read_string)?);
701            }
702            Tuple(types)
703        }
704        id => {
705            return Err(CqlTypeParseError::TypeNotImplemented(id));
706        }
707    })
708}
709
710fn deser_type_borrowed<'frame>(
711    buf: &mut &'frame [u8],
712) -> StdResult<ColumnType<'frame>, CqlTypeParseError> {
713    deser_type_generic(buf, |buf| types::read_string(buf))
714}
715
716fn deser_type_owned(buf: &mut &[u8]) -> StdResult<ColumnType<'static>, CqlTypeParseError> {
717    deser_type_generic(buf, |buf| types::read_string(buf).map(ToOwned::to_owned))
718}
719
720/// Deserializes a table spec, be it per-column one or a global one,
721/// in the borrowed form.
722///
723/// This function does not allocate.
724/// To obtain TableSpec<'static>, use `.into_owned()` on its result.
725fn deser_table_spec<'frame>(
726    buf: &mut &'frame [u8],
727) -> StdResult<TableSpec<'frame>, TableSpecParseError> {
728    let ks_name = types::read_string(buf).map_err(TableSpecParseError::MalformedKeyspaceName)?;
729    let table_name = types::read_string(buf).map_err(TableSpecParseError::MalformedTableName)?;
730    Ok(TableSpec::borrowed(ks_name, table_name))
731}
732
733fn mk_col_spec_parse_error(
734    col_idx: usize,
735    err: impl Into<ColumnSpecParseErrorKind>,
736) -> ColumnSpecParseError {
737    ColumnSpecParseError {
738        column_index: col_idx,
739        kind: err.into(),
740    }
741}
742
743fn deser_col_specs_generic<'frame, 'result>(
744    buf: &mut &'frame [u8],
745    global_table_spec: Option<TableSpec<'frame>>,
746    col_count: usize,
747    make_col_spec: fn(&'frame str, ColumnType<'result>, TableSpec<'frame>) -> ColumnSpec<'result>,
748    deser_type: fn(&mut &'frame [u8]) -> StdResult<ColumnType<'result>, CqlTypeParseError>,
749) -> StdResult<Vec<ColumnSpec<'result>>, ColumnSpecParseError> {
750    let mut col_specs = Vec::with_capacity(col_count);
751    for col_idx in 0..col_count {
752        let table_spec = match global_table_spec {
753            // If global table spec was provided, we simply clone it to each column spec.
754            Some(ref known_spec) => known_spec.clone(),
755
756            // Else, we deserialize the table spec for a column.
757            None => deser_table_spec(buf).map_err(|err| mk_col_spec_parse_error(col_idx, err))?,
758        };
759
760        let name = types::read_string(buf).map_err(|err| mk_col_spec_parse_error(col_idx, err))?;
761        let typ = deser_type(buf).map_err(|err| mk_col_spec_parse_error(col_idx, err))?;
762        let col_spec = make_col_spec(name, typ, table_spec);
763        col_specs.push(col_spec);
764    }
765    Ok(col_specs)
766}
767
768/// Deserializes col specs (part of ResultMetadata or PreparedMetadata)
769/// in the borrowed form.
770///
771/// To avoid needless allocations, it is advised to pass `global_table_spec`
772/// in the borrowed form, so that cloning it is cheap.
773fn deser_col_specs_borrowed<'frame>(
774    buf: &mut &'frame [u8],
775    global_table_spec: Option<TableSpec<'frame>>,
776    col_count: usize,
777) -> StdResult<Vec<ColumnSpec<'frame>>, ColumnSpecParseError> {
778    deser_col_specs_generic(
779        buf,
780        global_table_spec,
781        col_count,
782        ColumnSpec::borrowed,
783        deser_type_borrowed,
784    )
785}
786
787/// Deserializes col specs (part of ResultMetadata or PreparedMetadata)
788/// in the owned form.
789///
790/// To avoid needless allocations, it is advised to pass `global_table_spec`
791/// in the borrowed form, so that cloning it is cheap.
792fn deser_col_specs_owned<'frame>(
793    buf: &mut &'frame [u8],
794    global_table_spec: Option<TableSpec<'frame>>,
795    col_count: usize,
796) -> StdResult<Vec<ColumnSpec<'static>>, ColumnSpecParseError> {
797    let result: StdResult<Vec<ColumnSpec<'static>>, ColumnSpecParseError> = deser_col_specs_generic(
798        buf,
799        global_table_spec,
800        col_count,
801        |name: &str, typ, table_spec: TableSpec| {
802            ColumnSpec::owned(name.to_owned(), typ, table_spec.into_owned())
803        },
804        deser_type_owned,
805    );
806
807    result
808}
809
810fn deser_result_metadata(
811    buf: &mut &[u8],
812) -> StdResult<(ResultMetadata<'static>, PagingStateResponse), ResultMetadataParseError> {
813    let flags = types::read_int(buf)
814        .map_err(|err| ResultMetadataParseError::FlagsParseError(err.into()))?;
815    let global_tables_spec = flags & 0x0001 != 0;
816    let has_more_pages = flags & 0x0002 != 0;
817    let no_metadata = flags & 0x0004 != 0;
818
819    let col_count =
820        types::read_int_length(buf).map_err(ResultMetadataParseError::ColumnCountParseError)?;
821
822    let raw_paging_state = has_more_pages
823        .then(|| types::read_bytes(buf).map_err(ResultMetadataParseError::PagingStateParseError))
824        .transpose()?;
825
826    let paging_state = PagingStateResponse::new_from_raw_bytes(raw_paging_state);
827
828    let col_specs = if no_metadata {
829        vec![]
830    } else {
831        let global_table_spec = global_tables_spec
832            .then(|| deser_table_spec(buf))
833            .transpose()?;
834
835        deser_col_specs_owned(buf, global_table_spec, col_count)?
836    };
837
838    let metadata = ResultMetadata {
839        col_count,
840        col_specs,
841    };
842    Ok((metadata, paging_state))
843}
844
845impl RawMetadataAndRawRows {
846    /// Deserializes flags and paging state; the other part of result metadata
847    /// as well as rows remain serialized.
848    fn deserialize(
849        frame: &mut FrameSlice,
850        cached_metadata: Option<Arc<ResultMetadata<'static>>>,
851    ) -> StdResult<(Self, PagingStateResponse), RawRowsAndPagingStateResponseParseError> {
852        let flags = types::read_int(frame.as_slice_mut())
853            .map_err(|err| RawRowsAndPagingStateResponseParseError::FlagsParseError(err.into()))?;
854        let global_tables_spec = flags & 0x0001 != 0;
855        let has_more_pages = flags & 0x0002 != 0;
856        let no_metadata = flags & 0x0004 != 0;
857
858        let col_count = types::read_int_length(frame.as_slice_mut())
859            .map_err(RawRowsAndPagingStateResponseParseError::ColumnCountParseError)?;
860
861        let raw_paging_state = has_more_pages
862            .then(|| {
863                types::read_bytes(frame.as_slice_mut())
864                    .map_err(RawRowsAndPagingStateResponseParseError::PagingStateParseError)
865            })
866            .transpose()?;
867
868        let paging_state = PagingStateResponse::new_from_raw_bytes(raw_paging_state);
869
870        let raw_rows = Self {
871            col_count,
872            global_tables_spec,
873            no_metadata,
874            raw_metadata_and_rows: frame.to_bytes(),
875            cached_metadata,
876        };
877
878        Ok((raw_rows, paging_state))
879    }
880}
881
882impl RawMetadataAndRawRows {
883    // This function is needed because creating the deserializer closure
884    // directly in the enclosing function does not provide enough type hints
885    // for the compiler (and having a function with a verbose signature does),
886    // so it demands a type annotation. We cannot, however, write a correct
887    // type annotation, because this way we would limit the lifetime
888    // to a concrete lifetime, and our closure needs to be `impl for<'frame> ...`.
889    // This is a proud trick by Wojciech Przytuła, which crowns the brilliant
890    // idea of Karol Baryła to use Yoke to enable borrowing ResultMetadata
891    // from itself.
892    fn metadata_deserializer(
893        col_count: usize,
894        global_tables_spec: bool,
895    ) -> impl for<'frame> FnOnce(
896        &mut &'frame [u8],
897    ) -> StdResult<ResultMetadata<'frame>, ResultMetadataParseError> {
898        move |buf| {
899            let server_metadata = {
900                let global_table_spec = global_tables_spec
901                    .then(|| deser_table_spec(buf))
902                    .transpose()?;
903
904                let col_specs = deser_col_specs_borrowed(buf, global_table_spec, col_count)?;
905
906                ResultMetadata {
907                    col_count,
908                    col_specs,
909                }
910            };
911            Ok(server_metadata)
912        }
913    }
914
915    /// Deserializes ResultMetadata and deserializes rows count. Keeps rows in the serialized form.
916    ///
917    /// If metadata is cached (in the PreparedStatement), it is reused (shared) from cache
918    /// instead of deserializing.
919    pub fn deserialize_metadata(
920        self,
921    ) -> StdResult<DeserializedMetadataAndRawRows, ResultMetadataAndRowsCountParseError> {
922        let (metadata_deserialized, row_count_and_raw_rows) = match self.cached_metadata {
923            Some(cached) if self.no_metadata => {
924                // Server sent no metadata, but we have metadata cached. This means that we asked the server
925                // not to send metadata in the response as an optimization. We use cached metadata instead.
926                (
927                    ResultMetadataHolder::SharedCached(cached),
928                    self.raw_metadata_and_rows,
929                )
930            }
931            None if self.no_metadata => {
932                // Server sent no metadata and we have no metadata cached. Having no metadata cached,
933                // we wouldn't have asked the server for skipping metadata. Therefore, this is most probably
934                // not a SELECT, because in such case the server would send empty metadata both in Prepared
935                // and in Result responses.
936                (
937                    ResultMetadataHolder::mock_empty(),
938                    self.raw_metadata_and_rows,
939                )
940            }
941            Some(_) | None => {
942                // Two possibilities:
943                // 1) no cached_metadata provided. Server is supposed to provide the result metadata.
944                // 2) cached metadata present (so we should have asked for skipping metadata),
945                //    but the server sent result metadata anyway.
946                // In case 1 we have to deserialize result metadata. In case 2 we choose to do that,
947                // too, because it's suspicious, so we had better use the new metadata just in case.
948                // Also, we simply need to advance the buffer pointer past metadata, and this requires
949                // parsing metadata.
950
951                let (metadata_container, raw_rows_with_count) =
952                    self_borrowed_metadata::SelfBorrowedMetadataContainer::make_deserialized_metadata(
953                        self.raw_metadata_and_rows,
954                        Self::metadata_deserializer(self.col_count, self.global_tables_spec),
955                    )?;
956                (
957                    ResultMetadataHolder::SelfBorrowed(metadata_container),
958                    raw_rows_with_count,
959                )
960            }
961        };
962
963        let mut frame_slice = FrameSlice::new(&row_count_and_raw_rows);
964
965        let rows_count: usize = types::read_int_length(frame_slice.as_slice_mut())
966            .map_err(ResultMetadataAndRowsCountParseError::RowsCountParseError)?;
967
968        Ok(DeserializedMetadataAndRawRows {
969            metadata: metadata_deserialized,
970            rows_count,
971            raw_rows: frame_slice.to_bytes(),
972        })
973    }
974}
975
976fn deser_prepared_metadata(
977    buf: &mut &[u8],
978) -> StdResult<PreparedMetadata, PreparedMetadataParseError> {
979    let flags = types::read_int(buf)
980        .map_err(|err| PreparedMetadataParseError::FlagsParseError(err.into()))?;
981    let global_tables_spec = flags & 0x0001 != 0;
982
983    let col_count =
984        types::read_int_length(buf).map_err(PreparedMetadataParseError::ColumnCountParseError)?;
985
986    let pk_count: usize =
987        types::read_int_length(buf).map_err(PreparedMetadataParseError::PkCountParseError)?;
988
989    let mut pk_indexes = Vec::with_capacity(pk_count);
990    for i in 0..pk_count {
991        pk_indexes.push(PartitionKeyIndex {
992            index: types::read_short(buf)
993                .map_err(|err| PreparedMetadataParseError::PkIndexParseError(err.into()))?
994                as u16,
995            sequence: i as u16,
996        });
997    }
998    pk_indexes.sort_unstable_by_key(|pki| pki.index);
999
1000    let global_table_spec = global_tables_spec
1001        .then(|| deser_table_spec(buf))
1002        .transpose()?;
1003
1004    let col_specs = deser_col_specs_owned(buf, global_table_spec, col_count)?;
1005
1006    Ok(PreparedMetadata {
1007        flags,
1008        col_count,
1009        pk_indexes,
1010        col_specs,
1011    })
1012}
1013
1014fn deser_rows(
1015    buf_bytes: Bytes,
1016    cached_metadata: Option<&Arc<ResultMetadata<'static>>>,
1017) -> StdResult<(RawMetadataAndRawRows, PagingStateResponse), RawRowsAndPagingStateResponseParseError>
1018{
1019    let mut frame_slice = FrameSlice::new(&buf_bytes);
1020    RawMetadataAndRawRows::deserialize(&mut frame_slice, cached_metadata.cloned())
1021}
1022
1023fn deser_set_keyspace(buf: &mut &[u8]) -> StdResult<SetKeyspace, SetKeyspaceParseError> {
1024    let keyspace_name = types::read_string(buf)?.to_string();
1025
1026    Ok(SetKeyspace { keyspace_name })
1027}
1028
1029fn deser_prepared(buf: &mut &[u8]) -> StdResult<Prepared, PreparedParseError> {
1030    let id_len = types::read_short(buf)
1031        .map_err(|err| PreparedParseError::IdLengthParseError(err.into()))?
1032        as usize;
1033    let id: Bytes = buf[0..id_len].to_owned().into();
1034    buf.advance(id_len);
1035    let prepared_metadata =
1036        deser_prepared_metadata(buf).map_err(PreparedParseError::PreparedMetadataParseError)?;
1037    let (result_metadata, paging_state_response) =
1038        deser_result_metadata(buf).map_err(PreparedParseError::ResultMetadataParseError)?;
1039    if let PagingStateResponse::HasMorePages { state } = paging_state_response {
1040        return Err(PreparedParseError::NonZeroPagingState(
1041            state
1042                .as_bytes_slice()
1043                .cloned()
1044                .unwrap_or_else(|| Arc::from([])),
1045        ));
1046    }
1047
1048    Ok(Prepared {
1049        id,
1050        prepared_metadata,
1051        result_metadata,
1052    })
1053}
1054
1055fn deser_schema_change(buf: &mut &[u8]) -> StdResult<SchemaChange, SchemaChangeEventParseError> {
1056    Ok(SchemaChange {
1057        event: SchemaChangeEvent::deserialize(buf)?,
1058    })
1059}
1060
1061pub fn deserialize(
1062    buf_bytes: Bytes,
1063    cached_metadata: Option<&Arc<ResultMetadata<'static>>>,
1064) -> StdResult<Result, CqlResultParseError> {
1065    let buf = &mut &*buf_bytes;
1066    use self::Result::*;
1067    Ok(
1068        match types::read_int(buf)
1069            .map_err(|err| CqlResultParseError::ResultIdParseError(err.into()))?
1070        {
1071            0x0001 => Void,
1072            0x0002 => Rows(deser_rows(buf_bytes.slice_ref(buf), cached_metadata)?),
1073            0x0003 => SetKeyspace(deser_set_keyspace(buf)?),
1074            0x0004 => Prepared(deser_prepared(buf)?),
1075            0x0005 => SchemaChange(deser_schema_change(buf)?),
1076            id => return Err(CqlResultParseError::UnknownResultId(id)),
1077        },
1078    )
1079}
1080
// This is not #[cfg(test)], because it is used by the scylla crate.
// Unfortunately, the `#[doc(hidden)]` attribute does not apply recursively
// to child items. Therefore, every `pub` item here must have
// the attribute, too.
1085#[doc(hidden)]
1086mod test_utils {
1087    use std::num::TryFromIntError;
1088
1089    use bytes::{BufMut, BytesMut};
1090
1091    use super::*;
1092
1093    impl TableSpec<'_> {
1094        pub(crate) fn serialize(&self, buf: &mut impl BufMut) -> StdResult<(), TryFromIntError> {
1095            types::write_string(&self.ks_name, buf)?;
1096            types::write_string(&self.table_name, buf)?;
1097
1098            Ok(())
1099        }
1100    }
1101
1102    impl ColumnType<'_> {
1103        fn id(&self) -> u16 {
1104            use NativeType::*;
1105            match self {
1106                Self::Native(Ascii) => 0x0001,
1107                Self::Native(BigInt) => 0x0002,
1108                Self::Native(Blob) => 0x0003,
1109                Self::Native(Boolean) => 0x0004,
1110                Self::Native(Counter) => 0x0005,
1111                Self::Native(Decimal) => 0x0006,
1112                Self::Native(Double) => 0x0007,
1113                Self::Native(Float) => 0x0008,
1114                Self::Native(Int) => 0x0009,
1115                Self::Native(Timestamp) => 0x000B,
1116                Self::Native(Uuid) => 0x000C,
1117                Self::Native(Text) => 0x000D,
1118                Self::Native(Varint) => 0x000E,
1119                Self::Native(Timeuuid) => 0x000F,
1120                Self::Native(Inet) => 0x0010,
1121                Self::Native(Date) => 0x0011,
1122                Self::Native(Time) => 0x0012,
1123                Self::Native(SmallInt) => 0x0013,
1124                Self::Native(TinyInt) => 0x0014,
1125                Self::Native(Duration) => 0x0015,
1126                Self::Collection {
1127                    typ: CollectionType::List(_),
1128                    ..
1129                } => 0x0020,
1130                Self::Collection {
1131                    typ: CollectionType::Map(_, _),
1132                    ..
1133                } => 0x0021,
1134                Self::Collection {
1135                    typ: CollectionType::Set(_),
1136                    ..
1137                } => 0x0022,
1138                Self::Vector { .. } => {
1139                    unimplemented!();
1140                }
1141                Self::UserDefinedType { .. } => 0x0030,
1142                Self::Tuple(_) => 0x0031,
1143            }
1144        }
1145
1146        // Only for use in tests
1147        pub(crate) fn serialize(&self, buf: &mut impl BufMut) -> StdResult<(), TryFromIntError> {
1148            let id = self.id();
1149            types::write_short(id, buf);
1150
1151            match self {
1152                // Simple types
1153                ColumnType::Native(_) => (),
1154
1155                ColumnType::Collection {
1156                    typ: CollectionType::List(elem_type),
1157                    ..
1158                }
1159                | ColumnType::Collection {
1160                    typ: CollectionType::Set(elem_type),
1161                    ..
1162                } => {
1163                    elem_type.serialize(buf)?;
1164                }
1165                ColumnType::Collection {
1166                    typ: CollectionType::Map(key_type, value_type),
1167                    ..
1168                } => {
1169                    key_type.serialize(buf)?;
1170                    value_type.serialize(buf)?;
1171                }
1172                ColumnType::Tuple(types) => {
1173                    types::write_short_length(types.len(), buf)?;
1174                    for typ in types.iter() {
1175                        typ.serialize(buf)?;
1176                    }
1177                }
1178                ColumnType::Vector { .. } => {
1179                    unimplemented!()
1180                }
1181                ColumnType::UserDefinedType {
1182                    definition: udt, ..
1183                } => {
1184                    types::write_string(&udt.keyspace, buf)?;
1185                    types::write_string(&udt.name, buf)?;
1186                    types::write_short_length(udt.field_types.len(), buf)?;
1187                    for (field_name, field_type) in udt.field_types.iter() {
1188                        types::write_string(field_name, buf)?;
1189                        field_type.serialize(buf)?;
1190                    }
1191                }
1192            }
1193
1194            Ok(())
1195        }
1196    }
1197
1198    impl<'a> ResultMetadata<'a> {
1199        #[inline]
1200        #[doc(hidden)]
1201        pub fn new_for_test(col_count: usize, col_specs: Vec<ColumnSpec<'a>>) -> Self {
1202            Self {
1203                col_count,
1204                col_specs,
1205            }
1206        }
1207
1208        pub(crate) fn serialize(
1209            &self,
1210            buf: &mut impl BufMut,
1211            no_metadata: bool,
1212            global_tables_spec: bool,
1213        ) -> StdResult<(), TryFromIntError> {
1214            let global_table_spec = global_tables_spec
1215                .then(|| self.col_specs.first().map(|col_spec| col_spec.table_spec()))
1216                .flatten();
1217
1218            let mut flags = 0;
1219            if global_table_spec.is_some() {
1220                flags |= 0x0001;
1221            }
1222            if no_metadata {
1223                flags |= 0x0004;
1224            }
1225            types::write_int(flags, buf);
1226
1227            types::write_int_length(self.col_count, buf)?;
1228
1229            // No paging state.
1230
1231            if !no_metadata {
1232                if let Some(spec) = global_table_spec {
1233                    spec.serialize(buf)?;
1234                }
1235
1236                for col_spec in self.col_specs() {
1237                    if global_table_spec.is_none() {
1238                        col_spec.table_spec().serialize(buf)?;
1239                    }
1240
1241                    types::write_string(col_spec.name(), buf)?;
1242                    col_spec.typ().serialize(buf)?;
1243                }
1244            }
1245
1246            Ok(())
1247        }
1248    }
1249
1250    impl RawMetadataAndRawRows {
1251        #[doc(hidden)]
1252        #[inline]
1253        pub fn new_for_test(
1254            cached_metadata: Option<Arc<ResultMetadata<'static>>>,
1255            metadata: Option<ResultMetadata>,
1256            global_tables_spec: bool,
1257            rows_count: usize,
1258            raw_rows: &[u8],
1259        ) -> StdResult<Self, TryFromIntError> {
1260            let no_metadata = metadata.is_none();
1261            let empty_metadata = ResultMetadata::mock_empty();
1262            let used_metadata = metadata
1263                .as_ref()
1264                .or(cached_metadata.as_deref())
1265                .unwrap_or(&empty_metadata);
1266
1267            let raw_result_rows = {
1268                let mut buf = BytesMut::new();
1269                used_metadata.serialize(&mut buf, global_tables_spec, no_metadata)?;
1270                types::write_int_length(rows_count, &mut buf)?;
1271                buf.extend_from_slice(raw_rows);
1272
1273                buf.freeze()
1274            };
1275
1276            let (raw_rows, _paging_state_response) =
1277                Self::deserialize(&mut FrameSlice::new(&raw_result_rows), cached_metadata).expect(
1278                    "Ill-formed serialized metadata for tests - likely bug in serialization code",
1279                );
1280
1281            Ok(raw_rows)
1282        }
1283    }
1284
1285    impl DeserializedMetadataAndRawRows {
1286        #[inline]
1287        #[cfg(test)]
1288        pub(crate) fn new_for_test(
1289            metadata: ResultMetadata<'static>,
1290            rows_count: usize,
1291            raw_rows: Bytes,
1292        ) -> Self {
1293            Self {
1294                metadata: ResultMetadataHolder::SharedCached(Arc::new(metadata)),
1295                rows_count,
1296                raw_rows,
1297            }
1298        }
1299    }
1300}