scylla/statement/
prepared.rs

1use bytes::{Bytes, BytesMut};
2use scylla_cql::frame::response::result::{
3    ColumnSpec, PartitionKeyIndex, ResultMetadata, TableSpec,
4};
5use scylla_cql::frame::types::RawValue;
6use scylla_cql::serialize::row::{RowSerializationContext, SerializeRow, SerializedValues};
7use scylla_cql::serialize::SerializationError;
8use smallvec::{smallvec, SmallVec};
9use std::convert::TryInto;
10use std::sync::Arc;
11use std::time::Duration;
12use thiserror::Error;
13use uuid::Uuid;
14
15use super::{PageSize, StatementConfig};
16use crate::client::execution_profile::ExecutionProfileHandle;
17use crate::errors::{BadQuery, ExecutionError};
18use crate::frame::response::result::PreparedMetadata;
19use crate::frame::types::{Consistency, SerialConsistency};
20use crate::observability::history::HistoryListener;
21use crate::policies::retry::RetryPolicy;
22use crate::response::query_result::ColumnSpecs;
23use crate::routing::partitioner::{Partitioner, PartitionerHasher, PartitionerName};
24use crate::routing::Token;
25
26/// Represents a statement prepared on the server.
27///
28/// To prepare a statement, simply execute [`Session::prepare`](crate::client::session::Session::prepare).
29///
30/// If you plan on reusing the statement, or binding some values to it during execution, always
31/// prefer using prepared statements over `Session::query_*` methods,
32/// e.g. [`Session::query_unpaged`](crate::client::session::Session::query_unpaged).
33///
34/// Benefits that prepared statements have to offer:
35/// * Performance - a prepared statement holds information about metadata
36///   that allows to carry out a statement execution in a type safe manner.
37///   When any of `Session::query_*` methods is called with non-empty bound values,
38///   the driver has to prepare the statement before execution (to provide type safety).
39///   This implies 2 round trips per [`Session::query_unpaged`](crate::client::session::Session::query_unpaged).
40///   On the other hand, the cost of [`Session::execute_unpaged`](crate::client::session::Session::execute_unpaged)
41///   is only 1 round trip.
42/// * Increased type-safety - bound values' types are validated with
43///   the [`PreparedMetadata`] received from the server during the serialization.
44/// * Improved load balancing - thanks to statement metadata, the driver is able
45///   to compute a set of destined replicas for the statement execution. These replicas
46///   will be preferred when choosing the node (and shard) to send the request to.
47/// * Result deserialization optimization - see [`PreparedStatement::set_use_cached_result_metadata`].
48///
49/// # Clone implementation
50/// Cloning a prepared statement is a cheap operation. It only
51/// requires copying a couple of small fields and some [Arc] pointers.
52/// Always prefer cloning over executing [`Session::prepare`](crate::client::session::Session::prepare)
53/// multiple times to save some roundtrips.
54///
55/// # Statement repreparation
56/// When schema is updated, the server is supposed to invalidate its
57/// prepared statement caches. Then, if client tries to execute a given statement,
58/// the server will respond with an error. Users should not worry about it, since
59/// the driver handles it properly and tries to reprepare the statement.
60/// However, there are some cases when client-side prepared statement should be dropped
61/// and prepared once again via [`Session::prepare`](crate::client::session::Session::prepare) -
62/// see the mention about altering schema below.
63///
64/// # Altering schema
65/// If for some reason you decided to alter the part of schema that corresponds to given prepared
66/// statement, then the corresponding statement (and its copies obtained via [`PreparedStatement::clone`]) should
67/// be dropped. The statement should be prepared again.
68///
69/// There are two reasons for this:
70///
71/// ### CQL v4 protocol limitations
72/// The driver only supports CQL version 4.
73///
74/// In multi-client scenario, only the first client which reprepares the statement
75/// will receive the updated metadata from the server.
76/// The rest of the clients will still hold on to the outdated metadata.
77/// In version 4 of CQL protocol there is currently no way for the server to notify other
78/// clients about prepared statement's metadata update.
79///
80/// ### Client-side metadata immutability
81/// The decision was made to keep client-side metadata immutable.
82/// Mainly because of the CQLv4 limitations mentioned above. This means
83/// that metadata is not updated during statement repreparation.
84/// This raises two issues:
85/// * bound values serialization errors - since [`PreparedMetadata`] is not updated
86/// * result deserialization errors - when [`PreparedStatement::set_use_cached_result_metadata`] is enabled,
87///   since [`ResultMetadata`] is not updated
88///
89/// So, to mitigate those issues, drop the outdated [`PreparedStatement`] manually
90/// and prepare it again against the new schema.
91#[derive(Debug)]
92pub struct PreparedStatement {
    /// Per-statement execution settings (consistency, idempotence, tracing, timestamp,
    /// request timeout, retry policy, history listener, execution profile handle, ...),
    /// read and written by the `set_*`/`get_*` methods of this type.
93    pub(crate) config: StatementConfig,
    /// Tracing ids of queries used to prepare this statement.
    /// Starts empty in `new()`, and a clone also starts with an empty list.
94    pub prepare_tracing_ids: Vec<Uuid>,
95
    /// Identifier of the prepared statement, as passed to `new()`.
96    id: Bytes,
    /// Data shared between all clones of this statement (metadata and statement text).
97    shared: Arc<PreparedStatementSharedData>,
    /// Validated page size for this statement.
98    page_size: PageSize,
    /// Name of the partitioner used when calculating tokens for this statement.
99    partitioner_name: PartitionerName,
    /// Whether the statement is known to be a lightweight transaction.
100    is_confirmed_lwt: bool,
101}
102
/// The part of a [`PreparedStatement`] that is kept behind an `Arc`,
/// so that cloning the statement shares this data instead of copying it.
103#[derive(Debug)]
104struct PreparedStatementSharedData {
    /// Metadata about the bind variables (column specs and partition key indexes).
105    metadata: PreparedMetadata,
    /// Metadata about the result set of the statement.
106    result_metadata: Arc<ResultMetadata<'static>>,
    /// The statement string that was given to `PreparedStatement::new()`.
107    statement: String,
108}
109
110impl Clone for PreparedStatement {
111    fn clone(&self) -> Self {
112        Self {
113            config: self.config.clone(),
114            prepare_tracing_ids: Vec::new(),
115            id: self.id.clone(),
116            shared: self.shared.clone(),
117            page_size: self.page_size,
118            partitioner_name: self.partitioner_name.clone(),
119            is_confirmed_lwt: self.is_confirmed_lwt,
120        }
121    }
122}
123
124impl PreparedStatement {
125    pub(crate) fn new(
126        id: Bytes,
127        is_lwt: bool,
128        metadata: PreparedMetadata,
129        result_metadata: Arc<ResultMetadata<'static>>,
130        statement: String,
131        page_size: PageSize,
132        config: StatementConfig,
133    ) -> Self {
134        Self {
135            id,
136            shared: Arc::new(PreparedStatementSharedData {
137                metadata,
138                result_metadata,
139                statement,
140            }),
141            prepare_tracing_ids: Vec::new(),
142            page_size,
143            config,
144            partitioner_name: Default::default(),
145            is_confirmed_lwt: is_lwt,
146        }
147    }
148
149    pub fn get_id(&self) -> &Bytes {
150        &self.id
151    }
152
153    pub fn get_statement(&self) -> &str {
154        &self.shared.statement
155    }
156
157    /// Sets the page size for this CQL query.
158    ///
159    /// Panics if given number is nonpositive.
160    pub fn set_page_size(&mut self, page_size: i32) {
161        self.page_size = page_size
162            .try_into()
163            .unwrap_or_else(|err| panic!("PreparedStatement::set_page_size: {err}"));
164    }
165
166    /// Returns the page size for this CQL query.
167    pub(crate) fn get_validated_page_size(&self) -> PageSize {
168        self.page_size
169    }
170
171    /// Returns the page size for this CQL query.
172    pub fn get_page_size(&self) -> i32 {
173        self.page_size.inner()
174    }
175
176    /// Gets tracing ids of queries used to prepare this statement
177    pub fn get_prepare_tracing_ids(&self) -> &[Uuid] {
178        &self.prepare_tracing_ids
179    }
180
181    /// Returns true if the prepared statement has necessary information
182    /// to be routed in a token-aware manner. If false, the query
183    /// will always be sent to a random node/shard.
184    pub fn is_token_aware(&self) -> bool {
185        !self.get_prepared_metadata().pk_indexes.is_empty()
186    }
187
188    /// Returns true if it is known that the prepared statement contains
189    /// a Lightweight Transaction. If so, the optimisation can be performed:
190    /// the query should be routed to the replicas in a predefined order
191    /// (i. e. always try first to contact replica A, then B if it fails,
192    /// then C, etc.). If false, the query should be routed normally.
193    /// Note: this a Scylla-specific optimisation. Therefore, the result
194    /// will be always false for Cassandra.
195    pub fn is_confirmed_lwt(&self) -> bool {
196        self.is_confirmed_lwt
197    }
198
199    /// Computes the partition key of the target table from given values —
200    /// it assumes that all partition key columns are passed in values.
201    /// Partition keys have specific serialization rules.
202    /// Ref: <https://github.com/scylladb/scylla/blob/40adf38915b6d8f5314c621a94d694d172360833/compound_compat.hh#L33-L47>
203    ///
204    /// Note: this is no longer required to compute a token. This is because partitioners
205    /// are now stateful and stream-based, so they do not require materialised partition key.
206    /// Therefore, for computing a token, there are more efficient methods, such as
207    /// [Self::calculate_token()].
208    pub fn compute_partition_key(
209        &self,
210        bound_values: &impl SerializeRow,
211    ) -> Result<Bytes, PartitionKeyError> {
212        let serialized = self.serialize_values(bound_values)?;
213        let partition_key = self.extract_partition_key(&serialized)?;
214        let mut buf = BytesMut::new();
215        let mut writer = |chunk: &[u8]| buf.extend_from_slice(chunk);
216
217        partition_key.write_encoded_partition_key(&mut writer)?;
218
219        Ok(buf.freeze())
220    }
221
222    /// Determines which values constitute the partition key and puts them in order.
223    ///
224    /// This is a preparation step necessary for calculating token based on a prepared statement.
225    pub(crate) fn extract_partition_key<'ps>(
226        &'ps self,
227        bound_values: &'ps SerializedValues,
228    ) -> Result<PartitionKey<'ps>, PartitionKeyExtractionError> {
229        PartitionKey::new(self.get_prepared_metadata(), bound_values)
230    }
231
232    pub(crate) fn extract_partition_key_and_calculate_token<'ps>(
233        &'ps self,
234        partitioner_name: &'ps PartitionerName,
235        serialized_values: &'ps SerializedValues,
236    ) -> Result<Option<(PartitionKey<'ps>, Token)>, PartitionKeyError> {
237        if !self.is_token_aware() {
238            return Ok(None);
239        }
240
241        let partition_key = self.extract_partition_key(serialized_values)?;
242        let token = partition_key.calculate_token(partitioner_name)?;
243
244        Ok(Some((partition_key, token)))
245    }
246
247    /// Calculates the token for given prepared statement and values.
248    ///
249    /// Returns the token that would be computed for executing the provided
250    /// prepared statement with the provided values.
251    // As this function creates a `PartitionKey`, it is intended rather for external usage (by users).
252    // For internal purposes, `PartitionKey::calculate_token()` is preferred, as `PartitionKey`
253    // is either way used internally, among others for display in traces.
254    pub fn calculate_token(
255        &self,
256        values: &impl SerializeRow,
257    ) -> Result<Option<Token>, PartitionKeyError> {
258        self.calculate_token_untyped(&self.serialize_values(values)?)
259    }
260
261    // A version of calculate_token which skips serialization and uses SerializedValues directly.
262    // Not type-safe, so not exposed to users.
263    pub(crate) fn calculate_token_untyped(
264        &self,
265        values: &SerializedValues,
266    ) -> Result<Option<Token>, PartitionKeyError> {
267        self.extract_partition_key_and_calculate_token(&self.partitioner_name, values)
268            .map(|opt| opt.map(|(_pk, token)| token))
269    }
270
271    /// Return keyspace name and table name this statement is operating on.
272    pub fn get_table_spec(&self) -> Option<&TableSpec> {
273        self.get_prepared_metadata()
274            .col_specs
275            .first()
276            .map(|spec| spec.table_spec())
277    }
278
279    /// Returns the name of the keyspace this statement is operating on.
280    pub fn get_keyspace_name(&self) -> Option<&str> {
281        self.get_prepared_metadata()
282            .col_specs
283            .first()
284            .map(|col_spec| col_spec.table_spec().ks_name())
285    }
286
287    /// Returns the name of the table this statement is operating on.
288    pub fn get_table_name(&self) -> Option<&str> {
289        self.get_prepared_metadata()
290            .col_specs
291            .first()
292            .map(|col_spec| col_spec.table_spec().table_name())
293    }
294
295    /// Sets the consistency to be used when executing this statement.
296    pub fn set_consistency(&mut self, c: Consistency) {
297        self.config.consistency = Some(c);
298    }
299
300    /// Gets the consistency to be used when executing this prepared statement if it is filled.
301    /// If this is empty, the default_consistency of the session will be used.
302    pub fn get_consistency(&self) -> Option<Consistency> {
303        self.config.consistency
304    }
305
306    /// Sets the serial consistency to be used when executing this statement.
307    /// (Ignored unless the statement is an LWT)
308    pub fn set_serial_consistency(&mut self, sc: Option<SerialConsistency>) {
309        self.config.serial_consistency = Some(sc);
310    }
311
312    /// Gets the serial consistency to be used when executing this statement.
313    /// (Ignored unless the statement is an LWT)
314    pub fn get_serial_consistency(&self) -> Option<SerialConsistency> {
315        self.config.serial_consistency.flatten()
316    }
317
318    /// Sets the idempotence of this statement
319    /// A query is idempotent if it can be applied multiple times without changing the result of the initial application
320    /// If set to `true` we can be sure that it is idempotent
321    /// If set to `false` it is unknown whether it is idempotent
322    /// This is used in [`RetryPolicy`] to decide if retrying a query is safe
323    pub fn set_is_idempotent(&mut self, is_idempotent: bool) {
324        self.config.is_idempotent = is_idempotent;
325    }
326
327    /// Gets the idempotence of this statement
328    pub fn get_is_idempotent(&self) -> bool {
329        self.config.is_idempotent
330    }
331
332    /// Enable or disable CQL Tracing for this statement
333    /// If enabled session.execute() will return a QueryResult containing tracing_id
334    /// which can be used to query tracing information about the execution of this query
335    pub fn set_tracing(&mut self, should_trace: bool) {
336        self.config.tracing = should_trace;
337    }
338
339    /// Gets whether tracing is enabled for this statement
340    pub fn get_tracing(&self) -> bool {
341        self.config.tracing
342    }
343
344    /// Make use of cached metadata to decode results
345    /// of the statement's execution.
346    ///
347    /// If true, the driver will request the server not to
348    /// attach the result metadata in response to the statement execution.
349    ///
350    /// The driver will cache the result metadata received from the server
351    /// after statement preparation and will use it
352    /// to deserialize the results of statement execution.
353    ///
354    /// This option is false by default.
355    pub fn set_use_cached_result_metadata(&mut self, use_cached_metadata: bool) {
356        self.config.skip_result_metadata = use_cached_metadata;
357    }
358
359    /// Gets the information whether the driver uses cached metadata
360    /// to decode the results of the statement's execution.
361    pub fn get_use_cached_result_metadata(&self) -> bool {
362        self.config.skip_result_metadata
363    }
364
365    /// Sets the default timestamp for this statement in microseconds.
366    /// If not None, it will replace the server side assigned timestamp as default timestamp
367    /// If a statement contains a `USING TIMESTAMP` clause, calling this method won't change
368    /// anything
369    pub fn set_timestamp(&mut self, timestamp: Option<i64>) {
370        self.config.timestamp = timestamp
371    }
372
373    /// Gets the default timestamp for this statement in microseconds.
374    pub fn get_timestamp(&self) -> Option<i64> {
375        self.config.timestamp
376    }
377
378    /// Sets the client-side timeout for this statement.
379    /// If not None, the driver will stop waiting for the request
380    /// to finish after `timeout` passed.
381    /// Otherwise, default session client timeout will be applied.
382    pub fn set_request_timeout(&mut self, timeout: Option<Duration>) {
383        self.config.request_timeout = timeout
384    }
385
386    /// Gets client timeout associated with this query
387    pub fn get_request_timeout(&self) -> Option<Duration> {
388        self.config.request_timeout
389    }
390
391    /// Sets the name of the partitioner used for this statement.
392    pub(crate) fn set_partitioner_name(&mut self, partitioner_name: PartitionerName) {
393        self.partitioner_name = partitioner_name;
394    }
395
396    /// Access metadata about the bind variables of this statement as returned by the database
397    pub(crate) fn get_prepared_metadata(&self) -> &PreparedMetadata {
398        &self.shared.metadata
399    }
400
401    /// Access column specifications of the bind variables of this statement
402    pub fn get_variable_col_specs(&self) -> ColumnSpecs<'_, 'static> {
403        ColumnSpecs::new(&self.shared.metadata.col_specs)
404    }
405
406    /// Access info about partition key indexes of the bind variables of this statement
407    pub fn get_variable_pk_indexes(&self) -> &[PartitionKeyIndex] {
408        &self.shared.metadata.pk_indexes
409    }
410
411    /// Access metadata about the result of prepared statement returned by the database
412    pub(crate) fn get_result_metadata(&self) -> &Arc<ResultMetadata<'static>> {
413        &self.shared.result_metadata
414    }
415
416    /// Access column specifications of the result set returned after the execution of this statement
417    pub fn get_result_set_col_specs(&self) -> ColumnSpecs<'_, 'static> {
418        ColumnSpecs::new(self.shared.result_metadata.col_specs())
419    }
420
421    /// Get the name of the partitioner used for this statement.
422    pub fn get_partitioner_name(&self) -> &PartitionerName {
423        &self.partitioner_name
424    }
425
426    /// Set the retry policy for this statement, overriding the one from execution profile if not None.
427    #[inline]
428    pub fn set_retry_policy(&mut self, retry_policy: Option<Arc<dyn RetryPolicy>>) {
429        self.config.retry_policy = retry_policy;
430    }
431
432    /// Get the retry policy set for the statement.
433    #[inline]
434    pub fn get_retry_policy(&self) -> Option<&Arc<dyn RetryPolicy>> {
435        self.config.retry_policy.as_ref()
436    }
437
438    /// Sets the listener capable of listening what happens during query execution.
439    pub fn set_history_listener(&mut self, history_listener: Arc<dyn HistoryListener>) {
440        self.config.history_listener = Some(history_listener);
441    }
442
443    /// Removes the listener set by `set_history_listener`.
444    pub fn remove_history_listener(&mut self) -> Option<Arc<dyn HistoryListener>> {
445        self.config.history_listener.take()
446    }
447
448    /// Associates the query with execution profile referred by the provided handle.
449    /// Handle may be later remapped to another profile, and query will reflect those changes.
450    pub fn set_execution_profile_handle(&mut self, profile_handle: Option<ExecutionProfileHandle>) {
451        self.config.execution_profile_handle = profile_handle;
452    }
453
454    /// Borrows the execution profile handle associated with this query.
455    pub fn get_execution_profile_handle(&self) -> Option<&ExecutionProfileHandle> {
456        self.config.execution_profile_handle.as_ref()
457    }
458
459    pub(crate) fn serialize_values(
460        &self,
461        values: &impl SerializeRow,
462    ) -> Result<SerializedValues, SerializationError> {
463        let ctx = RowSerializationContext::from_prepared(self.get_prepared_metadata());
464        SerializedValues::from_serializable(&ctx, values)
465    }
466}
467
/// An error raised when the values making up the partition key
/// cannot be located among the bound values of a statement.
468#[derive(Clone, Debug, Error, PartialEq, Eq, PartialOrd, Ord)]
469#[non_exhaustive]
470pub enum PartitionKeyExtractionError {
    /// The bound values ended before the value at a partition key index was reached.
    /// The fields are, in order: the partition key index that was looked up
    /// and the number of bound values.
471    #[error("No value with given pk_index! pk_index: {0}, values.len(): {1}")]
472    NoPkIndexValue(u16, u16),
473}
474
/// An error raised while encoding partition key values, which is a step
/// of both token calculation and partition key computation.
475#[derive(Clone, Debug, Error, PartialEq, Eq, PartialOrd, Ord)]
476#[non_exhaustive]
477pub enum TokenCalculationError {
    /// A value of a composite partition key is so long that its length does not fit in a `u16`.
    /// The field holds the length (in bytes) of the offending value.
478    #[error("Value bytes too long to create partition key, max 65 535 allowed! value.len(): {0}")]
479    ValueTooLong(usize),
480}
481
482/// An error returned by [`PreparedStatement::compute_partition_key()`]
/// and [`PreparedStatement::calculate_token()`].
483#[derive(Clone, Debug, Error)]
484#[non_exhaustive]
485pub enum PartitionKeyError {
486    /// Failed to extract partition key.
487    #[error(transparent)]
488    PartitionKeyExtraction(#[from] PartitionKeyExtractionError),
489
490    /// Failed to calculate token.
491    #[error(transparent)]
492    TokenCalculation(#[from] TokenCalculationError),
493
494    /// Failed to serialize values required to compute partition key.
495    #[error(transparent)]
496    Serialization(#[from] SerializationError),
497}
498
499impl PartitionKeyError {
500    /// Converts the error to [`ExecutionError`].
501    pub fn into_execution_error(self) -> ExecutionError {
502        match self {
503            PartitionKeyError::PartitionKeyExtraction(_) => {
504                ExecutionError::BadQuery(BadQuery::PartitionKeyExtraction)
505            }
506            PartitionKeyError::TokenCalculation(TokenCalculationError::ValueTooLong(
507                values_len,
508            )) => {
509                ExecutionError::BadQuery(BadQuery::ValuesTooLongForKey(values_len, u16::MAX.into()))
510            }
511            PartitionKeyError::Serialization(err) => {
512                ExecutionError::BadQuery(BadQuery::SerializationError(err))
513            }
514        }
515    }
516}
517
/// A single component of a partition key: the serialized bytes of a bound value
/// paired with the column spec of the bind variable it was bound to.
518pub(crate) type PartitionKeyValue<'ps> = (&'ps [u8], &'ps ColumnSpec<'ps>);
519
/// Partition key values picked out of the bound values of a prepared statement,
/// borrowed from the statement's metadata and from the serialized values.
520pub(crate) struct PartitionKey<'ps> {
    /// One slot per partition key index, placed at the position given by
    /// `PartitionKeyIndex::sequence`. A slot stays `None` when the corresponding
    /// bound value is not a `RawValue::Value`.
521    pk_values: SmallVec<[Option<PartitionKeyValue<'ps>>; PartitionKey::SMALLVEC_ON_STACK_SIZE]>,
522}
523
524impl<'ps> PartitionKey<'ps> {
525    const SMALLVEC_ON_STACK_SIZE: usize = 8;
526
527    fn new(
528        prepared_metadata: &'ps PreparedMetadata,
529        bound_values: &'ps SerializedValues,
530    ) -> Result<Self, PartitionKeyExtractionError> {
531        // Iterate on values using sorted pk_indexes (see deser_prepared_metadata),
532        // and use PartitionKeyIndex.sequence to insert the value in pk_values with the correct order.
533        let mut pk_values: SmallVec<[_; PartitionKey::SMALLVEC_ON_STACK_SIZE]> =
534            smallvec![None; prepared_metadata.pk_indexes.len()];
535        let mut values_iter = bound_values.iter();
536        // pk_indexes contains the indexes of the partition key value, so the current offset of the
537        // iterator must be kept, in order to compute the next position of the pk in the iterator.
538        // At each iteration values_iter.nth(0) will roughly correspond to values[values_iter_offset],
539        // so values[pk_index.index] will be retrieved with values_iter.nth(pk_index.index - values_iter_offset)
540        let mut values_iter_offset = 0;
541        for pk_index in prepared_metadata.pk_indexes.iter().copied() {
542            // Find value matching current pk_index
543            let next_val = values_iter
544                .nth((pk_index.index - values_iter_offset) as usize)
545                .ok_or_else(|| {
546                    PartitionKeyExtractionError::NoPkIndexValue(
547                        pk_index.index,
548                        bound_values.element_count(),
549                    )
550                })?;
551            // Add it in sequence order to pk_values
552            if let RawValue::Value(v) = next_val {
553                let spec = &prepared_metadata.col_specs[pk_index.index as usize];
554                pk_values[pk_index.sequence as usize] = Some((v, spec));
555            }
556            values_iter_offset = pk_index.index + 1;
557        }
558        Ok(Self { pk_values })
559    }
560
561    pub(crate) fn iter(&self) -> impl Iterator<Item = PartitionKeyValue<'ps>> + Clone + '_ {
562        self.pk_values.iter().flatten().copied()
563    }
564
565    fn write_encoded_partition_key(
566        &self,
567        writer: &mut impl FnMut(&[u8]),
568    ) -> Result<(), TokenCalculationError> {
569        let mut pk_val_iter = self.iter().map(|(val, _spec)| val);
570        if let Some(first_value) = pk_val_iter.next() {
571            if let Some(second_value) = pk_val_iter.next() {
572                // Composite partition key case
573                for value in std::iter::once(first_value)
574                    .chain(std::iter::once(second_value))
575                    .chain(pk_val_iter)
576                {
577                    let v_len_u16: u16 = value
578                        .len()
579                        .try_into()
580                        .map_err(|_| TokenCalculationError::ValueTooLong(value.len()))?;
581                    writer(&v_len_u16.to_be_bytes());
582                    writer(value);
583                    writer(&[0u8]);
584                }
585            } else {
586                // Single-value partition key case
587                writer(first_value);
588            }
589        }
590        Ok(())
591    }
592
593    pub(crate) fn calculate_token(
594        &self,
595        partitioner_name: &PartitionerName,
596    ) -> Result<Token, TokenCalculationError> {
597        let mut partitioner_hasher = partitioner_name.build_hasher();
598        let mut writer = |chunk: &[u8]| partitioner_hasher.write(chunk);
599
600        self.write_encoded_partition_key(&mut writer)?;
601
602        Ok(partitioner_hasher.finish())
603    }
604}
605
#[cfg(test)]
mod tests {
    use scylla_cql::frame::response::result::{
        ColumnSpec, ColumnType, NativeType, PartitionKeyIndex, PreparedMetadata, TableSpec,
    };
    use scylla_cql::serialize::row::SerializedValues;

    use crate::statement::prepared::PartitionKey;
    use crate::test_utils::setup_tracing;

    /// Builds prepared metadata for a table `ks.t` with columns `col_0`, `col_1`, ...
    /// of the given types.
    ///
    /// `idx` lists the column indexes forming the partition key, in partition key order:
    /// the position on the list becomes `sequence`. The resulting `pk_indexes` are sorted
    /// by column index.
    fn make_meta(
        cols: impl IntoIterator<Item = ColumnType<'static>>,
        idx: impl IntoIterator<Item = usize>,
    ) -> PreparedMetadata {
        let table_spec = TableSpec::owned("ks".to_owned(), "t".to_owned());

        let mut col_specs = Vec::new();
        for (i, typ) in cols.into_iter().enumerate() {
            col_specs.push(ColumnSpec::owned(
                format!("col_{}", i),
                typ,
                table_spec.clone(),
            ));
        }

        let mut pk_indexes = Vec::new();
        for (sequence, index) in idx.into_iter().enumerate() {
            pk_indexes.push(PartitionKeyIndex {
                index: index as u16,
                sequence: sequence as u16,
            });
        }
        pk_indexes.sort_unstable_by_key(|pki| pki.index);

        PreparedMetadata {
            flags: 0,
            col_count: col_specs.len(),
            col_specs,
            pk_indexes,
        }
    }

    /// The partition key consists of columns 4, 0 and 3 (in that order), whereas the values
    /// are bound in column order. The extracted key must follow the partition key order.
    #[test]
    fn test_partition_key_multiple_columns_shuffled() {
        setup_tracing();

        let column_types = [
            ColumnType::Native(NativeType::TinyInt),
            ColumnType::Native(NativeType::SmallInt),
            ColumnType::Native(NativeType::Int),
            ColumnType::Native(NativeType::BigInt),
            ColumnType::Native(NativeType::Blob),
        ];
        let meta = make_meta(column_types, [4, 0, 3]);

        let mut values = SerializedValues::new();
        values
            .add_value(&67i8, &ColumnType::Native(NativeType::TinyInt))
            .unwrap();
        values
            .add_value(&42i16, &ColumnType::Native(NativeType::SmallInt))
            .unwrap();
        values
            .add_value(&23i32, &ColumnType::Native(NativeType::Int))
            .unwrap();
        values
            .add_value(&89i64, &ColumnType::Native(NativeType::BigInt))
            .unwrap();
        values
            .add_value(&[1u8, 2, 3, 4, 5], &ColumnType::Native(NativeType::Blob))
            .unwrap();

        let pk = PartitionKey::new(&meta, &values).unwrap();
        let pk_cols = Vec::from_iter(pk.iter());

        // Expected serialized forms of the partition key values.
        let blob_bytes = [1u8, 2, 3, 4, 5];
        let tinyint_bytes = 67i8.to_be_bytes();
        let bigint_bytes = 89i64.to_be_bytes();
        let expected = vec![
            (blob_bytes.as_slice(), &meta.col_specs[4]),
            (tinyint_bytes.as_slice(), &meta.col_specs[0]),
            (bigint_bytes.as_slice(), &meta.col_specs[3]),
        ];

        assert_eq!(pk_cols, expected);
    }
}