scylla/cluster/
metadata.rs

1//! This module holds entities that represent the cluster metadata,
2//! which includes:
3//! - topology metadata:
4//!   - [Peer],
5//! - schema metadata:
6//!   - [Keyspace],
7//!   - [Strategy] - replication strategy employed by a keyspace,
8//!   - [Table],
9//!   - [Column],
10//!   - [ColumnKind],
11//!   - [MaterializedView],
12//!   - CQL types (re-exported from scylla-cql):
13//!     - [ColumnType],
14//!     - [NativeType],
15//!     - [UserDefinedType],
16//!     - [CollectionType],
17//!
18
19use crate::client::pager::{NextPageError, NextRowError, QueryPager};
20use crate::cluster::node::resolve_contact_points;
21use crate::deserialize::DeserializeOwnedRow;
22use crate::errors::{
23    DbError, MetadataFetchError, MetadataFetchErrorKind, NewSessionError, RequestAttemptError,
24};
25use crate::frame::response::event::Event;
26use crate::network::{ConnectionConfig, NodeConnectionPool, PoolConfig, PoolSize};
27#[cfg(feature = "metrics")]
28use crate::observability::metrics::Metrics;
29use crate::policies::host_filter::HostFilter;
30use crate::routing::Token;
31use crate::statement::unprepared::Statement;
32use crate::utils::parse::{ParseErrorCause, ParseResult, ParserState};
33use crate::DeserializeRow;
34
35use futures::future::{self, FutureExt};
36use futures::stream::{self, StreamExt, TryStreamExt};
37use futures::Stream;
38use itertools::Itertools;
39use rand::seq::{IndexedRandom, SliceRandom};
40use rand::{rng, Rng};
41use scylla_cql::frame::response::result::{ColumnSpec, TableSpec};
42use std::borrow::BorrowMut;
43use std::cell::Cell;
44use std::collections::HashMap;
45use std::fmt::{self, Formatter};
46use std::net::{IpAddr, SocketAddr};
47use std::num::NonZeroUsize;
48use std::str::FromStr;
49use std::sync::Arc;
50use std::time::{Duration, Instant};
51use thiserror::Error;
52use tokio::sync::{broadcast, mpsc};
53use tracing::{debug, error, trace, warn};
54use uuid::Uuid;
55
56use crate::cluster::node::{InternalKnownNode, NodeAddr, ResolvedContactPoint};
57use crate::errors::{
58    KeyspaceStrategyError, KeyspacesMetadataError, MetadataError, PeersMetadataError, RequestError,
59    TablesMetadataError, UdtMetadataError,
60};
61
62// Re-export of CQL types.
63pub use scylla_cql::frame::response::result::{
64    CollectionType, ColumnType, NativeType, UserDefinedType,
65};
66
67use super::control_connection::ControlConnection;
68
// Shorthand aliases for the metadata maps used throughout this module.
// Keyed by keyspace name.
type PerKeyspace<T> = HashMap<String, T>;
// Per-keyspace data where each keyspace's entry may have failed independently.
type PerKeyspaceResult<T, E> = PerKeyspace<Result<T, E>>;
// Keyed by table (or UDT) name within a single keyspace.
type PerTable<T> = HashMap<String, T>;
// Keyed by (keyspace name, table name).
type PerKsTable<T> = HashMap<(String, String), T>;
// Per-(keyspace, table) data where each entry may have failed independently.
type PerKsTableResult<T, E> = PerKsTable<Result<T, E>>;
74
/// Indicates that reading metadata failed, but in a way
/// that we can handle, by throwing out data for a keyspace.
/// It is possible that some of the errors could be handled in even
/// more granular way (e.g. throwing out a single table), but keyspace
/// granularity seems like a good choice given how independent keyspaces
/// are from each other.
#[derive(Clone, Debug, Error)]
pub(crate) enum SingleKeyspaceMetadataError {
    /// A column/table references a UDT whose definition was not fetched.
    #[error(transparent)]
    MissingUDT(MissingUserDefinedType),
    /// A partition key column with the given position was absent from the
    /// fetched column metadata, so the key description would be incomplete.
    #[error("Partition key column with position {0} is missing from metadata")]
    IncompletePartitionKey(i32),
    /// A clustering key column with the given position was absent from the
    /// fetched column metadata, so the key description would be incomplete.
    #[error("Clustering key column with position {0} is missing from metadata")]
    IncompleteClusteringKey(i32),
}
90
/// Allows to read current metadata from the cluster
pub(crate) struct MetadataReader {
    // Pool config used to (re)create the control connection pool whenever
    // the control connection endpoint changes.
    control_connection_pool_config: PoolConfig,
    // Optional serverside timeout override applied to metadata requests.
    request_serverside_timeout: Option<Duration>,

    // The endpoint the current control connection is established to.
    control_connection_endpoint: UntranslatedEndpoint,
    // Single-connection pool serving as the control connection.
    control_connection: NodeConnectionPool,

    // when control connection fails, MetadataReader tries to connect to one of known_peers
    known_peers: Vec<UntranslatedEndpoint>,
    // Keyspaces whose schema should be fetched; semantics of an empty list
    // are decided by the metadata queries elsewhere.
    keyspaces_to_fetch: Vec<String>,
    // Whether schema (tables, UDTs, views) should be fetched at all.
    fetch_schema: bool,
    // User-provided filter deciding which peers are usable by the driver.
    host_filter: Option<Arc<dyn HostFilter>>,

    // When no known peer is reachable, initial known nodes are resolved once again as a fallback
    // and establishing control connection to them is attempted.
    initial_known_nodes: Vec<InternalKnownNode>,

    // When a control connection breaks, the PoolRefiller of its pool uses the requester
    // to signal ClusterWorker that an immediate metadata refresh is advisable.
    control_connection_repair_requester: broadcast::Sender<()>,

    #[cfg(feature = "metrics")]
    metrics: Arc<Metrics>,
}
116
/// Describes all metadata retrieved from the cluster
pub(crate) struct Metadata {
    // Nodes of the cluster, as fetched from system tables.
    pub(crate) peers: Vec<Peer>,
    // Schema metadata per keyspace. A keyspace maps to `Err` if its metadata
    // could not be read in a recoverable way (see [SingleKeyspaceMetadataError]).
    pub(crate) keyspaces: HashMap<String, Result<Keyspace, SingleKeyspaceMetadataError>>,
}
122
/// Node of the cluster, as seen in the cluster's topology metadata.
#[non_exhaustive] // <- so that we can add more fields in a backwards-compatible way
pub struct Peer {
    /// Unique identifier of the node.
    pub host_id: Uuid,
    /// Address of the node, possibly subject to address translation.
    pub address: NodeAddr,
    /// Tokens owned by this node on the token ring.
    pub tokens: Vec<Token>,
    /// Datacenter the node resides in, if known.
    pub datacenter: Option<String>,
    /// Rack the node resides in, if known.
    pub rack: Option<String>,
}
131
/// An endpoint for a node that the driver is to issue connections to,
/// possibly after prior address translation.
#[derive(Clone, Debug)]
pub(crate) enum UntranslatedEndpoint {
    /// Provided by user in SessionConfig (initial contact points).
    ContactPoint(ResolvedContactPoint),
    /// Fetched in Metadata with `query_peers()`
    Peer(PeerEndpoint),
}
141
142impl UntranslatedEndpoint {
143    pub(crate) fn address(&self) -> NodeAddr {
144        match *self {
145            UntranslatedEndpoint::ContactPoint(ResolvedContactPoint { address, .. }) => {
146                NodeAddr::Untranslatable(address)
147            }
148            UntranslatedEndpoint::Peer(PeerEndpoint { address, .. }) => address,
149        }
150    }
151    pub(crate) fn set_port(&mut self, port: u16) {
152        let inner_addr = match self {
153            UntranslatedEndpoint::ContactPoint(ResolvedContactPoint { address, .. }) => address,
154            UntranslatedEndpoint::Peer(PeerEndpoint { address, .. }) => address.inner_mut(),
155        };
156        inner_addr.set_port(port);
157    }
158}
159
/// Data used to issue connections to a node.
///
/// Fetched from the cluster in Metadata.
#[derive(Clone, Debug)]
pub(crate) struct PeerEndpoint {
    // Unique identifier of the node.
    pub(crate) host_id: Uuid,
    // Address to connect to, possibly subject to translation.
    pub(crate) address: NodeAddr,
    // Datacenter the node resides in, if known.
    pub(crate) datacenter: Option<String>,
    // Rack the node resides in, if known.
    pub(crate) rack: Option<String>,
}
170
171impl Peer {
172    pub(crate) fn to_peer_endpoint(&self) -> PeerEndpoint {
173        PeerEndpoint {
174            host_id: self.host_id,
175            address: self.address,
176            datacenter: self.datacenter.clone(),
177            rack: self.rack.clone(),
178        }
179    }
180
181    pub(crate) fn into_peer_endpoint_and_tokens(self) -> (PeerEndpoint, Vec<Token>) {
182        (
183            PeerEndpoint {
184                host_id: self.host_id,
185                address: self.address,
186                datacenter: self.datacenter,
187                rack: self.rack,
188            },
189            self.tokens,
190        )
191    }
192}
193
/// Describes a keyspace in the cluster.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct Keyspace {
    /// Replication strategy the keyspace was created with.
    pub strategy: Strategy,
    /// Tables of the keyspace, keyed by table name.
    /// Empty HashMap may as well mean that the client disabled schema fetching in SessionConfig
    pub tables: HashMap<String, Table>,
    /// Materialized views of the keyspace, keyed by view name.
    /// Empty HashMap may as well mean that the client disabled schema fetching in SessionConfig
    pub views: HashMap<String, MaterializedView>,
    /// User defined types of the keyspace, keyed by type name.
    /// Empty HashMap may as well mean that the client disabled schema fetching in SessionConfig
    pub user_defined_types: HashMap<String, Arc<UserDefinedType<'static>>>,
}
206
/// Describes a table in the cluster.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct Table {
    /// All columns of the table, keyed by column name.
    pub columns: HashMap<String, Column>,
    /// Names of the column of partition key.
    /// All of the names are guaranteed to be present in `columns` field.
    pub partition_key: Vec<String>,
    /// Names of the column of clustering key.
    /// All of the names are guaranteed to be present in `columns` field.
    pub clustering_key: Vec<String>,
    /// Name of the partitioner configured for the table, if any was reported.
    pub partitioner: Option<String>,
    /// Precomputed column specs of the partition key columns.
    pub(crate) pk_column_specs: Vec<ColumnSpec<'static>>,
}
221
/// Describes a materialized view in the cluster.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct MaterializedView {
    /// Table-like metadata (columns, keys) of the view itself.
    pub view_metadata: Table,
    /// Name of the base table the view is built upon.
    pub base_table_name: String,
}
229
/// Describes a column of the table.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct Column {
    /// CQL type of the column.
    pub typ: ColumnType<'static>,
    /// Role of the column in the table (regular, static, or a key component).
    pub kind: ColumnKind,
}
237
/// Intermediate representation of a CQL column type, in which user defined
/// types are referenced by name only. References are resolved to full
/// definitions in [PreColumnType::into_cql_type].
#[derive(Clone, Debug, PartialEq, Eq)]
enum PreColumnType {
    Native(NativeType),
    Collection {
        frozen: bool,
        typ: PreCollectionType,
    },
    Tuple(Vec<PreColumnType>),
    Vector {
        typ: Box<PreColumnType>,
        dimensions: u16,
    },
    // A UDT identified only by its name; the definition is looked up later.
    UserDefinedType {
        frozen: bool,
        name: String,
    },
}
255
256impl PreColumnType {
257    pub(crate) fn into_cql_type(
258        self,
259        keyspace_name: &String,
260        keyspace_udts: &PerTable<Arc<UserDefinedType<'static>>>,
261    ) -> Result<ColumnType<'static>, MissingUserDefinedType> {
262        match self {
263            PreColumnType::Native(n) => Ok(ColumnType::Native(n)),
264            PreColumnType::Collection { frozen, typ: type_ } => type_
265                .into_collection_type(keyspace_name, keyspace_udts)
266                .map(|inner| ColumnType::Collection { frozen, typ: inner }),
267            PreColumnType::Tuple(t) => t
268                .into_iter()
269                .map(|t| t.into_cql_type(keyspace_name, keyspace_udts))
270                .collect::<Result<Vec<ColumnType>, MissingUserDefinedType>>()
271                .map(ColumnType::Tuple),
272            PreColumnType::Vector {
273                typ: type_,
274                dimensions,
275            } => type_
276                .into_cql_type(keyspace_name, keyspace_udts)
277                .map(|inner| ColumnType::Vector {
278                    typ: Box::new(inner),
279                    dimensions,
280                }),
281            PreColumnType::UserDefinedType { frozen, name } => {
282                let definition = match keyspace_udts.get(&name) {
283                    Some(def) => def.clone(),
284                    None => {
285                        return Err(MissingUserDefinedType {
286                            name,
287                            keyspace: keyspace_name.clone(),
288                        })
289                    }
290                };
291                Ok(ColumnType::UserDefinedType { frozen, definition })
292            }
293        }
294    }
295}
296
/// Represents a user defined type whose definition is missing from the metadata.
#[derive(Clone, Debug, Error)]
#[error("Missing UDT: {keyspace}, {name}")]
pub(crate) struct MissingUserDefinedType {
    // Name of the missing UDT.
    name: String,
    // Keyspace in which the UDT was referenced.
    keyspace: String,
}
304
/// Intermediate representation of a CQL collection type; element types may
/// still contain unresolved UDT name references (see [PreColumnType]).
#[derive(Clone, Debug, PartialEq, Eq)]
enum PreCollectionType {
    List(Box<PreColumnType>),
    Map(Box<PreColumnType>, Box<PreColumnType>),
    Set(Box<PreColumnType>),
}
311
312impl PreCollectionType {
313    pub(crate) fn into_collection_type(
314        self,
315        keyspace_name: &String,
316        keyspace_udts: &PerTable<Arc<UserDefinedType<'static>>>,
317    ) -> Result<CollectionType<'static>, MissingUserDefinedType> {
318        match self {
319            PreCollectionType::List(t) => t
320                .into_cql_type(keyspace_name, keyspace_udts)
321                .map(|inner| CollectionType::List(Box::new(inner))),
322            PreCollectionType::Map(tk, tv) => Ok(CollectionType::Map(
323                Box::new(tk.into_cql_type(keyspace_name, keyspace_udts)?),
324                Box::new(tv.into_cql_type(keyspace_name, keyspace_udts)?),
325            )),
326            PreCollectionType::Set(t) => t
327                .into_cql_type(keyspace_name, keyspace_udts)
328                .map(|inner| CollectionType::Set(Box::new(inner))),
329        }
330    }
331}
332
/// Role of a column within a table.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ColumnKind {
    /// An ordinary, non-key column.
    Regular,
    /// A static column (shared by all rows of a partition).
    Static,
    /// A component of the clustering key.
    Clustering,
    /// A component of the partition key.
    PartitionKey,
}
341
/// [ColumnKind] parse error: the input string matched no known column kind.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ColumnKindFromStrError;
345
346impl std::str::FromStr for ColumnKind {
347    type Err = ColumnKindFromStrError;
348
349    fn from_str(s: &str) -> Result<Self, Self::Err> {
350        match s {
351            "regular" => Ok(Self::Regular),
352            "static" => Ok(Self::Static),
353            "clustering" => Ok(Self::Clustering),
354            "partition_key" => Ok(Self::PartitionKey),
355            _ => Err(ColumnKindFromStrError),
356        }
357    }
358}
359
/// Replication strategy of a keyspace.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
#[allow(clippy::enum_variant_names)]
pub enum Strategy {
    SimpleStrategy {
        replication_factor: usize,
    },
    NetworkTopologyStrategy {
        // Replication factors of datacenters with given names
        datacenter_repfactors: HashMap<String, usize>,
    },
    LocalStrategy, // replication_factor == 1
    /// Any other strategy, with its raw replication options preserved as strings.
    Other {
        name: String,
        data: HashMap<String, String>,
    },
}
377
/// Error describing a CQL type string that could not be parsed.
#[derive(Clone, Debug)]
struct InvalidCqlType {
    // The type string being parsed.
    typ: String,
    // Position within the string at which parsing failed.
    position: usize,
    // Why the type string is considered invalid.
    reason: String,
}
384
impl fmt::Display for InvalidCqlType {
    // Display simply reuses the derived Debug representation.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}", self)
    }
}
390
391impl Metadata {
392    /// Creates new, dummy metadata from a given list of peers.
393    ///
394    /// It can be used as a replacement for real metadata when initial
395    /// metadata read fails.
396    pub(crate) fn new_dummy(initial_peers: &[UntranslatedEndpoint]) -> Self {
397        let peers = initial_peers
398            .iter()
399            .enumerate()
400            .map(|(id, endpoint)| {
401                // Given N nodes, divide the ring into N roughly equal parts
402                // and assign them to each node.
403                let token = ((id as u128) << 64) / initial_peers.len() as u128;
404
405                Peer {
406                    address: endpoint.address(),
407                    tokens: vec![Token::new(token as i64)],
408                    datacenter: None,
409                    rack: None,
410                    host_id: Uuid::new_v4(),
411                }
412            })
413            .collect();
414
415        Metadata {
416            peers,
417            keyspaces: HashMap::new(),
418        }
419    }
420}
421
impl MetadataReader {
    /// Creates new MetadataReader, which connects to initially_known_peers in the background.
    ///
    /// Resolves the initial known nodes and picks a random resolved contact
    /// point as the first control connection endpoint. Fails with
    /// [NewSessionError::FailedToResolveAnyHostname] if no node resolves.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn new(
        initial_known_nodes: Vec<InternalKnownNode>,
        control_connection_repair_requester: broadcast::Sender<()>,
        mut connection_config: ConnectionConfig,
        request_serverside_timeout: Option<Duration>,
        server_event_sender: mpsc::Sender<Event>,
        keyspaces_to_fetch: Vec<String>,
        fetch_schema: bool,
        host_filter: &Option<Arc<dyn HostFilter>>,
        #[cfg(feature = "metrics")] metrics: Arc<Metrics>,
    ) -> Result<Self, NewSessionError> {
        let (initial_peers, resolved_hostnames) =
            resolve_contact_points(&initial_known_nodes).await;
        // Ensure there is at least one resolved node
        if initial_peers.is_empty() {
            return Err(NewSessionError::FailedToResolveAnyHostname(
                resolved_hostnames,
            ));
        }

        let control_connection_endpoint = UntranslatedEndpoint::ContactPoint(
            initial_peers
                .choose(&mut rng())
                .expect("Tried to initialize MetadataReader with empty initial_known_nodes list!")
                .clone(),
        );

        // setting event_sender field in connection config will cause control connection to
        // - send REGISTER message to receive server events
        // - send received events via server_event_sender
        connection_config.event_sender = Some(server_event_sender);

        let control_connection_pool_config = PoolConfig {
            connection_config,

            // We want to have only one connection to receive events from
            pool_size: PoolSize::PerHost(NonZeroUsize::new(1).unwrap()),

            // The shard-aware port won't be used with PerHost pool size anyway,
            // so explicitly disable it here
            can_use_shard_aware_port: false,
        };

        let control_connection = Self::make_control_connection_pool(
            control_connection_endpoint.clone(),
            &control_connection_pool_config,
            control_connection_repair_requester.clone(),
            #[cfg(feature = "metrics")]
            metrics.clone(),
        );

        Ok(MetadataReader {
            control_connection_pool_config,
            control_connection_endpoint,
            control_connection,
            request_serverside_timeout,
            known_peers: initial_peers
                .into_iter()
                .map(UntranslatedEndpoint::ContactPoint)
                .collect(),
            keyspaces_to_fetch,
            fetch_schema,
            host_filter: host_filter.clone(),
            initial_known_nodes,
            control_connection_repair_requester,
            #[cfg(feature = "metrics")]
            metrics,
        })
    }

    /// Fetches current metadata from the cluster.
    ///
    /// If fetching over the current control connection fails, retries on the
    /// other known peers (in random order), and - for non-initial reads -
    /// falls back to freshly re-resolved initial contact points. On success,
    /// the known peers list is refreshed from the fetched metadata.
    pub(crate) async fn read_metadata(&mut self, initial: bool) -> Result<Metadata, MetadataError> {
        let mut result = self.fetch_metadata(initial).await;
        let prev_err = match result {
            Ok(metadata) => {
                debug!("Fetched new metadata");
                self.update_known_peers(&metadata);
                if initial {
                    self.handle_unaccepted_host_in_control_connection(&metadata);
                }
                return Ok(metadata);
            }
            Err(err) => err,
        };

        // At this point, we know that fetching metadata on the current control connection failed.
        // Therefore, we try to fetch metadata from other known peers, in order.

        // shuffle known_peers to iterate through them in random order later
        self.known_peers.shuffle(&mut rng());
        debug!("Known peers: {:?}", self.known_peers.iter().format(", "));

        let address_of_failed_control_connection = self.control_connection_endpoint.address();
        // Skip the endpoint that just failed - no point retrying it immediately.
        let filtered_known_peers = self
            .known_peers
            .clone()
            .into_iter()
            .filter(|peer| peer.address() != address_of_failed_control_connection);

        // if fetching metadata on current control connection failed,
        // try to fetch metadata from other known peer
        result = self
            .retry_fetch_metadata_on_nodes(initial, filtered_known_peers, prev_err)
            .await;

        if let Err(prev_err) = result {
            if !initial {
                // If no known peer is reachable, try falling back to initial contact points, in hope that
                // there are some hostnames there which will resolve to reachable new addresses.
                warn!("Failed to establish control connection and fetch metadata on all known peers. Falling back to initial contact points.");
                let (initial_peers, _hostnames) =
                    resolve_contact_points(&self.initial_known_nodes).await;
                result = self
                    .retry_fetch_metadata_on_nodes(
                        initial,
                        initial_peers
                            .into_iter()
                            .map(UntranslatedEndpoint::ContactPoint),
                        prev_err,
                    )
                    .await;
            } else {
                // No point in falling back as this is an initial connection attempt.
                result = Err(prev_err);
            }
        }

        match &result {
            Ok(metadata) => {
                self.update_known_peers(metadata);
                self.handle_unaccepted_host_in_control_connection(metadata);
                debug!("Fetched new metadata");
            }
            Err(error) => error!(
                error = %error,
                "Could not fetch metadata"
            ),
        }

        result
    }

    /// Tries the given nodes one by one, re-establishing the control
    /// connection on each and attempting a metadata fetch, until one succeeds
    /// or the nodes are exhausted. `prev_err` is the error to report if the
    /// iterator is empty; otherwise the last fetch error is returned.
    async fn retry_fetch_metadata_on_nodes(
        &mut self,
        initial: bool,
        nodes: impl Iterator<Item = UntranslatedEndpoint>,
        prev_err: MetadataError,
    ) -> Result<Metadata, MetadataError> {
        let mut result = Err(prev_err);
        for peer in nodes {
            let err = match result {
                Ok(_) => break,
                Err(err) => err,
            };

            warn!(
                control_connection_address = tracing::field::display(self
                    .control_connection_endpoint
                    .address()),
                error = %err,
                "Failed to fetch metadata using current control connection"
            );

            // Point the control connection at the next candidate peer.
            self.control_connection_endpoint = peer.clone();
            self.control_connection = Self::make_control_connection_pool(
                self.control_connection_endpoint.clone(),
                &self.control_connection_pool_config,
                self.control_connection_repair_requester.clone(),
                #[cfg(feature = "metrics")]
                Arc::clone(&self.metrics),
            );

            debug!(
                "Retrying to establish the control connection on {}",
                self.control_connection_endpoint.address()
            );
            result = self.fetch_metadata(initial).await;
        }
        result
    }

    /// Performs a single metadata fetch over the current control connection.
    ///
    /// For the initial fetch (`initial == true`), a failure is downgraded to
    /// dummy metadata built from the known peers, so that the session can
    /// still start (with degraded performance and no schema info).
    async fn fetch_metadata(&self, initial: bool) -> Result<Metadata, MetadataError> {
        // TODO: Timeouts?
        self.control_connection.wait_until_initialized().await;
        let conn = ControlConnection::new(self.control_connection.random_connection()?)
            .override_serverside_timeout(self.request_serverside_timeout);

        let res = conn
            .query_metadata(
                self.control_connection_endpoint.address().port(),
                &self.keyspaces_to_fetch,
                self.fetch_schema,
            )
            .await;

        if initial {
            if let Err(err) = res {
                warn!(
                    error = ?err,
                    "Initial metadata read failed, proceeding with metadata \
                    consisting only of the initial peer list and dummy tokens. \
                    This might result in suboptimal performance and schema \
                    information not being available."
                );
                return Ok(Metadata::new_dummy(&self.known_peers));
            }
        }

        res
    }

    /// Replaces the known peers list with the peers from freshly fetched
    /// metadata, applying the host filter. Logs an error if the filter
    /// rejected every node in a non-empty cluster.
    fn update_known_peers(&mut self, metadata: &Metadata) {
        let host_filter = self.host_filter.as_ref();
        self.known_peers = metadata
            .peers
            .iter()
            .filter(|peer| host_filter.map_or(true, |f| f.accept(peer)))
            .map(|peer| UntranslatedEndpoint::Peer(peer.to_peer_endpoint()))
            .collect();

        // Check if the host filter isn't accidentally too restrictive,
        // and print an error message about this fact
        if !metadata.peers.is_empty() && self.known_peers.is_empty() {
            error!(
                node_ips = tracing::field::display(
                    metadata.peers.iter().map(|peer| peer.address).format(", ")
                ),
                "The host filter rejected all nodes in the cluster, \
                no connections that can serve user queries have been \
                established. The session cannot serve any queries!"
            )
        }
    }

    /// If the node the control connection is established to is rejected by
    /// the host filter, warns about it and moves the control connection to a
    /// randomly chosen known peer (if any are available).
    fn handle_unaccepted_host_in_control_connection(&mut self, metadata: &Metadata) {
        let control_connection_peer = metadata
            .peers
            .iter()
            .find(|peer| matches!(self.control_connection_endpoint, UntranslatedEndpoint::Peer(PeerEndpoint{address, ..}) if address == peer.address));
        if let Some(peer) = control_connection_peer {
            if !self.host_filter.as_ref().map_or(true, |f| f.accept(peer)) {
                warn!(
                    filtered_node_ips = tracing::field::display(metadata
                        .peers
                        .iter()
                        .filter(|peer| self.host_filter.as_ref().map_or(true, |p| p.accept(peer)))
                        .map(|peer| peer.address)
                        .format(", ")
                    ),
                    control_connection_address = ?self.control_connection_endpoint.address(),
                    "The node that the control connection is established to \
                    is not accepted by the host filter. Please verify that \
                    the nodes in your initial peers list are accepted by the \
                    host filter. The driver will try to re-establish the \
                    control connection to a different node."
                );

                // Assuming here that known_peers are up-to-date
                if !self.known_peers.is_empty() {
                    self.control_connection_endpoint = self
                        .known_peers
                        .choose(&mut rng())
                        .expect("known_peers is empty - should be impossible")
                        .clone();

                    self.control_connection = Self::make_control_connection_pool(
                        self.control_connection_endpoint.clone(),
                        &self.control_connection_pool_config,
                        self.control_connection_repair_requester.clone(),
                        #[cfg(feature = "metrics")]
                        Arc::clone(&self.metrics),
                    );
                }
            }
        }
    }

    /// Builds a single-connection pool serving as the control connection to
    /// the given endpoint.
    fn make_control_connection_pool(
        endpoint: UntranslatedEndpoint,
        pool_config: &PoolConfig,
        refresh_requester: broadcast::Sender<()>,
        #[cfg(feature = "metrics")] metrics: Arc<Metrics>,
    ) -> NodeConnectionPool {
        NodeConnectionPool::new(
            endpoint,
            pool_config,
            None,
            refresh_requester,
            #[cfg(feature = "metrics")]
            metrics,
        )
    }
}
718
impl ControlConnection {
    /// Fetches topology (peers) and schema (keyspaces) metadata over this
    /// control connection, running both queries concurrently.
    ///
    /// `connect_port` is the port to combine with peers' IP addresses.
    /// Returns an error if either query fails, if no peers are returned,
    /// or if no peer owns any tokens.
    async fn query_metadata(
        &self,
        connect_port: u16,
        keyspace_to_fetch: &[String],
        fetch_schema: bool,
    ) -> Result<Metadata, MetadataError> {
        let peers_query = self.query_peers(connect_port);
        let keyspaces_query = self.query_keyspaces(keyspace_to_fetch, fetch_schema);

        // Run both queries concurrently; fail fast on the first error.
        let (peers, keyspaces) = tokio::try_join!(peers_query, keyspaces_query)?;

        // There must be at least one peer
        if peers.is_empty() {
            return Err(MetadataError::Peers(PeersMetadataError::EmptyPeers));
        }

        // At least one peer has to have some tokens
        if peers.iter().all(|peer| peer.tokens.is_empty()) {
            return Err(MetadataError::Peers(PeersMetadataError::EmptyTokenLists));
        }

        Ok(Metadata { peers, keyspaces })
    }
}
744
/// A row of `system.peers`/`system.local`, as deserialized during a
/// topology metadata fetch.
#[derive(DeserializeRow)]
#[scylla(crate = "scylla_cql")]
struct NodeInfoRow {
    // Nullable in the DB; rows with null host_id are skipped with a warning
    // (see create_peer_from_row).
    host_id: Option<Uuid>,
    // The `rpc_address` column. For the local node it is replaced with the
    // control connection's address, as rpc_address in system.local can be wrong.
    #[scylla(rename = "rpc_address")]
    untranslated_ip_addr: IpAddr,
    #[scylla(rename = "data_center")]
    datacenter: Option<String>,
    rack: Option<String>,
    // Textual token values owned by the node, if any.
    tokens: Option<Vec<String>>,
}
756
/// Distinguishes which system table a [NodeInfoRow] came from.
#[derive(Clone, Copy)]
enum NodeInfoSource {
    /// Row from `system.local` - describes the node we are connected to.
    Local,
    /// Row from `system.peers`.
    Peer,
}
762
763impl NodeInfoSource {
764    fn describe(&self) -> &'static str {
765        match self {
766            Self::Local => "local node",
767            Self::Peer => "peer",
768        }
769    }
770}
771
// Page size applied to every metadata query issued over the control connection.
const METADATA_QUERY_PAGE_SIZE: i32 = 1024;
773
774impl ControlConnection {
    /// Queries `system.peers` and `system.local` and builds the list of
    /// [Peer]s from the returned rows. Rows that fail to deserialize or
    /// describe unusable nodes are skipped with a warning rather than
    /// failing the whole fetch.
    async fn query_peers(&self, connect_port: u16) -> Result<Vec<Peer>, MetadataError> {
        let mut peers_query = Statement::new(
            "select host_id, rpc_address, data_center, rack, tokens from system.peers",
        );
        peers_query.set_page_size(METADATA_QUERY_PAGE_SIZE);
        // Lazily convert the pager into a typed row stream,
        // tagging each row with its source table.
        let peers_query_stream = self
            .query_iter(peers_query)
            .map(|pager_res| {
                let pager = pager_res?;
                let rows_stream = pager.rows_stream::<NodeInfoRow>()?;
                Ok::<_, MetadataFetchErrorKind>(rows_stream)
            })
            .into_stream()
            .map(|result| result.map(|stream| stream.map_err(MetadataFetchErrorKind::NextRowError)))
            .try_flatten()
            // Add table context to the error.
            .map_err(|error| MetadataFetchError {
                error,
                table: "system.peers",
            })
            .and_then(|row_result| future::ok((NodeInfoSource::Peer, row_result)));

        let mut local_query =
        Statement::new("select host_id, rpc_address, data_center, rack, tokens from system.local WHERE key='local'");
        local_query.set_page_size(METADATA_QUERY_PAGE_SIZE);
        let local_query_stream = self
            .query_iter(local_query)
            .map(|pager_res| {
                let pager = pager_res?;
                let rows_stream = pager.rows_stream::<NodeInfoRow>()?;
                Ok::<_, MetadataFetchErrorKind>(rows_stream)
            })
            .into_stream()
            .map(|result| result.map(|stream| stream.map_err(MetadataFetchErrorKind::NextRowError)))
            .try_flatten()
            // Add table context to the error.
            .map_err(|error| MetadataFetchError {
                error,
                table: "system.local",
            })
            .and_then(|row_result| future::ok((NodeInfoSource::Local, row_result)));

        // Interleave rows from both tables as they arrive.
        let untranslated_rows = stream::select(peers_query_stream, local_query_stream);

        let local_ip: IpAddr = self.get_connect_address().ip();
        let local_address = SocketAddr::new(local_ip, connect_port);

        // Invalid rows are logged and dropped (mapped to None) instead of
        // aborting the whole topology fetch.
        let translated_peers_futures = untranslated_rows.map(|row_result| async {
            match row_result {
                Ok((source, row)) => Self::create_peer_from_row(source, row, local_address).await,
                Err(err) => {
                    warn!(
                        "system.peers or system.local has an invalid row, skipping it: {}",
                        err
                    );
                    None
                }
            }
        });

        // Row conversion is async, so process up to 256 rows concurrently.
        let peers = translated_peers_futures
            .buffer_unordered(256)
            .filter_map(std::future::ready)
            .collect::<Vec<_>>()
            .await;
        Ok(peers)
    }
842
843    async fn create_peer_from_row(
844        source: NodeInfoSource,
845        row: NodeInfoRow,
846        local_address: SocketAddr,
847    ) -> Option<Peer> {
848        let NodeInfoRow {
849            host_id,
850            untranslated_ip_addr,
851            datacenter,
852            rack,
853            tokens,
854        } = row;
855
856        let host_id = match host_id {
857            Some(host_id) => host_id,
858            None => {
859                warn!("{} (untranslated ip: {}, dc: {:?}, rack: {:?}) has Host ID set to null; skipping node.", source.describe(), untranslated_ip_addr, datacenter, rack);
860                return None;
861            }
862        };
863
864        let connect_port = local_address.port();
865        let untranslated_address = SocketAddr::new(untranslated_ip_addr, connect_port);
866
867        let node_addr = match source {
868            NodeInfoSource::Local => {
869                // For the local node we should use connection's address instead of rpc_address.
870                // (The reason is that rpc_address in system.local can be wrong.)
871                // Thus, we replace address in local_rows with connection's address.
872                // We need to replace rpc_address with control connection address.
873                NodeAddr::Untranslatable(local_address)
874            }
875            NodeInfoSource::Peer => {
876                // The usual case - no translation.
877                NodeAddr::Translatable(untranslated_address)
878            }
879        };
880
881        let tokens_str: Vec<String> = tokens.unwrap_or_default();
882
883        // Parse string representation of tokens as integer values
884        let tokens: Vec<Token> = match tokens_str
885            .iter()
886            .map(|s| Token::from_str(s))
887            .collect::<Result<Vec<Token>, _>>()
888        {
889            Ok(parsed) => parsed,
890            Err(e) => {
891                // FIXME: we could allow the users to provide custom partitioning information
892                // in order for it to work with non-standard token sizes.
893                // Also, we could implement support for Cassandra's other standard partitioners
894                // like RandomPartitioner or ByteOrderedPartitioner.
895                trace!("Couldn't parse tokens as 64-bit integers: {}, proceeding with a dummy token. If you're using a partitioner with different token size, consider migrating to murmur3", e);
896                vec![Token::new(rand::rng().random::<i64>())]
897            }
898        };
899
900        Some(Peer {
901            host_id,
902            address: node_addr,
903            tokens,
904            datacenter,
905            rack,
906        })
907    }
908
    /// Returns a stream of rows of type `R` fetched with `query_str`,
    /// optionally restricted to the given keyspaces.
    ///
    /// If `keyspaces_to_fetch` is empty, `query_str` is executed verbatim as
    /// an unprepared statement. Otherwise, a `where keyspace_name in ?`
    /// clause is appended, the statement is prepared, and the keyspace list
    /// is bound as the single value.
    ///
    /// Both paging errors and row-deserialization errors surface as
    /// `MetadataFetchErrorKind` items of the returned stream.
    fn query_filter_keyspace_name<'a, R>(
        &'a self,
        query_str: &'a str,
        keyspaces_to_fetch: &'a [String],
    ) -> impl Stream<Item = Result<R, MetadataFetchErrorKind>> + 'a
    where
        R: DeserializeOwnedRow + 'static,
    {
        // This function is extracted to reduce monomorphisation penalty:
        // query_filter_keyspace_name() is going to be monomorphised into 5 distinct functions,
        // so it's better to extract the common part.
        async fn make_keyspace_filtered_query_pager(
            conn: &ControlConnection,
            query_str: &str,
            keyspaces_to_fetch: &[String],
        ) -> Result<QueryPager, MetadataFetchErrorKind> {
            if keyspaces_to_fetch.is_empty() {
                let mut query = Statement::new(query_str);
                query.set_page_size(METADATA_QUERY_PAGE_SIZE);

                conn.query_iter(query)
                    .await
                    .map_err(MetadataFetchErrorKind::NextRowError)
            } else {
                // The whole keyspace list is serialized as a single `in ?` value.
                let keyspaces = &[keyspaces_to_fetch] as &[&[String]];
                let query_str = format!("{query_str} where keyspace_name in ?");

                let mut query = Statement::new(query_str);
                query.set_page_size(METADATA_QUERY_PAGE_SIZE);

                let prepared = conn.prepare(query).await?;
                let serialized_values = prepared.serialize_values(&keyspaces)?;
                conn.execute_iter(prepared, serialized_values)
                    .await
                    .map_err(MetadataFetchErrorKind::NextRowError)
            }
        }

        let fut = async move {
            let pager =
                make_keyspace_filtered_query_pager(self, query_str, keyspaces_to_fetch).await?;
            let stream: crate::client::pager::TypedRowStream<R> = pager.rows_stream::<R>()?;
            Ok::<_, MetadataFetchErrorKind>(stream)
        };
        // Flatten Future<Result<Stream<Result<R, _>>, _>> into a single
        // Stream<Result<R, _>>, unifying both error layers into the item type.
        fut.into_stream()
            .map(|result| result.map(|stream| stream.map_err(MetadataFetchErrorKind::NextRowError)))
            .try_flatten()
    }
957
    /// Fetches metadata of the requested keyspaces (all keyspaces when
    /// `keyspaces_to_fetch` is empty): replication strategy and, if
    /// `fetch_schema` is true, tables, materialized views and UDTs.
    ///
    /// The outer `Result` is `Err` only when querying the cluster failed;
    /// a single keyspace whose fetched metadata turned out inconsistent maps
    /// to an inner `Err(SingleKeyspaceMetadataError)` instead.
    async fn query_keyspaces(
        &self,
        keyspaces_to_fetch: &[String],
        fetch_schema: bool,
    ) -> Result<PerKeyspaceResult<Keyspace, SingleKeyspaceMetadataError>, MetadataError> {
        let rows = self
            .query_filter_keyspace_name::<(String, HashMap<String, String>)>(
                "select keyspace_name, replication from system_schema.keyspaces",
                keyspaces_to_fetch,
            )
            .map_err(|error| MetadataFetchError {
                error,
                table: "system_schema.keyspaces",
            });

        let (mut all_tables, mut all_views, mut all_user_defined_types) = if fetch_schema {
            let udts = self.query_user_defined_types(keyspaces_to_fetch).await?;
            let mut tables_schema = self.query_tables_schema(keyspaces_to_fetch, &udts).await?;
            (
                // We pass the mutable reference to the same map to the both functions.
                // First function fetches `system_schema.tables`, and removes found
                // table from `tables_schema`.
                // Second does the same for `system_schema.views`.
                // The assumption here is that no keys (table names) can appear in both
                // of those schema table.
                // As far as we know this assumption is true for Scylla and Cassandra.
                self.query_tables(keyspaces_to_fetch, &mut tables_schema)
                    .await?,
                self.query_views(keyspaces_to_fetch, &mut tables_schema)
                    .await?,
                udts,
            )
        } else {
            // Schema fetching disabled: keyspaces will carry empty schema maps.
            (HashMap::new(), HashMap::new(), HashMap::new())
        };

        rows.map(|row_result| {
            let (keyspace_name, strategy_map) = row_result?;

            let strategy: Strategy = strategy_from_string_map(strategy_map).map_err(|error| {
                KeyspacesMetadataError::Strategy {
                    keyspace: keyspace_name.clone(),
                    error,
                }
            })?;
            // A keyspace absent from the per-keyspace maps simply has no
            // tables/views/UDTs fetched - treat it as empty, not as an error.
            let tables = all_tables
                .remove(&keyspace_name)
                .unwrap_or_else(|| Ok(HashMap::new()));
            let views = all_views
                .remove(&keyspace_name)
                .unwrap_or_else(|| Ok(HashMap::new()));
            let user_defined_types = all_user_defined_types
                .remove(&keyspace_name)
                .unwrap_or_else(|| Ok(HashMap::new()));

            // As you can notice, in this file we generally operate on two layers of errors:
            // - Outer (MetadataError) if something went wrong with querying the cluster.
            // - Inner (SingleKeyspaceMetadataError) if the fetched metadata turned out to not be fully consistent.
            // If there is an inner error, we want to drop metadata for the whole keyspace.
            // This logic checks if either tables, views, or UDTs have such inner error, and returns it if so.
            // Notice that in the error branch, return value is wrapped in `Ok` - but this is the
            // outer error, so it just means there was no error while querying the cluster.
            let (tables, views, user_defined_types) = match (tables, views, user_defined_types) {
                (Ok(t), Ok(v), Ok(u)) => (t, v, u),
                (Err(e), _, _) | (_, Err(e), _) => return Ok((keyspace_name, Err(e))),
                (_, _, Err(e)) => {
                    return Ok((
                        keyspace_name,
                        Err(SingleKeyspaceMetadataError::MissingUDT(e)),
                    ))
                }
            };

            let keyspace = Keyspace {
                strategy,
                tables,
                views,
                user_defined_types,
            };

            Ok((keyspace_name, Ok(keyspace)))
        })
        .try_collect()
        .await
    }
1043}
1044
/// Raw row of `system_schema.types`: a UDT definition as stored in the
/// database, with field types still in their textual CQL form.
#[derive(DeserializeRow, Debug)]
#[scylla(crate = "crate")]
struct UdtRow {
    keyspace_name: String,
    type_name: String,
    // Parallel vectors: field_names[i] has the type described by field_types[i].
    field_names: Vec<String>,
    field_types: Vec<String>,
}
1053
/// A [UdtRow] whose textual field types have been parsed into
/// [PreColumnType]s, i.e. CQL types with UDT references not yet resolved.
#[derive(Debug)]
struct UdtRowWithParsedFieldTypes {
    keyspace_name: String,
    type_name: String,
    // Parallel vectors: field_names[i] has type field_types[i].
    field_names: Vec<String>,
    field_types: Vec<PreColumnType>,
}
1061
1062impl TryFrom<UdtRow> for UdtRowWithParsedFieldTypes {
1063    type Error = InvalidCqlType;
1064    fn try_from(udt_row: UdtRow) -> Result<Self, InvalidCqlType> {
1065        let UdtRow {
1066            keyspace_name,
1067            type_name,
1068            field_names,
1069            field_types,
1070        } = udt_row;
1071        let field_types = field_types
1072            .into_iter()
1073            .map(|type_| map_string_to_cql_type(&type_))
1074            .collect::<Result<Vec<_>, _>>()?;
1075        Ok(Self {
1076            keyspace_name,
1077            type_name,
1078            field_names,
1079            field_types,
1080        })
1081    }
1082}
1083
impl ControlConnection {
    /// Fetches all UDT definitions from `system_schema.types` (optionally
    /// restricted to `keyspaces_to_fetch`) and resolves them into
    /// [UserDefinedType] instances, grouped per keyspace.
    ///
    /// UDTs may reference other UDTs of the same keyspace, so definitions
    /// are topologically sorted first - this guarantees each referenced UDT
    /// is already resolved when a referencing UDT is processed. A keyspace
    /// whose UDTs reference a missing type maps to `Err`.
    async fn query_user_defined_types(
        &self,
        keyspaces_to_fetch: &[String],
    ) -> Result<
        PerKeyspaceResult<PerTable<Arc<UserDefinedType<'static>>>, MissingUserDefinedType>,
        MetadataError,
    > {
        let rows = self.query_filter_keyspace_name::<UdtRow>(
        "select keyspace_name, type_name, field_names, field_types from system_schema.types",
        keyspaces_to_fetch,
    )
    .map_err(|error| MetadataFetchError {
        error,
        table: "system_schema.types",
    });

        // Parse each row's textual field types; an unparsable CQL type is an
        // outer (whole-fetch) error.
        let mut udt_rows: Vec<UdtRowWithParsedFieldTypes> = rows
            .map(|row_result| {
                let udt_row = row_result?.try_into().map_err(|err: InvalidCqlType| {
                    MetadataError::Udts(UdtMetadataError::InvalidCqlType {
                        typ: err.typ,
                        position: err.position,
                        reason: err.reason,
                    })
                })?;

                Ok::<_, MetadataError>(udt_row)
            })
            .try_collect()
            .await?;

        let instant_before_toposort = Instant::now();
        topo_sort_udts(&mut udt_rows)?;
        let toposort_elapsed = instant_before_toposort.elapsed();
        debug!(
            "Toposort of UDT definitions took {:.2} ms (udts len: {})",
            toposort_elapsed.as_secs_f64() * 1000.,
            udt_rows.len(),
        );

        let mut udts = HashMap::new();
        'udts_loop: for udt_row in udt_rows {
            let UdtRowWithParsedFieldTypes {
                keyspace_name,
                type_name,
                field_names,
                field_types,
            } = udt_row;

            // `keyspace_name` is moved into the map key below; keep a copy
            // for type resolution and for the final UserDefinedType.
            let keyspace_name_clone = keyspace_name.clone();
            let keyspace_udts_result = udts
                .entry(keyspace_name)
                .or_insert_with(|| Ok(HashMap::new()));

            // If there was previously an error in this keyspace then it makes no sense to process this UDT.
            let keyspace_udts = match keyspace_udts_result {
                Ok(udts) => udts,
                Err(_) => continue,
            };

            let mut fields = Vec::with_capacity(field_names.len());

            // Resolve each field type. Thanks to the toposort, any UDT this
            // one references has already been inserted into `keyspace_udts`.
            for (field_name, field_type) in field_names.into_iter().zip(field_types.into_iter()) {
                match field_type.into_cql_type(&keyspace_name_clone, keyspace_udts) {
                    Ok(cql_type) => fields.push((field_name.into(), cql_type)),
                    Err(e) => {
                        // A missing referenced UDT poisons the whole keyspace.
                        *keyspace_udts_result = Err(e);
                        continue 'udts_loop;
                    }
                }
            }

            let udt = Arc::new(UserDefinedType {
                name: type_name.clone().into(),
                keyspace: keyspace_name_clone.into(),
                field_types: fields,
            });

            keyspace_udts.insert(type_name, udt);
        }

        Ok(udts)
    }
}
1169
/// Topologically sorts UDT definitions in place (Kahn-style, indegree
/// counting), so that every UDT is preceded by all UDTs of the same keyspace
/// that it references in its field types.
///
/// References to unknown types are ignored here (they surface later, during
/// type resolution). Returns `UdtMetadataError::CircularTypeDependency` if
/// the reference graph contains a cycle; note that `udts` is left empty in
/// that case, since it is drained before sorting.
fn topo_sort_udts(udts: &mut Vec<UdtRowWithParsedFieldTypes>) -> Result<(), UdtMetadataError> {
    // Calls `what` with the name of every UDT referenced (at any nesting
    // depth) by the given pre-resolved CQL type.
    fn do_with_referenced_udts(what: &mut impl FnMut(&str), pre_cql_type: &PreColumnType) {
        match pre_cql_type {
            PreColumnType::Native(_) => (),
            PreColumnType::Collection { typ: type_, .. } => match type_ {
                PreCollectionType::List(t) | PreCollectionType::Set(t) => {
                    do_with_referenced_udts(what, t)
                }
                PreCollectionType::Map(t1, t2) => {
                    do_with_referenced_udts(what, t1);
                    do_with_referenced_udts(what, t2);
                }
            },
            PreColumnType::Tuple(types) => types
                .iter()
                .for_each(|type_| do_with_referenced_udts(what, type_)),
            PreColumnType::Vector { typ: type_, .. } => do_with_referenced_udts(what, type_),
            PreColumnType::UserDefinedType { name, .. } => what(name),
        }
    }

    // Build an indegree map: for each node in the graph, how many directly depending types it has.
    // Counters live in `Cell`s so they can be bumped while the map is only
    // borrowed immutably.
    let mut indegs = udts
        .drain(..)
        .map(|def| {
            (
                (def.keyspace_name.clone(), def.type_name.clone()),
                (def, Cell::new(0u32)),
            )
        })
        .collect::<HashMap<_, _>>();

    // For each node in the graph...
    for (def, _) in indegs.values() {
        let mut increment_referred_udts = |type_name: &str| {
            let deg = indegs
                .get(&(def.keyspace_name.clone(), type_name.to_string()))
                .map(|(_, count)| count);

            // Unknown referenced types are simply not counted.
            if let Some(deg_cell) = deg {
                deg_cell.set(deg_cell.get() + 1);
            }
        };

        // For each type referred by the node...
        for field_type in def.field_types.iter() {
            do_with_referenced_udts(&mut increment_referred_udts, field_type);
        }
    }

    // `sorted` holds references to keys of `indegs`; `next_idx` is the head
    // of the work queue (the vector doubles as queue and output).
    let mut sorted = Vec::with_capacity(indegs.len());
    let mut next_idx = 0;

    // Schedule keys that had an initial indeg of 0
    for (key, _) in indegs.iter().filter(|(_, (_, deg))| deg.get() == 0) {
        sorted.push(key);
    }

    while let Some(key @ (keyspace, _type_name)) = sorted.get(next_idx).copied() {
        next_idx += 1;
        // Decrement the counters of all UDTs that this UDT depends upon
        // and then schedule them if their counter drops to 0
        let mut decrement_referred_udts = |type_name: &str| {
            let key_value = indegs.get_key_value(&(keyspace.clone(), type_name.to_string()));

            if let Some((ref_key, (_, cnt))) = key_value {
                let new_cnt = cnt.get() - 1;
                cnt.set(new_cnt);
                if new_cnt == 0 {
                    sorted.push(ref_key);
                }
            }
        };

        let def = &indegs.get(key).unwrap().0;
        // For each type referred by the node...
        for field_type in def.field_types.iter() {
            do_with_referenced_udts(&mut decrement_referred_udts, field_type);
        }
    }

    if sorted.len() < indegs.len() {
        // Some UDTs could not become leaves in the graph, which implies cycles.
        return Err(UdtMetadataError::CircularTypeDependency);
    }

    // Clone the keys to owned form so `indegs` can be mutated (drained) below.
    let owned_sorted = sorted.into_iter().cloned().collect::<Vec<_>>();
    assert!(udts.is_empty());
    // `sorted` was built most-depended-upon-last; reverse so that
    // dependencies come before their dependents.
    for key in owned_sorted.into_iter().rev() {
        udts.push(indegs.remove(&key).unwrap().0);
    }

    Ok(())
}
1264
#[cfg(test)]
mod toposort_tests {
    use crate::test_utils::setup_tracing;

    use super::{topo_sort_udts, UdtRow, UdtRowWithParsedFieldTypes};

    const KEYSPACE1: &str = "KEYSPACE1";
    const KEYSPACE2: &str = "KEYSPACE2";

    /// Builds a parsed UDT row with dummy field names; panics if any of the
    /// given textual field types is not a valid CQL type.
    fn make_udt_row(
        keyspace_name: String,
        type_name: String,
        field_types: Vec<String>,
    ) -> UdtRowWithParsedFieldTypes {
        UdtRow {
            keyspace_name,
            type_name,
            field_names: vec!["udt_field".into(); field_types.len()],
            field_types,
        }
        .try_into()
        .unwrap()
    }

    /// Returns the position of the given UDT in the toposorted slice;
    /// panics if it is absent.
    fn get_udt_idx(
        toposorted: &[UdtRowWithParsedFieldTypes],
        keyspace_name: &str,
        type_name: &str,
    ) -> usize {
        toposorted
            .iter()
            .enumerate()
            .find_map(|(idx, def)| {
                (def.type_name == type_name && def.keyspace_name == keyspace_name).then_some(idx)
            })
            .unwrap()
    }

    #[test]
    #[ntest::timeout(1000)]
    fn test_udt_topo_sort_valid_case() {
        setup_tracing();
        // UDTs dependencies on each other (arrow A -> B signifies that type B is composed of type A):
        //
        // KEYSPACE1
        //      F -->+
        //      ^    |
        //      |    |
        // A -> B -> C
        // |    ^
        // + -> D
        //
        // E   (E is an independent UDT)
        //
        // KEYSPACE2
        // B -> A -> C
        // ^    ^
        // D -> E

        let mut udts = vec![
            make_udt_row(KEYSPACE1.into(), "A".into(), vec!["blob".into()]),
            make_udt_row(KEYSPACE1.into(), "B".into(), vec!["A".into(), "D".into()]),
            make_udt_row(
                KEYSPACE1.into(),
                "C".into(),
                vec!["blob".into(), "B".into(), "list<map<F, text>>".into()],
            ),
            make_udt_row(
                KEYSPACE1.into(),
                "D".into(),
                vec!["A".into(), "blob".into()],
            ),
            make_udt_row(KEYSPACE1.into(), "E".into(), vec!["blob".into()]),
            make_udt_row(
                KEYSPACE1.into(),
                "F".into(),
                vec!["B".into(), "blob".into()],
            ),
            make_udt_row(
                KEYSPACE2.into(),
                "A".into(),
                vec!["B".into(), "tuple<E, E>".into()],
            ),
            make_udt_row(KEYSPACE2.into(), "B".into(), vec!["map<text, D>".into()]),
            make_udt_row(KEYSPACE2.into(), "C".into(), vec!["frozen<A>".into()]),
            make_udt_row(KEYSPACE2.into(), "D".into(), vec!["blob".into()]),
            make_udt_row(KEYSPACE2.into(), "E".into(), vec!["D".into()]),
        ];

        topo_sort_udts(&mut udts).unwrap();

        // Each assertion checks that a dependency precedes its dependent.
        assert!(get_udt_idx(&udts, KEYSPACE1, "A") < get_udt_idx(&udts, KEYSPACE1, "B"));
        assert!(get_udt_idx(&udts, KEYSPACE1, "A") < get_udt_idx(&udts, KEYSPACE1, "D"));
        assert!(get_udt_idx(&udts, KEYSPACE1, "B") < get_udt_idx(&udts, KEYSPACE1, "C"));
        assert!(get_udt_idx(&udts, KEYSPACE1, "B") < get_udt_idx(&udts, KEYSPACE1, "F"));
        assert!(get_udt_idx(&udts, KEYSPACE1, "F") < get_udt_idx(&udts, KEYSPACE1, "C"));
        assert!(get_udt_idx(&udts, KEYSPACE1, "D") < get_udt_idx(&udts, KEYSPACE1, "B"));

        assert!(get_udt_idx(&udts, KEYSPACE2, "B") < get_udt_idx(&udts, KEYSPACE2, "A"));
        assert!(get_udt_idx(&udts, KEYSPACE2, "D") < get_udt_idx(&udts, KEYSPACE2, "B"));
        assert!(get_udt_idx(&udts, KEYSPACE2, "D") < get_udt_idx(&udts, KEYSPACE2, "E"));
        assert!(get_udt_idx(&udts, KEYSPACE2, "E") < get_udt_idx(&udts, KEYSPACE2, "A"));
        assert!(get_udt_idx(&udts, KEYSPACE2, "A") < get_udt_idx(&udts, KEYSPACE2, "C"));
    }

    #[test]
    #[ntest::timeout(1000)]
    fn test_udt_topo_sort_detects_cycles() {
        setup_tracing();
        const KEYSPACE1: &str = "KEYSPACE1";
        let tests = [
            // test 1
            // A depends on itself.
            vec![make_udt_row(
                KEYSPACE1.into(),
                "A".into(),
                vec!["blob".into(), "A".into()],
            )],
            // test 2
            // A depends on B, which depends on A; also, there is an independent E.
            vec![
                make_udt_row(
                    KEYSPACE1.into(),
                    "A".into(),
                    vec!["blob".into(), "B".into()],
                ),
                make_udt_row(
                    KEYSPACE1.into(),
                    "B".into(),
                    vec!["int".into(), "map<text, A>".into()],
                ),
                make_udt_row(KEYSPACE1.into(), "E".into(), vec!["text".into()]),
            ],
        ];

        for mut udts in tests {
            topo_sort_udts(&mut udts).unwrap_err();
        }
    }

    #[test]
    #[ntest::timeout(1000)]
    fn test_udt_topo_sort_ignores_invalid_metadata() {
        setup_tracing();
        // A depends on B, which depends on unknown C; also, there is an independent E.
        // The unknown reference must not fail the sort - it is detected later,
        // during type resolution.
        let mut udts = vec![
            make_udt_row(
                KEYSPACE1.into(),
                "A".into(),
                vec!["blob".into(), "B".into()],
            ),
            make_udt_row(
                KEYSPACE1.into(),
                "B".into(),
                vec!["int".into(), "map<text, C>".into()],
            ),
            make_udt_row(KEYSPACE1.into(), "E".into(), vec!["text".into()]),
        ];

        topo_sort_udts(&mut udts).unwrap();

        assert!(get_udt_idx(&udts, KEYSPACE1, "B") < get_udt_idx(&udts, KEYSPACE1, "A"));
    }
}
1429
1430impl ControlConnection {
1431    async fn query_tables(
1432        &self,
1433        keyspaces_to_fetch: &[String],
1434        tables: &mut PerKsTableResult<Table, SingleKeyspaceMetadataError>,
1435    ) -> Result<PerKeyspaceResult<PerTable<Table>, SingleKeyspaceMetadataError>, MetadataError>
1436    {
1437        let rows = self
1438            .query_filter_keyspace_name::<(String, String)>(
1439                "SELECT keyspace_name, table_name FROM system_schema.tables",
1440                keyspaces_to_fetch,
1441            )
1442            .map_err(|error| MetadataFetchError {
1443                error,
1444                table: "system_schema.tables",
1445            });
1446        let mut result = HashMap::new();
1447
1448        rows.map(|row_result| {
1449            let keyspace_and_table_name = row_result?;
1450
1451            let table = tables.remove(&keyspace_and_table_name).unwrap_or(Ok(Table {
1452                columns: HashMap::new(),
1453                partition_key: vec![],
1454                clustering_key: vec![],
1455                partitioner: None,
1456                pk_column_specs: vec![],
1457            }));
1458
1459            let mut entry = result
1460                .entry(keyspace_and_table_name.0)
1461                .or_insert_with(|| Ok(HashMap::new()));
1462            match (&mut entry, table) {
1463                (Ok(tables), Ok(table)) => {
1464                    let _ = tables.insert(keyspace_and_table_name.1, table);
1465                }
1466                (Err(_), _) => (),
1467                (Ok(_), Err(e)) => *entry = Err(e),
1468            };
1469
1470            Ok::<_, MetadataError>(())
1471        })
1472        .try_for_each(|_| future::ok(()))
1473        .await?;
1474
1475        Ok(result)
1476    }
1477
1478    async fn query_views(
1479        &self,
1480        keyspaces_to_fetch: &[String],
1481        tables: &mut PerKsTableResult<Table, SingleKeyspaceMetadataError>,
1482    ) -> Result<
1483        PerKeyspaceResult<PerTable<MaterializedView>, SingleKeyspaceMetadataError>,
1484        MetadataError,
1485    > {
1486        let rows = self
1487            .query_filter_keyspace_name::<(String, String, String)>(
1488                "SELECT keyspace_name, view_name, base_table_name FROM system_schema.views",
1489                keyspaces_to_fetch,
1490            )
1491            .map_err(|error| MetadataFetchError {
1492                error,
1493                table: "system_schema.views",
1494            });
1495
1496        let mut result = HashMap::new();
1497
1498        rows.map(|row_result| {
1499            let (keyspace_name, view_name, base_table_name) = row_result?;
1500
1501            let keyspace_and_view_name = (keyspace_name, view_name);
1502
1503            let materialized_view = tables
1504                .remove(&keyspace_and_view_name)
1505                .unwrap_or(Ok(Table {
1506                    columns: HashMap::new(),
1507                    partition_key: vec![],
1508                    clustering_key: vec![],
1509                    partitioner: None,
1510                    pk_column_specs: vec![],
1511                }))
1512                .map(|table| MaterializedView {
1513                    view_metadata: table,
1514                    base_table_name,
1515                });
1516
1517            let mut entry = result
1518                .entry(keyspace_and_view_name.0)
1519                .or_insert_with(|| Ok(HashMap::new()));
1520
1521            match (&mut entry, materialized_view) {
1522                (Ok(views), Ok(view)) => {
1523                    let _ = views.insert(keyspace_and_view_name.1, view);
1524                }
1525                (Err(_), _) => (),
1526                (Ok(_), Err(e)) => *entry = Err(e),
1527            };
1528
1529            Ok::<_, MetadataError>(())
1530        })
1531        .try_for_each(|_| future::ok(()))
1532        .await?;
1533
1534        Ok(result)
1535    }
1536
1537    async fn query_tables_schema(
1538        &self,
1539        keyspaces_to_fetch: &[String],
1540        udts: &PerKeyspaceResult<PerTable<Arc<UserDefinedType<'static>>>, MissingUserDefinedType>,
1541    ) -> Result<PerKsTableResult<Table, SingleKeyspaceMetadataError>, MetadataError> {
1542        // Upon migration from thrift to CQL, Cassandra internally creates a surrogate column "value" of
1543        // type EmptyType for dense tables. This resolves into this CQL type name.
1544        // This column shouldn't be exposed to the user but is currently exposed in system tables.
1545        const THRIFT_EMPTY_TYPE: &str = "empty";
1546
1547        type RowType = (String, String, String, String, i32, String);
1548
1549        let rows = self.query_filter_keyspace_name::<RowType>(
1550        "select keyspace_name, table_name, column_name, kind, position, type from system_schema.columns",
1551        keyspaces_to_fetch
1552    ).map_err(|error| MetadataFetchError {
1553        error,
1554        table: "system_schema.columns",
1555    });
1556
1557        let empty_ok_map = Ok(HashMap::new());
1558
1559        let mut tables_schema: HashMap<_, Result<_, SingleKeyspaceMetadataError>> = HashMap::new();
1560
1561        rows.map(|row_result| {
1562            let (keyspace_name, table_name, column_name, kind, position, type_) = row_result?;
1563
1564            if type_ == THRIFT_EMPTY_TYPE {
1565                return Ok::<_, MetadataError>(());
1566            }
1567
1568            let keyspace_udts: &PerTable<Arc<UserDefinedType<'static>>> =
1569                match udts.get(&keyspace_name).unwrap_or(&empty_ok_map) {
1570                    Ok(udts) => udts,
1571                    Err(e) => {
1572                        // There are two things we could do here
1573                        // 1. Not inserting, just returning. In that case the keyspaces containing
1574                        //    tables that have a column with a broken UDT will not be present in
1575                        //    the output of this function at all.
1576                        // 2. Inserting an error (which requires cloning it). In that case,
1577                        //    keyspace containing a table with broken UDT will have the error
1578                        //    cloned from this UDT.
1579                        //
1580                        // Solution number 1 seems weird because it can be seen as silencing
1581                        // the error: we have data for a keyspace, but we just don't put
1582                        // it in the result at all.
1583                        // Solution 2 is also not perfect because it:
1584                        // - Returns error for the keyspace even if the broken UDT is not used in any table.
1585                        // - Doesn't really distinguish between a table using a broken UDT and
1586                        //   a keyspace just containing some broken UDTs.
1587                        //
1588                        // I chose solution 2. Its first problem is not really important because
1589                        // the caller will error out the entire keyspace anyway. The second problem
1590                        // is minor enough to ignore. Note that the first issue also applies to
1591                        // solution 1: but the keyspace won't be present in the result at all,
1592                        // which is arguably worse.
1593                        tables_schema.insert(
1594                            (keyspace_name, table_name),
1595                            Err(SingleKeyspaceMetadataError::MissingUDT(e.clone())),
1596                        );
1597                        return Ok::<_, MetadataError>(());
1598                    }
1599                };
1600            let pre_cql_type = map_string_to_cql_type(&type_).map_err(|err: InvalidCqlType| {
1601                TablesMetadataError::InvalidCqlType {
1602                    typ: err.typ,
1603                    position: err.position,
1604                    reason: err.reason,
1605                }
1606            })?;
1607            let cql_type = match pre_cql_type.into_cql_type(&keyspace_name, keyspace_udts) {
1608                Ok(t) => t,
1609                Err(e) => {
1610                    tables_schema.insert(
1611                        (keyspace_name, table_name),
1612                        Err(SingleKeyspaceMetadataError::MissingUDT(e)),
1613                    );
1614                    return Ok::<_, MetadataError>(());
1615                }
1616            };
1617
1618            let kind = ColumnKind::from_str(&kind).map_err(|_| {
1619                TablesMetadataError::UnknownColumnKind {
1620                    keyspace_name: keyspace_name.clone(),
1621                    table_name: table_name.clone(),
1622                    column_name: column_name.clone(),
1623                    column_kind: kind,
1624                }
1625            })?;
1626
1627            let Ok(entry) = tables_schema
1628                .entry((keyspace_name, table_name))
1629                .or_insert(Ok((
1630                    HashMap::new(), // columns
1631                    Vec::new(),     // partition key
1632                    Vec::new(),     // clustering key
1633                )))
1634            else {
1635                // This table was previously marked as broken, no way to insert anything.
1636                return Ok::<_, MetadataError>(());
1637            };
1638
1639            if kind == ColumnKind::PartitionKey || kind == ColumnKind::Clustering {
1640                let key_list: &mut Vec<(i32, String)> = if kind == ColumnKind::PartitionKey {
1641                    entry.1.borrow_mut()
1642                } else {
1643                    entry.2.borrow_mut()
1644                };
1645                key_list.push((position, column_name.clone()));
1646            }
1647
1648            entry.0.insert(
1649                column_name,
1650                Column {
1651                    typ: cql_type,
1652                    kind,
1653                },
1654            );
1655
1656            Ok::<_, MetadataError>(())
1657        })
1658        .try_for_each(|_| future::ok(()))
1659        .await?;
1660
1661        let mut all_partitioners = self.query_table_partitioners().await?;
1662        let mut result = HashMap::new();
1663
1664        'tables_loop: for ((keyspace_name, table_name), table_result) in tables_schema {
1665            let keyspace_and_table_name = (keyspace_name, table_name);
1666
1667            #[allow(clippy::type_complexity)]
1668            let (columns, partition_key_columns, clustering_key_columns): (
1669                HashMap<String, Column>,
1670                Vec<(i32, String)>,
1671                Vec<(i32, String)>,
1672            ) = match table_result {
1673                Ok(table) => table,
1674                Err(e) => {
1675                    let _ = result.insert(keyspace_and_table_name, Err(e));
1676                    continue;
1677                }
1678            };
1679
1680            fn validate_key_columns(
1681                mut key_columns: Vec<(i32, String)>,
1682            ) -> Result<Vec<String>, i32> {
1683                key_columns.sort_unstable_by_key(|(position, _)| *position);
1684
1685                key_columns
1686                    .into_iter()
1687                    .enumerate()
1688                    .map(|(idx, (position, column_name))| {
1689                        // unwrap: I don't see the point of handling the scenario of fetching over
1690                        // 2 * 10^9 columns.
1691                        let idx: i32 = idx.try_into().unwrap();
1692                        if idx == position {
1693                            Ok(column_name)
1694                        } else {
1695                            Err(idx)
1696                        }
1697                    })
1698                    .collect::<Result<Vec<_>, _>>()
1699            }
1700
1701            let partition_key = match validate_key_columns(partition_key_columns) {
1702                Ok(partition_key_columns) => partition_key_columns,
1703                Err(position) => {
1704                    result.insert(
1705                        keyspace_and_table_name,
1706                        Err(SingleKeyspaceMetadataError::IncompletePartitionKey(
1707                            position,
1708                        )),
1709                    );
1710                    continue 'tables_loop;
1711                }
1712            };
1713
1714            let clustering_key = match validate_key_columns(clustering_key_columns) {
1715                Ok(clustering_key_columns) => clustering_key_columns,
1716                Err(position) => {
1717                    result.insert(
1718                        keyspace_and_table_name,
1719                        Err(SingleKeyspaceMetadataError::IncompleteClusteringKey(
1720                            position,
1721                        )),
1722                    );
1723                    continue 'tables_loop;
1724                }
1725            };
1726
1727            let partitioner = all_partitioners
1728                .remove(&keyspace_and_table_name)
1729                .unwrap_or_default();
1730
1731            // unwrap of get() result: all column names in `partition_key` are at this
1732            // point guaranteed to be present in `columns`. See the construction of `partition_key`
1733            let pk_column_specs = partition_key
1734                .iter()
1735                .map(|column_name| (column_name, columns.get(column_name).unwrap().clone().typ))
1736                .map(|(name, typ)| {
1737                    let table_spec = TableSpec::owned(
1738                        keyspace_and_table_name.0.clone(),
1739                        keyspace_and_table_name.1.clone(),
1740                    );
1741                    ColumnSpec::owned(name.to_owned(), typ, table_spec)
1742                })
1743                .collect();
1744
1745            result.insert(
1746                keyspace_and_table_name,
1747                Ok(Table {
1748                    columns,
1749                    partition_key,
1750                    clustering_key,
1751                    partitioner,
1752                    pk_column_specs,
1753                }),
1754            );
1755        }
1756
1757        Ok(result)
1758    }
1759}
1760
1761fn map_string_to_cql_type(typ: &str) -> Result<PreColumnType, InvalidCqlType> {
1762    match parse_cql_type(ParserState::new(typ)) {
1763        Err(err) => Err(InvalidCqlType {
1764            typ: typ.to_string(),
1765            position: err.calculate_position(typ).unwrap_or(0),
1766            reason: err.get_cause().to_string(),
1767        }),
1768        Ok((_, p)) if !p.is_at_eof() => Err(InvalidCqlType {
1769            typ: typ.to_string(),
1770            position: p.calculate_position(typ).unwrap_or(0),
1771            reason: "leftover characters".to_string(),
1772        }),
1773        Ok((typ, _)) => Ok(typ),
1774    }
1775}
1776
/// Parses a single CQL type (e.g. "frozen<map<text, int>>") from the parser
/// state, returning the parsed type together with the state advanced past it.
///
/// Branch order matters here: the "frozen<", collection, "tuple<" and
/// "vector<" prefixes must be tried before the native-type and
/// user-defined-type fallbacks, which would otherwise consume the leading
/// identifier.
fn parse_cql_type(p: ParserState<'_>) -> ParseResult<(PreColumnType, ParserState<'_>)> {
    if let Ok(p) = p.accept("frozen<") {
        let (inner_type, p) = parse_cql_type(p)?;
        let p = p.accept(">")?;

        // Only collections and UDTs carry a `frozen` flag; `freeze_type`
        // leaves other types unchanged.
        let frozen_type = freeze_type(inner_type);

        Ok((frozen_type, p))
    } else if let Ok(p) = p.accept("map<") {
        let (key, p) = parse_cql_type(p)?;
        let p = p.accept(",")?.skip_white();
        let (value, p) = parse_cql_type(p)?;
        let p = p.accept(">")?;

        let typ = PreColumnType::Collection {
            frozen: false,
            typ: PreCollectionType::Map(Box::new(key), Box::new(value)),
        };

        Ok((typ, p))
    } else if let Ok(p) = p.accept("list<") {
        let (inner_type, p) = parse_cql_type(p)?;
        let p = p.accept(">")?;

        let typ = PreColumnType::Collection {
            frozen: false,
            typ: PreCollectionType::List(Box::new(inner_type)),
        };

        Ok((typ, p))
    } else if let Ok(p) = p.accept("set<") {
        let (inner_type, p) = parse_cql_type(p)?;
        let p = p.accept(">")?;

        let typ = PreColumnType::Collection {
            frozen: false,
            typ: PreCollectionType::Set(Box::new(inner_type)),
        };

        Ok((typ, p))
    } else if let Ok(p) = p.accept("tuple<") {
        let mut types = Vec::new();
        // Parse comma-separated element types until the closing '>'.
        // The closure returns `true` to continue iterating, `false` to stop.
        let p = p.parse_while(|p| {
            let (inner_type, p) = parse_cql_type(p)?;
            types.push(inner_type);

            if let Ok(p) = p.accept(",") {
                let p = p.skip_white();
                Ok((true, p))
            } else if let Ok(p) = p.accept(">") {
                Ok((false, p))
            } else {
                Err(p.error(ParseErrorCause::Other("expected \",\" or \">\"")))
            }
        })?;

        Ok((PreColumnType::Tuple(types), p))
    } else if let Ok(p) = p.accept("vector<") {
        // Syntax: vector<element_type, dimensions>, whitespace tolerated
        // around the separator and before the closing '>'.
        let (inner_type, p) = parse_cql_type(p)?;

        let p = p.skip_white();
        let p = p.accept(",")?;
        let p = p.skip_white();
        let (size, p) = p.parse_u16()?;
        let p = p.skip_white();
        let p = p.accept(">")?;

        let typ = PreColumnType::Vector {
            typ: Box::new(inner_type),
            dimensions: size,
        };

        Ok((typ, p))
    } else if let Ok((typ, p)) = parse_native_type(p) {
        Ok((PreColumnType::Native(typ), p))
    } else if let Ok((name, p)) = parse_user_defined_type(p) {
        // Anything that parses as an identifier but is not a native type is
        // assumed to be a user-defined type name.
        let typ = PreColumnType::UserDefinedType {
            frozen: false,
            name: name.to_string(),
        };
        Ok((typ, p))
    } else {
        Err(p.error(ParseErrorCause::Other("invalid cql type")))
    }
}
1862
1863fn parse_native_type(p: ParserState) -> ParseResult<(NativeType, ParserState)> {
1864    let (tok, p) = p.take_while(|c| c.is_alphanumeric() || c == '_');
1865    let typ = match tok {
1866        "ascii" => NativeType::Ascii,
1867        "boolean" => NativeType::Boolean,
1868        "blob" => NativeType::Blob,
1869        "counter" => NativeType::Counter,
1870        "date" => NativeType::Date,
1871        "decimal" => NativeType::Decimal,
1872        "double" => NativeType::Double,
1873        "duration" => NativeType::Duration,
1874        "float" => NativeType::Float,
1875        "int" => NativeType::Int,
1876        "bigint" => NativeType::BigInt,
1877        "text" => NativeType::Text,
1878        "timestamp" => NativeType::Timestamp,
1879        "inet" => NativeType::Inet,
1880        "smallint" => NativeType::SmallInt,
1881        "tinyint" => NativeType::TinyInt,
1882        "time" => NativeType::Time,
1883        "timeuuid" => NativeType::Timeuuid,
1884        "uuid" => NativeType::Uuid,
1885        "varint" => NativeType::Varint,
1886        _ => return Err(p.error(ParseErrorCause::Other("invalid native type"))),
1887    };
1888    Ok((typ, p))
1889}
1890
1891fn parse_user_defined_type(p: ParserState) -> ParseResult<(&str, ParserState)> {
1892    // Java identifiers allow letters, underscores and dollar signs at any position
1893    // and digits in non-first position. Dots are accepted here because the names
1894    // are usually fully qualified.
1895    let (tok, p) = p.take_while(|c| c.is_alphanumeric() || c == '.' || c == '_' || c == '$');
1896    if tok.is_empty() {
1897        return Err(p.error(ParseErrorCause::Other("invalid user defined type")));
1898    }
1899    Ok((tok, p))
1900}
1901
1902fn freeze_type(typ: PreColumnType) -> PreColumnType {
1903    match typ {
1904        PreColumnType::Collection { typ: type_, .. } => PreColumnType::Collection {
1905            frozen: true,
1906            typ: type_,
1907        },
1908        PreColumnType::UserDefinedType { name, .. } => {
1909            PreColumnType::UserDefinedType { frozen: true, name }
1910        }
1911        other => other,
1912    }
1913}
1914
impl ControlConnection {
    /// Queries `system_schema.scylla_tables` (a Scylla-specific table) for
    /// the partitioner configured per table.
    ///
    /// Returns a map from `(keyspace_name, table_name)` to the partitioner
    /// name (`None` when the row carries no partitioner). If the server
    /// rejects the request as invalid - which notably happens when the
    /// queried table does not exist - an empty map is returned instead of
    /// an error.
    async fn query_table_partitioners(
        &self,
    ) -> Result<PerKsTable<Option<String>>, MetadataFetchError> {
        // Helper that attaches the queried table's name to any fetch error.
        fn create_err(err: impl Into<MetadataFetchErrorKind>) -> MetadataFetchError {
            MetadataFetchError {
                error: err.into(),
                table: "system_schema.scylla_tables",
            }
        }

        let mut partitioner_query = Statement::new(
            "select keyspace_name, table_name, partitioner from system_schema.scylla_tables",
        );
        partitioner_query.set_page_size(METADATA_QUERY_PAGE_SIZE);

        // Build a single stream of typed rows out of the pager future.
        let rows = self
            .query_iter(partitioner_query)
            .map(|pager_res| {
                let pager = pager_res.map_err(create_err)?;
                let stream = pager
                    .rows_stream::<(String, String, Option<String>)>()
                    // Map the error of Result<TypedRowStream, TypecheckError>
                    .map_err(create_err)?
                    // Map the error of single stream iteration (NextRowError)
                    .map_err(create_err);
                Ok::<_, MetadataFetchError>(stream)
            })
            .into_stream()
            .try_flatten();

        // Collect rows into a `(keyspace, table) -> partitioner` map.
        let result = rows
            .map(|row_result| {
                let (keyspace_name, table_name, partitioner) = row_result?;
                Ok::<_, MetadataFetchError>(((keyspace_name, table_name), partitioner))
            })
            .try_collect::<HashMap<_, _>>()
            .await;

        match result {
            // FIXME: This match catches all database errors with this error code despite the fact
            // that we are only interested in the ones resulting from non-existent table
            // system_schema.scylla_tables.
            // For more information please refer to https://github.com/scylladb/scylla-rust-driver/pull/349#discussion_r762050262
            Err(MetadataFetchError {
                error:
                    MetadataFetchErrorKind::NextRowError(NextRowError::NextPageError(
                        NextPageError::RequestFailure(RequestError::LastAttemptError(
                            RequestAttemptError::DbError(DbError::Invalid, _),
                        )),
                    )),
                ..
            }) => Ok(HashMap::new()),
            result => result,
        }
    }
}
1972
1973fn strategy_from_string_map(
1974    mut strategy_map: HashMap<String, String>,
1975) -> Result<Strategy, KeyspaceStrategyError> {
1976    let strategy_name: String = strategy_map
1977        .remove("class")
1978        .ok_or(KeyspaceStrategyError::MissingClassForStrategyDefinition)?;
1979
1980    let strategy: Strategy = match strategy_name.as_str() {
1981        "org.apache.cassandra.locator.SimpleStrategy" | "SimpleStrategy" => {
1982            let rep_factor_str: String = strategy_map
1983                .remove("replication_factor")
1984                .ok_or(KeyspaceStrategyError::MissingReplicationFactorForSimpleStrategy)?;
1985
1986            let replication_factor: usize = usize::from_str(&rep_factor_str)
1987                .map_err(KeyspaceStrategyError::ReplicationFactorParseError)?;
1988
1989            Strategy::SimpleStrategy { replication_factor }
1990        }
1991        "org.apache.cassandra.locator.NetworkTopologyStrategy" | "NetworkTopologyStrategy" => {
1992            let mut datacenter_repfactors: HashMap<String, usize> =
1993                HashMap::with_capacity(strategy_map.len());
1994
1995            for (key, value) in strategy_map.drain() {
1996                let rep_factor: usize = usize::from_str(&value).map_err(|_| {
1997                    // Unexpected NTS option.
1998                    // We only expect 'class' (which is resolved above)
1999                    // and replication factors per dc.
2000                    KeyspaceStrategyError::UnexpectedNetworkTopologyStrategyOption {
2001                        key: key.clone(),
2002                        value,
2003                    }
2004                })?;
2005
2006                datacenter_repfactors.insert(key, rep_factor);
2007            }
2008
2009            Strategy::NetworkTopologyStrategy {
2010                datacenter_repfactors,
2011            }
2012        }
2013        "org.apache.cassandra.locator.LocalStrategy" | "LocalStrategy" => Strategy::LocalStrategy,
2014        _ => Strategy::Other {
2015            name: strategy_name,
2016            data: strategy_map,
2017        },
2018    };
2019
2020    Ok(strategy)
2021}
2022
#[cfg(test)]
mod tests {
    use crate::test_utils::setup_tracing;

    use super::*;

    /// Checks that `map_string_to_cql_type` parses the textual CQL type
    /// representations (as found in the system schema tables) into the
    /// expected `PreColumnType` trees, covering native types, collections,
    /// tuples, vectors, UDT names, `frozen<...>` wrappers and deep nesting.
    #[test]
    fn test_cql_type_parsing() {
        setup_tracing();
        // Pairs of (input string, expected parse result).
        let test_cases = [
            ("bigint", PreColumnType::Native(NativeType::BigInt)),
            (
                "list<int>",
                PreColumnType::Collection {
                    frozen: false,
                    typ: PreCollectionType::List(Box::new(PreColumnType::Native(NativeType::Int))),
                },
            ),
            (
                "set<ascii>",
                PreColumnType::Collection {
                    frozen: false,
                    typ: PreCollectionType::Set(Box::new(PreColumnType::Native(NativeType::Ascii))),
                },
            ),
            (
                "map<blob, boolean>",
                PreColumnType::Collection {
                    frozen: false,
                    typ: PreCollectionType::Map(
                        Box::new(PreColumnType::Native(NativeType::Blob)),
                        Box::new(PreColumnType::Native(NativeType::Boolean)),
                    ),
                },
            ),
            (
                "frozen<map<text, text>>",
                PreColumnType::Collection {
                    frozen: true,
                    typ: PreCollectionType::Map(
                        Box::new(PreColumnType::Native(NativeType::Text)),
                        Box::new(PreColumnType::Native(NativeType::Text)),
                    ),
                },
            ),
            (
                "tuple<tinyint, smallint, int, bigint, varint>",
                PreColumnType::Tuple(vec![
                    PreColumnType::Native(NativeType::TinyInt),
                    PreColumnType::Native(NativeType::SmallInt),
                    PreColumnType::Native(NativeType::Int),
                    PreColumnType::Native(NativeType::BigInt),
                    PreColumnType::Native(NativeType::Varint),
                ]),
            ),
            (
                "vector<int, 5>",
                PreColumnType::Vector {
                    typ: Box::new(PreColumnType::Native(NativeType::Int)),
                    dimensions: 5,
                },
            ),
            (
                "vector<text, 1234>",
                PreColumnType::Vector {
                    typ: Box::new(PreColumnType::Native(NativeType::Text)),
                    dimensions: 1234,
                },
            ),
            (
                "com.scylladb.types.AwesomeType",
                PreColumnType::UserDefinedType {
                    frozen: false,
                    name: "com.scylladb.types.AwesomeType".to_string(),
                },
            ),
            (
                "frozen<ks.my_udt>",
                PreColumnType::UserDefinedType {
                    frozen: true,
                    name: "ks.my_udt".to_string(),
                },
            ),
            (
                "map<text, frozen<map<text, text>>>",
                PreColumnType::Collection {
                    frozen: false,
                    typ: PreCollectionType::Map(
                        Box::new(PreColumnType::Native(NativeType::Text)),
                        Box::new(PreColumnType::Collection {
                            frozen: true,
                            typ: PreCollectionType::Map(
                                Box::new(PreColumnType::Native(NativeType::Text)),
                                Box::new(PreColumnType::Native(NativeType::Text)),
                            ),
                        }),
                    ),
                },
            ),
            // A deeply nested case exercising every branch of the parser at
            // once. The input string uses `\` line continuations, so it is a
            // single line as far as the parser is concerned.
            (
                "map<\
                    frozen<list<int>>, \
                    set<\
                        list<\
                            tuple<\
                                list<list<text>>, \
                                map<text, map<ks.my_type, blob>>, \
                                frozen<set<set<int>>>\
                            >\
                        >\
                    >\
                >",
                // map<...>
                PreColumnType::Collection {
                    frozen: false,
                    typ: PreCollectionType::Map(
                        Box::new(PreColumnType::Collection {
                            // frozen<list<int>>
                            frozen: true,
                            typ: PreCollectionType::List(Box::new(PreColumnType::Native(
                                NativeType::Int,
                            ))),
                        }),
                        Box::new(PreColumnType::Collection {
                            // set<...>
                            frozen: false,
                            typ: PreCollectionType::Set(Box::new(PreColumnType::Collection {
                                // list<tuple<...>>
                                frozen: false,
                                typ: PreCollectionType::List(Box::new(PreColumnType::Tuple(vec![
                                    PreColumnType::Collection {
                                        // list<list<text>>
                                        frozen: false,
                                        typ: PreCollectionType::List(Box::new(
                                            PreColumnType::Collection {
                                                frozen: false,
                                                typ: PreCollectionType::List(Box::new(
                                                    PreColumnType::Native(NativeType::Text),
                                                )),
                                            },
                                        )),
                                    },
                                    PreColumnType::Collection {
                                        // map<text, map<ks.my_type, blob>>
                                        frozen: false,
                                        typ: PreCollectionType::Map(
                                            Box::new(PreColumnType::Native(NativeType::Text)),
                                            Box::new(PreColumnType::Collection {
                                                frozen: false,
                                                typ: PreCollectionType::Map(
                                                    Box::new(PreColumnType::UserDefinedType {
                                                        frozen: false,
                                                        name: "ks.my_type".to_string(),
                                                    }),
                                                    Box::new(PreColumnType::Native(
                                                        NativeType::Blob,
                                                    )),
                                                ),
                                            }),
                                        ),
                                    },
                                    PreColumnType::Collection {
                                        // frozen<set<set<int>>>
                                        frozen: true,
                                        typ: PreCollectionType::Set(Box::new(
                                            PreColumnType::Collection {
                                                frozen: false,
                                                typ: PreCollectionType::Set(Box::new(
                                                    PreColumnType::Native(NativeType::Int),
                                                )),
                                            },
                                        )),
                                    },
                                ]))),
                            })),
                        }),
                    ),
                },
            ),
        ];

        // Every case must both parse successfully and match the expected tree.
        for (s, expected) in test_cases {
            let parsed = map_string_to_cql_type(s).unwrap();
            assert_eq!(parsed, expected);
        }
    }
}