1use crate::client::pager::{NextPageError, NextRowError, QueryPager};
20use crate::cluster::node::resolve_contact_points;
21use crate::deserialize::DeserializeOwnedRow;
22use crate::errors::{
23 DbError, MetadataFetchError, MetadataFetchErrorKind, NewSessionError, RequestAttemptError,
24};
25use crate::frame::response::event::Event;
26use crate::network::{ConnectionConfig, NodeConnectionPool, PoolConfig, PoolSize};
27#[cfg(feature = "metrics")]
28use crate::observability::metrics::Metrics;
29use crate::policies::host_filter::HostFilter;
30use crate::routing::Token;
31use crate::statement::unprepared::Statement;
32use crate::utils::parse::{ParseErrorCause, ParseResult, ParserState};
33use crate::DeserializeRow;
34
35use futures::future::{self, FutureExt};
36use futures::stream::{self, StreamExt, TryStreamExt};
37use futures::Stream;
38use itertools::Itertools;
39use rand::seq::{IndexedRandom, SliceRandom};
40use rand::{rng, Rng};
41use scylla_cql::frame::response::result::{ColumnSpec, TableSpec};
42use std::borrow::BorrowMut;
43use std::cell::Cell;
44use std::collections::HashMap;
45use std::fmt::{self, Formatter};
46use std::net::{IpAddr, SocketAddr};
47use std::num::NonZeroUsize;
48use std::str::FromStr;
49use std::sync::Arc;
50use std::time::{Duration, Instant};
51use thiserror::Error;
52use tokio::sync::{broadcast, mpsc};
53use tracing::{debug, error, trace, warn};
54use uuid::Uuid;
55
56use crate::cluster::node::{InternalKnownNode, NodeAddr, ResolvedContactPoint};
57use crate::errors::{
58 KeyspaceStrategyError, KeyspacesMetadataError, MetadataError, PeersMetadataError, RequestError,
59 TablesMetadataError, UdtMetadataError,
60};
61
62pub use scylla_cql::frame::response::result::{
64 CollectionType, ColumnType, NativeType, UserDefinedType,
65};
66
67use super::control_connection::ControlConnection;
68
// Convenience aliases for metadata collections keyed by keyspace name,
// table/type name, or the (keyspace, table) pair.
type PerKeyspace<T> = HashMap<String, T>;
// Per-keyspace values that may individually fail to be fetched/resolved.
type PerKeyspaceResult<T, E> = PerKeyspace<Result<T, E>>;
type PerTable<T> = HashMap<String, T>;
type PerKsTable<T> = HashMap<(String, String), T>;
type PerKsTableResult<T, E> = PerKsTable<Result<T, E>>;
74
/// Error describing why metadata of a single keyspace could not be resolved.
#[derive(Clone, Debug, Error)]
pub(crate) enum SingleKeyspaceMetadataError {
    /// A user defined type referenced by the keyspace's schema is missing.
    #[error(transparent)]
    MissingUDT(MissingUserDefinedType),
    /// A partition key column at the given position was absent from metadata.
    #[error("Partition key column with position {0} is missing from metadata")]
    IncompletePartitionKey(i32),
    /// A clustering key column at the given position was absent from metadata.
    #[error("Clustering key column with position {0} is missing from metadata")]
    IncompleteClusteringKey(i32),
}
90
/// Reads cluster metadata (peers and schema) over a dedicated control connection.
pub(crate) struct MetadataReader {
    // Pool configuration used when (re)creating the control connection pool.
    control_connection_pool_config: PoolConfig,
    // Optional server-side timeout override applied to metadata requests.
    request_serverside_timeout: Option<Duration>,

    // Endpoint the control connection currently points at.
    control_connection_endpoint: UntranslatedEndpoint,
    // The control connection pool itself (a single connection per host).
    control_connection: NodeConnectionPool,

    // Peers learned from the last successful fetch (initially: contact points).
    known_peers: Vec<UntranslatedEndpoint>,
    // If non-empty, schema queries are restricted to these keyspaces.
    keyspaces_to_fetch: Vec<String>,
    // Whether to fetch full schema (tables, views, UDTs) besides peers.
    fetch_schema: bool,
    // Optional filter deciding which cluster nodes are accepted.
    host_filter: Option<Arc<dyn HostFilter>>,

    // User-provided contact points; used as a fallback when metadata
    // cannot be fetched from any known peer (see `read_metadata`).
    initial_known_nodes: Vec<InternalKnownNode>,

    // Sender handed to connection pools to request control connection repair.
    control_connection_repair_requester: broadcast::Sender<()>,

    #[cfg(feature = "metrics")]
    metrics: Arc<Metrics>,
}
116
/// Snapshot of cluster metadata: the peer list and per-keyspace schema.
pub(crate) struct Metadata {
    pub(crate) peers: Vec<Peer>,
    // Maps keyspace name to its metadata, or to the error that prevented
    // that single keyspace's metadata from being resolved.
    pub(crate) keyspaces: HashMap<String, Result<Keyspace, SingleKeyspaceMetadataError>>,
}
122
/// Information about a single node (peer) of the cluster.
#[non_exhaustive]
pub struct Peer {
    /// Unique identifier of the node.
    pub host_id: Uuid,
    /// Address of the node (translatable or not, depending on its origin).
    pub address: NodeAddr,
    /// Tokens owned by the node in the token ring.
    pub tokens: Vec<Token>,
    /// Datacenter the node belongs to, if reported.
    pub datacenter: Option<String>,
    /// Rack the node belongs to, if reported.
    pub rack: Option<String>,
}
131
/// An endpoint whose address may still be subject to address translation
/// before a connection is opened.
#[derive(Clone, Debug)]
pub(crate) enum UntranslatedEndpoint {
    /// A contact point resolved from the user-provided initial node list.
    ContactPoint(ResolvedContactPoint),
    /// A peer discovered from cluster metadata.
    Peer(PeerEndpoint),
}
141
142impl UntranslatedEndpoint {
143 pub(crate) fn address(&self) -> NodeAddr {
144 match *self {
145 UntranslatedEndpoint::ContactPoint(ResolvedContactPoint { address, .. }) => {
146 NodeAddr::Untranslatable(address)
147 }
148 UntranslatedEndpoint::Peer(PeerEndpoint { address, .. }) => address,
149 }
150 }
151 pub(crate) fn set_port(&mut self, port: u16) {
152 let inner_addr = match self {
153 UntranslatedEndpoint::ContactPoint(ResolvedContactPoint { address, .. }) => address,
154 UntranslatedEndpoint::Peer(PeerEndpoint { address, .. }) => address.inner_mut(),
155 };
156 inner_addr.set_port(port);
157 }
158}
159
/// Data used to identify and connect to a peer, without its token list.
/// Cheap-to-clone projection of `Peer` (see `Peer::to_peer_endpoint`).
#[derive(Clone, Debug)]
pub(crate) struct PeerEndpoint {
    pub(crate) host_id: Uuid,
    pub(crate) address: NodeAddr,
    pub(crate) datacenter: Option<String>,
    pub(crate) rack: Option<String>,
}
170
171impl Peer {
172 pub(crate) fn to_peer_endpoint(&self) -> PeerEndpoint {
173 PeerEndpoint {
174 host_id: self.host_id,
175 address: self.address,
176 datacenter: self.datacenter.clone(),
177 rack: self.rack.clone(),
178 }
179 }
180
181 pub(crate) fn into_peer_endpoint_and_tokens(self) -> (PeerEndpoint, Vec<Token>) {
182 (
183 PeerEndpoint {
184 host_id: self.host_id,
185 address: self.address,
186 datacenter: self.datacenter,
187 rack: self.rack,
188 },
189 self.tokens,
190 )
191 }
192}
193
/// Metadata of a single keyspace.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct Keyspace {
    /// Replication strategy used by the keyspace.
    pub strategy: Strategy,
    /// Tables of the keyspace, indexed by table name.
    pub tables: HashMap<String, Table>,
    /// Materialized views of the keyspace, indexed by view name.
    pub views: HashMap<String, MaterializedView>,
    /// User defined types of the keyspace, indexed by type name.
    pub user_defined_types: HashMap<String, Arc<UserDefinedType<'static>>>,
}
206
/// Metadata of a single table (or of a materialized view's table part).
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct Table {
    /// All columns of the table, indexed by column name.
    pub columns: HashMap<String, Column>,
    /// Names of the partition key columns, ordered by position.
    pub partition_key: Vec<String>,
    /// Names of the clustering key columns, ordered by position.
    pub clustering_key: Vec<String>,
    /// Partitioner of the table, if one was reported in metadata.
    pub partitioner: Option<String>,
    // Cached column specs of the partition key, for internal driver use.
    pub(crate) pk_column_specs: Vec<ColumnSpec<'static>>,
}
221
/// Metadata of a materialized view.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct MaterializedView {
    /// Table-like metadata of the view itself.
    pub view_metadata: Table,
    /// Name of the base table the view is built upon.
    pub base_table_name: String,
}
229
/// Metadata of a single table column.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct Column {
    /// CQL type of the column.
    pub typ: ColumnType<'static>,
    /// Kind of the column (regular, static, clustering or partition key).
    pub kind: ColumnKind,
}
237
// Intermediate representation of a parsed CQL type in which user defined
// types are referenced by name only; resolved into a full `ColumnType` by
// `into_cql_type` once the keyspace's UDT definitions are available.
#[derive(Clone, Debug, PartialEq, Eq)]
enum PreColumnType {
    Native(NativeType),
    Collection {
        frozen: bool,
        typ: PreCollectionType,
    },
    Tuple(Vec<PreColumnType>),
    Vector {
        typ: Box<PreColumnType>,
        dimensions: u16,
    },
    // UDT referenced by name; resolved later against the keyspace's UDT map.
    UserDefinedType {
        frozen: bool,
        name: String,
    },
}
255
256impl PreColumnType {
257 pub(crate) fn into_cql_type(
258 self,
259 keyspace_name: &String,
260 keyspace_udts: &PerTable<Arc<UserDefinedType<'static>>>,
261 ) -> Result<ColumnType<'static>, MissingUserDefinedType> {
262 match self {
263 PreColumnType::Native(n) => Ok(ColumnType::Native(n)),
264 PreColumnType::Collection { frozen, typ: type_ } => type_
265 .into_collection_type(keyspace_name, keyspace_udts)
266 .map(|inner| ColumnType::Collection { frozen, typ: inner }),
267 PreColumnType::Tuple(t) => t
268 .into_iter()
269 .map(|t| t.into_cql_type(keyspace_name, keyspace_udts))
270 .collect::<Result<Vec<ColumnType>, MissingUserDefinedType>>()
271 .map(ColumnType::Tuple),
272 PreColumnType::Vector {
273 typ: type_,
274 dimensions,
275 } => type_
276 .into_cql_type(keyspace_name, keyspace_udts)
277 .map(|inner| ColumnType::Vector {
278 typ: Box::new(inner),
279 dimensions,
280 }),
281 PreColumnType::UserDefinedType { frozen, name } => {
282 let definition = match keyspace_udts.get(&name) {
283 Some(def) => def.clone(),
284 None => {
285 return Err(MissingUserDefinedType {
286 name,
287 keyspace: keyspace_name.clone(),
288 })
289 }
290 };
291 Ok(ColumnType::UserDefinedType { frozen, definition })
292 }
293 }
294 }
295}
296
// Error raised when a type string references a user defined type that is
// absent from the keyspace's fetched UDT metadata.
#[derive(Clone, Debug, Error)]
#[error("Missing UDT: {keyspace}, {name}")]
pub(crate) struct MissingUserDefinedType {
    name: String,
    keyspace: String,
}
304
// Collection counterpart of `PreColumnType`: element types may still
// contain unresolved UDT references.
#[derive(Clone, Debug, PartialEq, Eq)]
enum PreCollectionType {
    List(Box<PreColumnType>),
    Map(Box<PreColumnType>, Box<PreColumnType>),
    Set(Box<PreColumnType>),
}
311
312impl PreCollectionType {
313 pub(crate) fn into_collection_type(
314 self,
315 keyspace_name: &String,
316 keyspace_udts: &PerTable<Arc<UserDefinedType<'static>>>,
317 ) -> Result<CollectionType<'static>, MissingUserDefinedType> {
318 match self {
319 PreCollectionType::List(t) => t
320 .into_cql_type(keyspace_name, keyspace_udts)
321 .map(|inner| CollectionType::List(Box::new(inner))),
322 PreCollectionType::Map(tk, tv) => Ok(CollectionType::Map(
323 Box::new(tk.into_cql_type(keyspace_name, keyspace_udts)?),
324 Box::new(tv.into_cql_type(keyspace_name, keyspace_udts)?),
325 )),
326 PreCollectionType::Set(t) => t
327 .into_cql_type(keyspace_name, keyspace_udts)
328 .map(|inner| CollectionType::Set(Box::new(inner))),
329 }
330 }
331}
332
/// Kind of a table column, as reported in `system_schema` metadata.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ColumnKind {
    /// An ordinary column.
    Regular,
    /// A static column.
    Static,
    /// A clustering key component.
    Clustering,
    /// A partition key component.
    PartitionKey,
}
341
/// Error returned when a column-kind string cannot be parsed into `ColumnKind`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ColumnKindFromStrError;
345
346impl std::str::FromStr for ColumnKind {
347 type Err = ColumnKindFromStrError;
348
349 fn from_str(s: &str) -> Result<Self, Self::Err> {
350 match s {
351 "regular" => Ok(Self::Regular),
352 "static" => Ok(Self::Static),
353 "clustering" => Ok(Self::Clustering),
354 "partition_key" => Ok(Self::PartitionKey),
355 _ => Err(ColumnKindFromStrError),
356 }
357 }
358}
359
/// Replication strategy of a keyspace.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
#[allow(clippy::enum_variant_names)]
pub enum Strategy {
    SimpleStrategy {
        replication_factor: usize,
    },
    NetworkTopologyStrategy {
        /// Replication factors of datacenters with given names.
        datacenter_repfactors: HashMap<String, usize>,
    },
    LocalStrategy,
    /// Any other strategy, preserved with its name and raw option map.
    Other {
        name: String,
        data: HashMap<String, String>,
    },
}
377
// Describes a CQL type string that failed to parse: the offending type
// string, the position at which parsing failed, and a reason message.
#[derive(Clone, Debug)]
struct InvalidCqlType {
    typ: String,
    position: usize,
    reason: String,
}
384
385impl fmt::Display for InvalidCqlType {
386 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
387 write!(f, "{:?}", self)
388 }
389}
390
391impl Metadata {
392 pub(crate) fn new_dummy(initial_peers: &[UntranslatedEndpoint]) -> Self {
397 let peers = initial_peers
398 .iter()
399 .enumerate()
400 .map(|(id, endpoint)| {
401 let token = ((id as u128) << 64) / initial_peers.len() as u128;
404
405 Peer {
406 address: endpoint.address(),
407 tokens: vec![Token::new(token as i64)],
408 datacenter: None,
409 rack: None,
410 host_id: Uuid::new_v4(),
411 }
412 })
413 .collect();
414
415 Metadata {
416 peers,
417 keyspaces: HashMap::new(),
418 }
419 }
420}
421
422impl MetadataReader {
    /// Creates a new `MetadataReader`.
    ///
    /// Resolves the initial contact points, picks a random one as the
    /// control connection endpoint, and creates a single-connection pool
    /// to it. Fails with `NewSessionError::FailedToResolveAnyHostname`
    /// if no contact point could be resolved.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn new(
        initial_known_nodes: Vec<InternalKnownNode>,
        control_connection_repair_requester: broadcast::Sender<()>,
        mut connection_config: ConnectionConfig,
        request_serverside_timeout: Option<Duration>,
        server_event_sender: mpsc::Sender<Event>,
        keyspaces_to_fetch: Vec<String>,
        fetch_schema: bool,
        host_filter: &Option<Arc<dyn HostFilter>>,
        #[cfg(feature = "metrics")] metrics: Arc<Metrics>,
    ) -> Result<Self, NewSessionError> {
        let (initial_peers, resolved_hostnames) =
            resolve_contact_points(&initial_known_nodes).await;
        // At least one resolved contact point is required to ever establish
        // the control connection.
        if initial_peers.is_empty() {
            return Err(NewSessionError::FailedToResolveAnyHostname(
                resolved_hostnames,
            ));
        }

        // Pick a random contact point as the control connection endpoint.
        let control_connection_endpoint = UntranslatedEndpoint::ContactPoint(
            initial_peers
                .choose(&mut rng())
                .expect("Tried to initialize MetadataReader with empty initial_known_nodes list!")
                .clone(),
        );

        // The control connection is the connection that receives server events.
        connection_config.event_sender = Some(server_event_sender);

        let control_connection_pool_config = PoolConfig {
            connection_config,

            // A single connection per host suffices for the control connection.
            pool_size: PoolSize::PerHost(NonZeroUsize::new(1).unwrap()),

            // The shard-aware port is not used for the control connection.
            can_use_shard_aware_port: false,
        };

        let control_connection = Self::make_control_connection_pool(
            control_connection_endpoint.clone(),
            &control_connection_pool_config,
            control_connection_repair_requester.clone(),
            #[cfg(feature = "metrics")]
            metrics.clone(),
        );

        Ok(MetadataReader {
            control_connection_pool_config,
            control_connection_endpoint,
            control_connection,
            request_serverside_timeout,
            known_peers: initial_peers
                .into_iter()
                .map(UntranslatedEndpoint::ContactPoint)
                .collect(),
            keyspaces_to_fetch,
            fetch_schema,
            host_filter: host_filter.clone(),
            initial_known_nodes,
            control_connection_repair_requester,
            #[cfg(feature = "metrics")]
            metrics,
        })
    }
494
    /// Fetches current cluster metadata.
    ///
    /// On failure, retries on the other known peers in random order, and —
    /// unless this is the initial fetch — falls back to re-resolving the
    /// user-provided contact points as a last resort. On success, updates
    /// the known peer list and moves the control connection off a node
    /// rejected by the host filter, if needed.
    pub(crate) async fn read_metadata(&mut self, initial: bool) -> Result<Metadata, MetadataError> {
        let mut result = self.fetch_metadata(initial).await;
        let prev_err = match result {
            Ok(metadata) => {
                debug!("Fetched new metadata");
                self.update_known_peers(&metadata);
                if initial {
                    self.handle_unaccepted_host_in_control_connection(&metadata);
                }
                return Ok(metadata);
            }
            Err(err) => err,
        };

        // Shuffle so that retries do not always hammer peers in the same order.
        self.known_peers.shuffle(&mut rng());
        debug!("Known peers: {:?}", self.known_peers.iter().format(", "));

        // Skip the peer behind the control connection that just failed.
        let address_of_failed_control_connection = self.control_connection_endpoint.address();
        let filtered_known_peers = self
            .known_peers
            .clone()
            .into_iter()
            .filter(|peer| peer.address() != address_of_failed_control_connection);

        result = self
            .retry_fetch_metadata_on_nodes(initial, filtered_known_peers, prev_err)
            .await;

        if let Err(prev_err) = result {
            if !initial {
                // known_peers may be stale; fall back to the user-provided
                // contact points, re-resolving their hostnames.
                warn!("Failed to establish control connection and fetch metadata on all known peers. Falling back to initial contact points.");
                let (initial_peers, _hostnames) =
                    resolve_contact_points(&self.initial_known_nodes).await;
                result = self
                    .retry_fetch_metadata_on_nodes(
                        initial,
                        initial_peers
                            .into_iter()
                            .map(UntranslatedEndpoint::ContactPoint),
                        prev_err,
                    )
                    .await;
            } else {
                // The initial fetch already started from the contact points,
                // so there is nothing further to fall back to.
                result = Err(prev_err);
            }
        }

        match &result {
            Ok(metadata) => {
                self.update_known_peers(metadata);
                self.handle_unaccepted_host_in_control_connection(metadata);
                debug!("Fetched new metadata");
            }
            Err(error) => error!(
                error = %error,
                "Could not fetch metadata"
            ),
        }

        result
    }
566
    /// Attempts the metadata fetch on each of `nodes` in turn, re-creating
    /// the control connection pool for every candidate, until one succeeds
    /// or the iterator is exhausted.
    ///
    /// `prev_err` is the error of the previous failed attempt; it is
    /// returned unchanged when `nodes` is empty.
    async fn retry_fetch_metadata_on_nodes(
        &mut self,
        initial: bool,
        nodes: impl Iterator<Item = UntranslatedEndpoint>,
        prev_err: MetadataError,
    ) -> Result<Metadata, MetadataError> {
        let mut result = Err(prev_err);
        for peer in nodes {
            // Stop at the first success; otherwise extract the error to log it.
            let err = match result {
                Ok(_) => break,
                Err(err) => err,
            };

            warn!(
                control_connection_address = tracing::field::display(self
                    .control_connection_endpoint
                    .address()),
                error = %err,
                "Failed to fetch metadata using current control connection"
            );

            // Point the control connection at the next candidate peer.
            self.control_connection_endpoint = peer.clone();
            self.control_connection = Self::make_control_connection_pool(
                self.control_connection_endpoint.clone(),
                &self.control_connection_pool_config,
                self.control_connection_repair_requester.clone(),
                #[cfg(feature = "metrics")]
                Arc::clone(&self.metrics),
            );

            debug!(
                "Retrying to establish the control connection on {}",
                self.control_connection_endpoint.address()
            );
            result = self.fetch_metadata(initial).await;
        }
        result
    }
605
    /// Performs a single metadata query over the current control connection.
    ///
    /// During the initial fetch (`initial == true`), a query failure is
    /// downgraded to a warning and dummy metadata (the known peer list with
    /// synthetic tokens, no schema) is returned so the session can still start.
    async fn fetch_metadata(&self, initial: bool) -> Result<Metadata, MetadataError> {
        self.control_connection.wait_until_initialized().await;
        let conn = ControlConnection::new(self.control_connection.random_connection()?)
            .override_serverside_timeout(self.request_serverside_timeout);

        let res = conn
            .query_metadata(
                self.control_connection_endpoint.address().port(),
                &self.keyspaces_to_fetch,
                self.fetch_schema,
            )
            .await;

        if initial {
            if let Err(err) = res {
                warn!(
                    error = ?err,
                    "Initial metadata read failed, proceeding with metadata \
                    consisting only of the initial peer list and dummy tokens. \
                    This might result in suboptimal performance and schema \
                    information not being available."
                );
                return Ok(Metadata::new_dummy(&self.known_peers));
            }
        }

        res
    }
635
636 fn update_known_peers(&mut self, metadata: &Metadata) {
637 let host_filter = self.host_filter.as_ref();
638 self.known_peers = metadata
639 .peers
640 .iter()
641 .filter(|peer| host_filter.map_or(true, |f| f.accept(peer)))
642 .map(|peer| UntranslatedEndpoint::Peer(peer.to_peer_endpoint()))
643 .collect();
644
645 if !metadata.peers.is_empty() && self.known_peers.is_empty() {
648 error!(
649 node_ips = tracing::field::display(
650 metadata.peers.iter().map(|peer| peer.address).format(", ")
651 ),
652 "The host filter rejected all nodes in the cluster, \
653 no connections that can serve user queries have been \
654 established. The session cannot serve any queries!"
655 )
656 }
657 }
658
    /// Checks whether the node behind the current control connection is
    /// rejected by the host filter; if so, warns and re-establishes the
    /// control connection to a randomly chosen known peer (when any remain).
    fn handle_unaccepted_host_in_control_connection(&mut self, metadata: &Metadata) {
        // The control connection node can only be matched against the filter
        // if it appears in the peers metadata (i.e. it is a Peer endpoint).
        let control_connection_peer = metadata
            .peers
            .iter()
            .find(|peer| matches!(self.control_connection_endpoint, UntranslatedEndpoint::Peer(PeerEndpoint{address, ..}) if address == peer.address));
        if let Some(peer) = control_connection_peer {
            if !self.host_filter.as_ref().map_or(true, |f| f.accept(peer)) {
                warn!(
                    filtered_node_ips = tracing::field::display(metadata
                        .peers
                        .iter()
                        .filter(|peer| self.host_filter.as_ref().map_or(true, |p| p.accept(peer)))
                        .map(|peer| peer.address)
                        .format(", ")
                    ),
                    control_connection_address = ?self.control_connection_endpoint.address(),
                    "The node that the control connection is established to \
                    is not accepted by the host filter. Please verify that \
                    the nodes in your initial peers list are accepted by the \
                    host filter. The driver will try to re-establish the \
                    control connection to a different node."
                );

                // known_peers is produced by `update_known_peers`, which has
                // already applied the host filter, so any of them will do.
                if !self.known_peers.is_empty() {
                    self.control_connection_endpoint = self
                        .known_peers
                        .choose(&mut rng())
                        .expect("known_peers is empty - should be impossible")
                        .clone();

                    self.control_connection = Self::make_control_connection_pool(
                        self.control_connection_endpoint.clone(),
                        &self.control_connection_pool_config,
                        self.control_connection_repair_requester.clone(),
                        #[cfg(feature = "metrics")]
                        Arc::clone(&self.metrics),
                    );
                }
            }
        }
    }
701
    /// Builds a `NodeConnectionPool` for the control connection to the given
    /// endpoint. No keyspace is pre-set on the pool (hence the `None`).
    fn make_control_connection_pool(
        endpoint: UntranslatedEndpoint,
        pool_config: &PoolConfig,
        refresh_requester: broadcast::Sender<()>,
        #[cfg(feature = "metrics")] metrics: Arc<Metrics>,
    ) -> NodeConnectionPool {
        NodeConnectionPool::new(
            endpoint,
            pool_config,
            None,
            refresh_requester,
            #[cfg(feature = "metrics")]
            metrics,
        )
    }
717}
718
719impl ControlConnection {
720 async fn query_metadata(
721 &self,
722 connect_port: u16,
723 keyspace_to_fetch: &[String],
724 fetch_schema: bool,
725 ) -> Result<Metadata, MetadataError> {
726 let peers_query = self.query_peers(connect_port);
727 let keyspaces_query = self.query_keyspaces(keyspace_to_fetch, fetch_schema);
728
729 let (peers, keyspaces) = tokio::try_join!(peers_query, keyspaces_query)?;
730
731 if peers.is_empty() {
733 return Err(MetadataError::Peers(PeersMetadataError::EmptyPeers));
734 }
735
736 if peers.iter().all(|peer| peer.tokens.is_empty()) {
738 return Err(MetadataError::Peers(PeersMetadataError::EmptyTokenLists));
739 }
740
741 Ok(Metadata { peers, keyspaces })
742 }
743}
744
// Row shape shared by the `system.local` and `system.peers` queries.
#[derive(DeserializeRow)]
#[scylla(crate = "scylla_cql")]
struct NodeInfoRow {
    // A null host_id causes the node to be skipped (see `create_peer_from_row`).
    host_id: Option<Uuid>,
    #[scylla(rename = "rpc_address")]
    untranslated_ip_addr: IpAddr,
    #[scylla(rename = "data_center")]
    datacenter: Option<String>,
    rack: Option<String>,
    // Tokens as strings; parsed into `Token`s in `create_peer_from_row`.
    tokens: Option<Vec<String>>,
}
756
// Distinguishes whether a node-info row came from `system.local`
// (the node we are connected to) or from `system.peers`.
#[derive(Clone, Copy)]
enum NodeInfoSource {
    Local,
    Peer,
}
762
763impl NodeInfoSource {
764 fn describe(&self) -> &'static str {
765 match self {
766 Self::Local => "local node",
767 Self::Peer => "peer",
768 }
769 }
770}
771
// Page size used by all internal metadata queries.
const METADATA_QUERY_PAGE_SIZE: i32 = 1024;
773
774impl ControlConnection {
    /// Queries `system.peers` and `system.local` concurrently and builds the
    /// list of cluster peers. Rows that fail to deserialize or are otherwise
    /// invalid are logged and skipped instead of failing the whole fetch.
    async fn query_peers(&self, connect_port: u16) -> Result<Vec<Peer>, MetadataError> {
        // Stream of rows from system.peers, each tagged with its source.
        let mut peers_query = Statement::new(
            "select host_id, rpc_address, data_center, rack, tokens from system.peers",
        );
        peers_query.set_page_size(METADATA_QUERY_PAGE_SIZE);
        let peers_query_stream = self
            .query_iter(peers_query)
            .map(|pager_res| {
                let pager = pager_res?;
                let rows_stream = pager.rows_stream::<NodeInfoRow>()?;
                Ok::<_, MetadataFetchErrorKind>(rows_stream)
            })
            // Flatten Future<Result<Stream<...>>> into a single row stream.
            .into_stream()
            .map(|result| result.map(|stream| stream.map_err(MetadataFetchErrorKind::NextRowError)))
            .try_flatten()
            .map_err(|error| MetadataFetchError {
                error,
                table: "system.peers",
            })
            .and_then(|row_result| future::ok((NodeInfoSource::Peer, row_result)));

        // Stream of the row(s) from system.local, tagged analogously.
        let mut local_query =
            Statement::new("select host_id, rpc_address, data_center, rack, tokens from system.local WHERE key='local'");
        local_query.set_page_size(METADATA_QUERY_PAGE_SIZE);
        let local_query_stream = self
            .query_iter(local_query)
            .map(|pager_res| {
                let pager = pager_res?;
                let rows_stream = pager.rows_stream::<NodeInfoRow>()?;
                Ok::<_, MetadataFetchErrorKind>(rows_stream)
            })
            .into_stream()
            .map(|result| result.map(|stream| stream.map_err(MetadataFetchErrorKind::NextRowError)))
            .try_flatten()
            .map_err(|error| MetadataFetchError {
                error,
                table: "system.local",
            })
            .and_then(|row_result| future::ok((NodeInfoSource::Local, row_result)));

        // Interleave both sources into one stream.
        let untranslated_rows = stream::select(peers_query_stream, local_query_stream);

        // The local node is addressed by the connection's own address and
        // the port we connected to.
        let local_ip: IpAddr = self.get_connect_address().ip();
        let local_address = SocketAddr::new(local_ip, connect_port);

        // Convert each row into a Peer; invalid rows are logged and dropped.
        let translated_peers_futures = untranslated_rows.map(|row_result| async {
            match row_result {
                Ok((source, row)) => Self::create_peer_from_row(source, row, local_address).await,
                Err(err) => {
                    warn!(
                        "system.peers or system.local has an invalid row, skipping it: {}",
                        err
                    );
                    None
                }
            }
        });

        let peers = translated_peers_futures
            .buffer_unordered(256)
            .filter_map(std::future::ready)
            .collect::<Vec<_>>()
            .await;
        Ok(peers)
    }
842
843 async fn create_peer_from_row(
844 source: NodeInfoSource,
845 row: NodeInfoRow,
846 local_address: SocketAddr,
847 ) -> Option<Peer> {
848 let NodeInfoRow {
849 host_id,
850 untranslated_ip_addr,
851 datacenter,
852 rack,
853 tokens,
854 } = row;
855
856 let host_id = match host_id {
857 Some(host_id) => host_id,
858 None => {
859 warn!("{} (untranslated ip: {}, dc: {:?}, rack: {:?}) has Host ID set to null; skipping node.", source.describe(), untranslated_ip_addr, datacenter, rack);
860 return None;
861 }
862 };
863
864 let connect_port = local_address.port();
865 let untranslated_address = SocketAddr::new(untranslated_ip_addr, connect_port);
866
867 let node_addr = match source {
868 NodeInfoSource::Local => {
869 NodeAddr::Untranslatable(local_address)
874 }
875 NodeInfoSource::Peer => {
876 NodeAddr::Translatable(untranslated_address)
878 }
879 };
880
881 let tokens_str: Vec<String> = tokens.unwrap_or_default();
882
883 let tokens: Vec<Token> = match tokens_str
885 .iter()
886 .map(|s| Token::from_str(s))
887 .collect::<Result<Vec<Token>, _>>()
888 {
889 Ok(parsed) => parsed,
890 Err(e) => {
891 trace!("Couldn't parse tokens as 64-bit integers: {}, proceeding with a dummy token. If you're using a partitioner with different token size, consider migrating to murmur3", e);
896 vec![Token::new(rand::rng().random::<i64>())]
897 }
898 };
899
900 Some(Peer {
901 host_id,
902 address: node_addr,
903 tokens,
904 datacenter,
905 rack,
906 })
907 }
908
    /// Runs a schema query, optionally restricted to the configured
    /// keyspaces, and returns a stream of deserialized rows.
    ///
    /// With an empty `keyspaces_to_fetch` the query is executed as-is;
    /// otherwise a `where keyspace_name in ?` clause is appended and the
    /// statement is prepared and executed with the keyspace list bound.
    fn query_filter_keyspace_name<'a, R>(
        &'a self,
        query_str: &'a str,
        keyspaces_to_fetch: &'a [String],
    ) -> impl Stream<Item = Result<R, MetadataFetchErrorKind>> + 'a
    where
        R: DeserializeOwnedRow + 'static,
    {
        // Helper building the pager for either the unfiltered or the
        // keyspace-filtered variant of the query.
        async fn make_keyspace_filtered_query_pager(
            conn: &ControlConnection,
            query_str: &str,
            keyspaces_to_fetch: &[String],
        ) -> Result<QueryPager, MetadataFetchErrorKind> {
            if keyspaces_to_fetch.is_empty() {
                let mut query = Statement::new(query_str);
                query.set_page_size(METADATA_QUERY_PAGE_SIZE);

                conn.query_iter(query)
                    .await
                    .map_err(MetadataFetchErrorKind::NextRowError)
            } else {
                // Bind the keyspace list as a single IN-clause value.
                let keyspaces = &[keyspaces_to_fetch] as &[&[String]];
                let query_str = format!("{query_str} where keyspace_name in ?");

                let mut query = Statement::new(query_str);
                query.set_page_size(METADATA_QUERY_PAGE_SIZE);

                let prepared = conn.prepare(query).await?;
                let serialized_values = prepared.serialize_values(&keyspaces)?;
                conn.execute_iter(prepared, serialized_values)
                    .await
                    .map_err(MetadataFetchErrorKind::NextRowError)
            }
        }

        let fut = async move {
            let pager =
                make_keyspace_filtered_query_pager(self, query_str, keyspaces_to_fetch).await?;
            let stream: crate::client::pager::TypedRowStream<R> = pager.rows_stream::<R>()?;
            Ok::<_, MetadataFetchErrorKind>(stream)
        };
        // Flatten Future<Result<Stream<Result<R>>>> into Stream<Result<R>>.
        fut.into_stream()
            .map(|result| result.map(|stream| stream.map_err(MetadataFetchErrorKind::NextRowError)))
            .try_flatten()
    }
957
    /// Fetches keyspace metadata: replication strategy plus — when
    /// `fetch_schema` is set — tables, views and user defined types.
    ///
    /// Failures limited to a single keyspace's schema are recorded as `Err`
    /// entries in the resulting map instead of failing the whole fetch.
    async fn query_keyspaces(
        &self,
        keyspaces_to_fetch: &[String],
        fetch_schema: bool,
    ) -> Result<PerKeyspaceResult<Keyspace, SingleKeyspaceMetadataError>, MetadataError> {
        let rows = self
            .query_filter_keyspace_name::<(String, HashMap<String, String>)>(
                "select keyspace_name, replication from system_schema.keyspaces",
                keyspaces_to_fetch,
            )
            .map_err(|error| MetadataFetchError {
                error,
                table: "system_schema.keyspaces",
            });

        // Fetch schema details up front (grouped per keyspace), to be
        // matched with keyspace rows as they stream in below.
        let (mut all_tables, mut all_views, mut all_user_defined_types) = if fetch_schema {
            let udts = self.query_user_defined_types(keyspaces_to_fetch).await?;
            let mut tables_schema = self.query_tables_schema(keyspaces_to_fetch, &udts).await?;
            (
                self.query_tables(keyspaces_to_fetch, &mut tables_schema)
                    .await?,
                self.query_views(keyspaces_to_fetch, &mut tables_schema)
                    .await?,
                udts,
            )
        } else {
            (HashMap::new(), HashMap::new(), HashMap::new())
        };

        rows.map(|row_result| {
            let (keyspace_name, strategy_map) = row_result?;

            let strategy: Strategy = strategy_from_string_map(strategy_map).map_err(|error| {
                KeyspacesMetadataError::Strategy {
                    keyspace: keyspace_name.clone(),
                    error,
                }
            })?;
            let tables = all_tables
                .remove(&keyspace_name)
                .unwrap_or_else(|| Ok(HashMap::new()));
            let views = all_views
                .remove(&keyspace_name)
                .unwrap_or_else(|| Ok(HashMap::new()));
            let user_defined_types = all_user_defined_types
                .remove(&keyspace_name)
                .unwrap_or_else(|| Ok(HashMap::new()));

            // If any schema part failed for this keyspace, mark the whole
            // keyspace as failed rather than returning partial schema.
            let (tables, views, user_defined_types) = match (tables, views, user_defined_types) {
                (Ok(t), Ok(v), Ok(u)) => (t, v, u),
                (Err(e), _, _) | (_, Err(e), _) => return Ok((keyspace_name, Err(e))),
                (_, _, Err(e)) => {
                    return Ok((
                        keyspace_name,
                        Err(SingleKeyspaceMetadataError::MissingUDT(e)),
                    ))
                }
            };

            let keyspace = Keyspace {
                strategy,
                tables,
                views,
                user_defined_types,
            };

            Ok((keyspace_name, Ok(keyspace)))
        })
        .try_collect()
        .await
    }
1043}
1044
// Raw row of `system_schema.types`, with field types still as strings.
#[derive(DeserializeRow, Debug)]
#[scylla(crate = "crate")]
struct UdtRow {
    keyspace_name: String,
    type_name: String,
    field_names: Vec<String>,
    field_types: Vec<String>,
}
1053
// A `UdtRow` whose field type strings have been parsed into
// `PreColumnType`s (UDT references still unresolved).
#[derive(Debug)]
struct UdtRowWithParsedFieldTypes {
    keyspace_name: String,
    type_name: String,
    field_names: Vec<String>,
    field_types: Vec<PreColumnType>,
}
1061
1062impl TryFrom<UdtRow> for UdtRowWithParsedFieldTypes {
1063 type Error = InvalidCqlType;
1064 fn try_from(udt_row: UdtRow) -> Result<Self, InvalidCqlType> {
1065 let UdtRow {
1066 keyspace_name,
1067 type_name,
1068 field_names,
1069 field_types,
1070 } = udt_row;
1071 let field_types = field_types
1072 .into_iter()
1073 .map(|type_| map_string_to_cql_type(&type_))
1074 .collect::<Result<Vec<_>, _>>()?;
1075 Ok(Self {
1076 keyspace_name,
1077 type_name,
1078 field_names,
1079 field_types,
1080 })
1081 }
1082}
1083
impl ControlConnection {
    /// Fetches all user defined types for the requested keyspaces and
    /// resolves the references between them.
    ///
    /// Definitions are topologically sorted first (see `topo_sort_udts`),
    /// so each UDT is resolved after the UDTs it references. When a
    /// referenced UDT is missing, the whole keyspace's entry becomes
    /// `Err(MissingUserDefinedType)`.
    async fn query_user_defined_types(
        &self,
        keyspaces_to_fetch: &[String],
    ) -> Result<
        PerKeyspaceResult<PerTable<Arc<UserDefinedType<'static>>>, MissingUserDefinedType>,
        MetadataError,
    > {
        let rows = self.query_filter_keyspace_name::<UdtRow>(
            "select keyspace_name, type_name, field_names, field_types from system_schema.types",
            keyspaces_to_fetch,
        )
        .map_err(|error| MetadataFetchError {
            error,
            table: "system_schema.types",
        });

        // Eagerly parse each row's field type strings; a malformed type
        // string fails the whole fetch.
        let mut udt_rows: Vec<UdtRowWithParsedFieldTypes> = rows
            .map(|row_result| {
                let udt_row = row_result?.try_into().map_err(|err: InvalidCqlType| {
                    MetadataError::Udts(UdtMetadataError::InvalidCqlType {
                        typ: err.typ,
                        position: err.position,
                        reason: err.reason,
                    })
                })?;

                Ok::<_, MetadataError>(udt_row)
            })
            .try_collect()
            .await?;

        let instant_before_toposort = Instant::now();
        topo_sort_udts(&mut udt_rows)?;
        let toposort_elapsed = instant_before_toposort.elapsed();
        debug!(
            "Toposort of UDT definitions took {:.2} ms (udts len: {})",
            toposort_elapsed.as_secs_f64() * 1000.,
            udt_rows.len(),
        );

        let mut udts = HashMap::new();
        'udts_loop: for udt_row in udt_rows {
            let UdtRowWithParsedFieldTypes {
                keyspace_name,
                type_name,
                field_names,
                field_types,
            } = udt_row;

            let keyspace_name_clone = keyspace_name.clone();
            let keyspace_udts_result = udts
                .entry(keyspace_name)
                .or_insert_with(|| Ok(HashMap::new()));

            // If resolving an earlier UDT of this keyspace already failed,
            // skip the remaining UDTs of the keyspace.
            let keyspace_udts = match keyspace_udts_result {
                Ok(udts) => udts,
                Err(_) => continue,
            };

            let mut fields = Vec::with_capacity(field_names.len());

            for (field_name, field_type) in field_names.into_iter().zip(field_types.into_iter()) {
                match field_type.into_cql_type(&keyspace_name_clone, keyspace_udts) {
                    Ok(cql_type) => fields.push((field_name.into(), cql_type)),
                    Err(e) => {
                        // Mark the whole keyspace's UDT entry as failed.
                        *keyspace_udts_result = Err(e);
                        continue 'udts_loop;
                    }
                }
            }

            let udt = Arc::new(UserDefinedType {
                name: type_name.clone().into(),
                keyspace: keyspace_name_clone.into(),
                field_types: fields,
            });

            keyspace_udts.insert(type_name, udt);
        }

        Ok(udts)
    }
}
1169
/// Topologically sorts UDT definitions in place, so that every UDT ends up
/// after all UDTs (of the same keyspace) that its fields reference. This
/// lets `query_user_defined_types` resolve each UDT's dependencies first.
///
/// Returns `UdtMetadataError::CircularTypeDependency` if the references
/// form a cycle.
fn topo_sort_udts(udts: &mut Vec<UdtRowWithParsedFieldTypes>) -> Result<(), UdtMetadataError> {
    // Invokes `what` with the name of every UDT referenced (at any nesting
    // depth) inside the given pre-parsed type.
    fn do_with_referenced_udts(what: &mut impl FnMut(&str), pre_cql_type: &PreColumnType) {
        match pre_cql_type {
            PreColumnType::Native(_) => (),
            PreColumnType::Collection { typ: type_, .. } => match type_ {
                PreCollectionType::List(t) | PreCollectionType::Set(t) => {
                    do_with_referenced_udts(what, t)
                }
                PreCollectionType::Map(t1, t2) => {
                    do_with_referenced_udts(what, t1);
                    do_with_referenced_udts(what, t2);
                }
            },
            PreColumnType::Tuple(types) => types
                .iter()
                .for_each(|type_| do_with_referenced_udts(what, type_)),
            PreColumnType::Vector { typ: type_, .. } => do_with_referenced_udts(what, type_),
            PreColumnType::UserDefinedType { name, .. } => what(name),
        }
    }

    // Kahn-style toposort. `indegs` maps (keyspace, type_name) to the
    // definition plus the number of references to it from other UDTs;
    // `Cell` allows updating counts while `indegs` is borrowed immutably.
    let mut indegs = udts
        .drain(..)
        .map(|def| {
            (
                (def.keyspace_name.clone(), def.type_name.clone()),
                (def, Cell::new(0u32)),
            )
        })
        .collect::<HashMap<_, _>>();

    // Count, for each UDT, how many times other UDTs reference it.
    for (def, _) in indegs.values() {
        let mut increment_referred_udts = |type_name: &str| {
            let deg = indegs
                .get(&(def.keyspace_name.clone(), type_name.to_string()))
                .map(|(_, count)| count);

            if let Some(deg_cell) = deg {
                deg_cell.set(deg_cell.get() + 1);
            }
        };

        for field_type in def.field_types.iter() {
            do_with_referenced_udts(&mut increment_referred_udts, field_type);
        }
    }

    // Process UDTs that nothing references first; decrement the counts of
    // UDTs they refer to, and make each UDT whose count drops to zero
    // processable in turn. `sorted` doubles as the work queue.
    let mut sorted = Vec::with_capacity(indegs.len());
    let mut next_idx = 0;

    for (key, _) in indegs.iter().filter(|(_, (_, deg))| deg.get() == 0) {
        sorted.push(key);
    }

    while let Some(key @ (keyspace, _type_name)) = sorted.get(next_idx).copied() {
        next_idx += 1;
        let mut decrement_referred_udts = |type_name: &str| {
            let key_value = indegs.get_key_value(&(keyspace.clone(), type_name.to_string()));

            if let Some((ref_key, (_, cnt))) = key_value {
                let new_cnt = cnt.get() - 1;
                cnt.set(new_cnt);
                if new_cnt == 0 {
                    sorted.push(ref_key);
                }
            }
        };

        let def = &indegs.get(key).unwrap().0;
        for field_type in def.field_types.iter() {
            do_with_referenced_udts(&mut decrement_referred_udts, field_type);
        }
    }

    // If not every UDT was processed, some reference cycle kept the
    // remaining counts above zero.
    if sorted.len() < indegs.len() {
        return Err(UdtMetadataError::CircularTypeDependency);
    }

    // Refill `udts` in reverse processing order: referenced UDTs first.
    let owned_sorted = sorted.into_iter().cloned().collect::<Vec<_>>();
    assert!(udts.is_empty());
    for key in owned_sorted.into_iter().rev() {
        udts.push(indegs.remove(&key).unwrap().0);
    }

    Ok(())
}
1264
#[cfg(test)]
mod toposort_tests {
    //! Unit tests for `topo_sort_udts`: a valid multi-keyspace dependency
    //! graph, cycle detection, and tolerance of dangling UDT references.

    use crate::test_utils::setup_tracing;

    use super::{topo_sort_udts, UdtRow, UdtRowWithParsedFieldTypes};

    const KEYSPACE1: &str = "KEYSPACE1";
    const KEYSPACE2: &str = "KEYSPACE2";

    // Builds a UDT row whose field types are given as raw CQL type strings;
    // field names are irrelevant to the sort and filled with a placeholder.
    fn make_udt_row(
        keyspace_name: String,
        type_name: String,
        field_types: Vec<String>,
    ) -> UdtRowWithParsedFieldTypes {
        UdtRow {
            keyspace_name,
            type_name,
            field_names: vec!["udt_field".into(); field_types.len()],
            field_types,
        }
        .try_into()
        .unwrap()
    }

    // Position of the given UDT in the toposorted slice; panics if absent.
    fn get_udt_idx(
        toposorted: &[UdtRowWithParsedFieldTypes],
        keyspace_name: &str,
        type_name: &str,
    ) -> usize {
        toposorted
            .iter()
            .enumerate()
            .find_map(|(idx, def)| {
                (def.type_name == type_name && def.keyspace_name == keyspace_name).then_some(idx)
            })
            .unwrap()
    }

    #[test]
    #[ntest::timeout(1000)]
    fn test_udt_topo_sort_valid_case() {
        setup_tracing();
        // Two keyspaces with same-named UDTs: dependencies must only be
        // resolved within a keyspace. References hide inside list/map/tuple/
        // frozen wrappers to exercise the recursive traversal.
        let mut udts = vec![
            make_udt_row(KEYSPACE1.into(), "A".into(), vec!["blob".into()]),
            make_udt_row(KEYSPACE1.into(), "B".into(), vec!["A".into(), "D".into()]),
            make_udt_row(
                KEYSPACE1.into(),
                "C".into(),
                vec!["blob".into(), "B".into(), "list<map<F, text>>".into()],
            ),
            make_udt_row(
                KEYSPACE1.into(),
                "D".into(),
                vec!["A".into(), "blob".into()],
            ),
            make_udt_row(KEYSPACE1.into(), "E".into(), vec!["blob".into()]),
            make_udt_row(
                KEYSPACE1.into(),
                "F".into(),
                vec!["B".into(), "blob".into()],
            ),
            make_udt_row(
                KEYSPACE2.into(),
                "A".into(),
                vec!["B".into(), "tuple<E, E>".into()],
            ),
            make_udt_row(KEYSPACE2.into(), "B".into(), vec!["map<text, D>".into()]),
            make_udt_row(KEYSPACE2.into(), "C".into(), vec!["frozen<A>".into()]),
            make_udt_row(KEYSPACE2.into(), "D".into(), vec!["blob".into()]),
            make_udt_row(KEYSPACE2.into(), "E".into(), vec!["D".into()]),
        ];

        topo_sort_udts(&mut udts).unwrap();

        // Each referenced UDT must precede every UDT that uses it.
        assert!(get_udt_idx(&udts, KEYSPACE1, "A") < get_udt_idx(&udts, KEYSPACE1, "B"));
        assert!(get_udt_idx(&udts, KEYSPACE1, "A") < get_udt_idx(&udts, KEYSPACE1, "D"));
        assert!(get_udt_idx(&udts, KEYSPACE1, "B") < get_udt_idx(&udts, KEYSPACE1, "C"));
        assert!(get_udt_idx(&udts, KEYSPACE1, "B") < get_udt_idx(&udts, KEYSPACE1, "F"));
        assert!(get_udt_idx(&udts, KEYSPACE1, "F") < get_udt_idx(&udts, KEYSPACE1, "C"));
        assert!(get_udt_idx(&udts, KEYSPACE1, "D") < get_udt_idx(&udts, KEYSPACE1, "B"));

        assert!(get_udt_idx(&udts, KEYSPACE2, "B") < get_udt_idx(&udts, KEYSPACE2, "A"));
        assert!(get_udt_idx(&udts, KEYSPACE2, "D") < get_udt_idx(&udts, KEYSPACE2, "B"));
        assert!(get_udt_idx(&udts, KEYSPACE2, "D") < get_udt_idx(&udts, KEYSPACE2, "E"));
        assert!(get_udt_idx(&udts, KEYSPACE2, "E") < get_udt_idx(&udts, KEYSPACE2, "A"));
        assert!(get_udt_idx(&udts, KEYSPACE2, "A") < get_udt_idx(&udts, KEYSPACE2, "C"));
    }

    #[test]
    #[ntest::timeout(1000)]
    fn test_udt_topo_sort_detects_cycles() {
        setup_tracing();
        const KEYSPACE1: &str = "KEYSPACE1";
        let tests = [
            // Self-reference: A contains A.
            vec![make_udt_row(
                KEYSPACE1.into(),
                "A".into(),
                vec!["blob".into(), "A".into()],
            )],
            // Two-node cycle (A -> B -> A), plus an unrelated acyclic UDT.
            vec![
                make_udt_row(
                    KEYSPACE1.into(),
                    "A".into(),
                    vec!["blob".into(), "B".into()],
                ),
                make_udt_row(
                    KEYSPACE1.into(),
                    "B".into(),
                    vec!["int".into(), "map<text, A>".into()],
                ),
                make_udt_row(KEYSPACE1.into(), "E".into(), vec!["text".into()]),
            ],
        ];

        for mut udts in tests {
            topo_sort_udts(&mut udts).unwrap_err();
        }
    }

    #[test]
    #[ntest::timeout(1000)]
    fn test_udt_topo_sort_ignores_invalid_metadata() {
        setup_tracing();
        // "C" is referenced but never defined; the sort must still succeed and
        // order the UDTs that do exist.
        let mut udts = vec![
            make_udt_row(
                KEYSPACE1.into(),
                "A".into(),
                vec!["blob".into(), "B".into()],
            ),
            make_udt_row(
                KEYSPACE1.into(),
                "B".into(),
                vec!["int".into(), "map<text, C>".into()],
            ),
            make_udt_row(KEYSPACE1.into(), "E".into(), vec!["text".into()]),
        ];

        topo_sort_udts(&mut udts).unwrap();

        assert!(get_udt_idx(&udts, KEYSPACE1, "B") < get_udt_idx(&udts, KEYSPACE1, "A"));
    }
}
1429
impl ControlConnection {
    /// Queries `system_schema.tables` and groups the per-table metadata
    /// (prefetched into `tables` by `query_tables_schema`) by keyspace.
    ///
    /// Entries are *moved out of* `tables` via `remove`; a table listed in
    /// `system_schema.tables` but absent from `tables` gets empty metadata.
    /// A per-table error poisons the whole keyspace's entry in the result:
    /// once a keyspace maps to `Err`, later tables for it are dropped.
    async fn query_tables(
        &self,
        keyspaces_to_fetch: &[String],
        tables: &mut PerKsTableResult<Table, SingleKeyspaceMetadataError>,
    ) -> Result<PerKeyspaceResult<PerTable<Table>, SingleKeyspaceMetadataError>, MetadataError>
    {
        let rows = self
            .query_filter_keyspace_name::<(String, String)>(
                "SELECT keyspace_name, table_name FROM system_schema.tables",
                keyspaces_to_fetch,
            )
            .map_err(|error| MetadataFetchError {
                error,
                table: "system_schema.tables",
            });
        let mut result = HashMap::new();

        rows.map(|row_result| {
            let keyspace_and_table_name = row_result?;

            // No prefetched columns for this table -> empty table metadata.
            let table = tables.remove(&keyspace_and_table_name).unwrap_or(Ok(Table {
                columns: HashMap::new(),
                partition_key: vec![],
                clustering_key: vec![],
                partitioner: None,
                pk_column_specs: vec![],
            }));

            let mut entry = result
                .entry(keyspace_and_table_name.0)
                .or_insert_with(|| Ok(HashMap::new()));
            // A keyspace entry that already failed stays failed; a failing
            // table converts its keyspace's entry into that error.
            match (&mut entry, table) {
                (Ok(tables), Ok(table)) => {
                    let _ = tables.insert(keyspace_and_table_name.1, table);
                }
                (Err(_), _) => (),
                (Ok(_), Err(e)) => *entry = Err(e),
            };

            Ok::<_, MetadataError>(())
        })
        .try_for_each(|_| future::ok(()))
        .await?;

        Ok(result)
    }

    /// Queries `system_schema.views` and groups materialized views by
    /// keyspace, attaching each view's column metadata taken (by `remove`)
    /// from `tables` under the (keyspace, view name) key.
    ///
    /// Error propagation mirrors `query_tables`: one failing view poisons its
    /// keyspace's entry in the result map.
    async fn query_views(
        &self,
        keyspaces_to_fetch: &[String],
        tables: &mut PerKsTableResult<Table, SingleKeyspaceMetadataError>,
    ) -> Result<
        PerKeyspaceResult<PerTable<MaterializedView>, SingleKeyspaceMetadataError>,
        MetadataError,
    > {
        let rows = self
            .query_filter_keyspace_name::<(String, String, String)>(
                "SELECT keyspace_name, view_name, base_table_name FROM system_schema.views",
                keyspaces_to_fetch,
            )
            .map_err(|error| MetadataFetchError {
                error,
                table: "system_schema.views",
            });

        let mut result = HashMap::new();

        rows.map(|row_result| {
            let (keyspace_name, view_name, base_table_name) = row_result?;

            let keyspace_and_view_name = (keyspace_name, view_name);

            // Missing column metadata -> empty view metadata, like in
            // `query_tables`.
            let materialized_view = tables
                .remove(&keyspace_and_view_name)
                .unwrap_or(Ok(Table {
                    columns: HashMap::new(),
                    partition_key: vec![],
                    clustering_key: vec![],
                    partitioner: None,
                    pk_column_specs: vec![],
                }))
                .map(|table| MaterializedView {
                    view_metadata: table,
                    base_table_name,
                });

            let mut entry = result
                .entry(keyspace_and_view_name.0)
                .or_insert_with(|| Ok(HashMap::new()));

            match (&mut entry, materialized_view) {
                (Ok(views), Ok(view)) => {
                    let _ = views.insert(keyspace_and_view_name.1, view);
                }
                (Err(_), _) => (),
                (Ok(_), Err(e)) => *entry = Err(e),
            };

            Ok::<_, MetadataError>(())
        })
        .try_for_each(|_| future::ok(()))
        .await?;

        Ok(result)
    }

    /// Builds full per-table column metadata from `system_schema.columns`:
    /// column types (resolved against the already-fetched `udts`), partition
    /// and clustering keys (validated and ordered by position), partitioner
    /// (from `query_table_partitioners`) and partition-key column specs.
    ///
    /// A missing/unresolvable UDT marks only the affected table as `Err`
    /// rather than failing the whole fetch; malformed CQL types or unknown
    /// column kinds abort with a `MetadataError`.
    async fn query_tables_schema(
        &self,
        keyspaces_to_fetch: &[String],
        udts: &PerKeyspaceResult<PerTable<Arc<UserDefinedType<'static>>>, MissingUserDefinedType>,
    ) -> Result<PerKsTableResult<Table, SingleKeyspaceMetadataError>, MetadataError> {
        // Legacy Thrift placeholder type; columns of this type are skipped.
        const THRIFT_EMPTY_TYPE: &str = "empty";

        type RowType = (String, String, String, String, i32, String);

        let rows = self.query_filter_keyspace_name::<RowType>(
            "select keyspace_name, table_name, column_name, kind, position, type from system_schema.columns",
            keyspaces_to_fetch
        ).map_err(|error| MetadataFetchError {
            error,
            table: "system_schema.columns",
        });

        // Shared fallback for keyspaces with no UDTs at all.
        let empty_ok_map = Ok(HashMap::new());

        // (keyspace, table) -> Ok((columns, partition key cols, clustering
        // key cols)) or the first error encountered for that table.
        let mut tables_schema: HashMap<_, Result<_, SingleKeyspaceMetadataError>> = HashMap::new();

        rows.map(|row_result| {
            let (keyspace_name, table_name, column_name, kind, position, type_) = row_result?;

            if type_ == THRIFT_EMPTY_TYPE {
                return Ok::<_, MetadataError>(());
            }

            // If this keyspace's UDTs failed to resolve, the table inherits
            // that error and the row is skipped.
            let keyspace_udts: &PerTable<Arc<UserDefinedType<'static>>> =
                match udts.get(&keyspace_name).unwrap_or(&empty_ok_map) {
                    Ok(udts) => udts,
                    Err(e) => {
                        tables_schema.insert(
                            (keyspace_name, table_name),
                            Err(SingleKeyspaceMetadataError::MissingUDT(e.clone())),
                        );
                        return Ok::<_, MetadataError>(());
                    }
                };
            // A syntactically invalid type string is a fatal metadata error.
            let pre_cql_type = map_string_to_cql_type(&type_).map_err(|err: InvalidCqlType| {
                TablesMetadataError::InvalidCqlType {
                    typ: err.typ,
                    position: err.position,
                    reason: err.reason,
                }
            })?;
            // A reference to an unknown UDT only poisons this one table.
            let cql_type = match pre_cql_type.into_cql_type(&keyspace_name, keyspace_udts) {
                Ok(t) => t,
                Err(e) => {
                    tables_schema.insert(
                        (keyspace_name, table_name),
                        Err(SingleKeyspaceMetadataError::MissingUDT(e)),
                    );
                    return Ok::<_, MetadataError>(());
                }
            };

            let kind = ColumnKind::from_str(&kind).map_err(|_| {
                TablesMetadataError::UnknownColumnKind {
                    keyspace_name: keyspace_name.clone(),
                    table_name: table_name.clone(),
                    column_name: column_name.clone(),
                    column_kind: kind,
                }
            })?;

            // Tables already marked as failed don't accumulate columns.
            let Ok(entry) = tables_schema
                .entry((keyspace_name, table_name))
                .or_insert(Ok((
                    HashMap::new(),
                    Vec::new(),
                    Vec::new(),
                )))
            else {
                return Ok::<_, MetadataError>(());
            };

            // Record key columns with their position for later ordering.
            if kind == ColumnKind::PartitionKey || kind == ColumnKind::Clustering {
                let key_list: &mut Vec<(i32, String)> = if kind == ColumnKind::PartitionKey {
                    entry.1.borrow_mut()
                } else {
                    entry.2.borrow_mut()
                };
                key_list.push((position, column_name.clone()));
            }

            entry.0.insert(
                column_name,
                Column {
                    typ: cql_type,
                    kind,
                },
            );

            Ok::<_, MetadataError>(())
        })
        .try_for_each(|_| future::ok(()))
        .await?;

        let mut all_partitioners = self.query_table_partitioners().await?;
        let mut result = HashMap::new();

        'tables_loop: for ((keyspace_name, table_name), table_result) in tables_schema {
            let keyspace_and_table_name = (keyspace_name, table_name);

            #[allow(clippy::type_complexity)]
            let (columns, partition_key_columns, clustering_key_columns): (
                HashMap<String, Column>,
                Vec<(i32, String)>,
                Vec<(i32, String)>,
            ) = match table_result {
                Ok(table) => table,
                Err(e) => {
                    let _ = result.insert(keyspace_and_table_name, Err(e));
                    continue;
                }
            };

            // Sorts the key columns by their declared position and checks the
            // positions form the contiguous sequence 0..n; returns the column
            // names in order, or the first index where the sequence breaks.
            fn validate_key_columns(
                mut key_columns: Vec<(i32, String)>,
            ) -> Result<Vec<String>, i32> {
                key_columns.sort_unstable_by_key(|(position, _)| *position);

                key_columns
                    .into_iter()
                    .enumerate()
                    .map(|(idx, (position, column_name))| {
                        let idx: i32 = idx.try_into().unwrap();
                        if idx == position {
                            Ok(column_name)
                        } else {
                            Err(idx)
                        }
                    })
                    .collect::<Result<Vec<_>, _>>()
            }

            let partition_key = match validate_key_columns(partition_key_columns) {
                Ok(partition_key_columns) => partition_key_columns,
                Err(position) => {
                    result.insert(
                        keyspace_and_table_name,
                        Err(SingleKeyspaceMetadataError::IncompletePartitionKey(
                            position,
                        )),
                    );
                    continue 'tables_loop;
                }
            };

            let clustering_key = match validate_key_columns(clustering_key_columns) {
                Ok(clustering_key_columns) => clustering_key_columns,
                Err(position) => {
                    result.insert(
                        keyspace_and_table_name,
                        Err(SingleKeyspaceMetadataError::IncompleteClusteringKey(
                            position,
                        )),
                    );
                    continue 'tables_loop;
                }
            };

            let partitioner = all_partitioners
                .remove(&keyspace_and_table_name)
                .unwrap_or_default();

            // Column specs for the partition key, in key order. The unwrap is
            // safe: every partition-key name came from `columns` above.
            let pk_column_specs = partition_key
                .iter()
                .map(|column_name| (column_name, columns.get(column_name).unwrap().clone().typ))
                .map(|(name, typ)| {
                    let table_spec = TableSpec::owned(
                        keyspace_and_table_name.0.clone(),
                        keyspace_and_table_name.1.clone(),
                    );
                    ColumnSpec::owned(name.to_owned(), typ, table_spec)
                })
                .collect();

            result.insert(
                keyspace_and_table_name,
                Ok(Table {
                    columns,
                    partition_key,
                    clustering_key,
                    partitioner,
                    pk_column_specs,
                }),
            );
        }

        Ok(result)
    }
}
1760
1761fn map_string_to_cql_type(typ: &str) -> Result<PreColumnType, InvalidCqlType> {
1762 match parse_cql_type(ParserState::new(typ)) {
1763 Err(err) => Err(InvalidCqlType {
1764 typ: typ.to_string(),
1765 position: err.calculate_position(typ).unwrap_or(0),
1766 reason: err.get_cause().to_string(),
1767 }),
1768 Ok((_, p)) if !p.is_at_eof() => Err(InvalidCqlType {
1769 typ: typ.to_string(),
1770 position: p.calculate_position(typ).unwrap_or(0),
1771 reason: "leftover characters".to_string(),
1772 }),
1773 Ok((typ, _)) => Ok(typ),
1774 }
1775}
1776
1777fn parse_cql_type(p: ParserState<'_>) -> ParseResult<(PreColumnType, ParserState<'_>)> {
1778 if let Ok(p) = p.accept("frozen<") {
1779 let (inner_type, p) = parse_cql_type(p)?;
1780 let p = p.accept(">")?;
1781
1782 let frozen_type = freeze_type(inner_type);
1783
1784 Ok((frozen_type, p))
1785 } else if let Ok(p) = p.accept("map<") {
1786 let (key, p) = parse_cql_type(p)?;
1787 let p = p.accept(",")?.skip_white();
1788 let (value, p) = parse_cql_type(p)?;
1789 let p = p.accept(">")?;
1790
1791 let typ = PreColumnType::Collection {
1792 frozen: false,
1793 typ: PreCollectionType::Map(Box::new(key), Box::new(value)),
1794 };
1795
1796 Ok((typ, p))
1797 } else if let Ok(p) = p.accept("list<") {
1798 let (inner_type, p) = parse_cql_type(p)?;
1799 let p = p.accept(">")?;
1800
1801 let typ = PreColumnType::Collection {
1802 frozen: false,
1803 typ: PreCollectionType::List(Box::new(inner_type)),
1804 };
1805
1806 Ok((typ, p))
1807 } else if let Ok(p) = p.accept("set<") {
1808 let (inner_type, p) = parse_cql_type(p)?;
1809 let p = p.accept(">")?;
1810
1811 let typ = PreColumnType::Collection {
1812 frozen: false,
1813 typ: PreCollectionType::Set(Box::new(inner_type)),
1814 };
1815
1816 Ok((typ, p))
1817 } else if let Ok(p) = p.accept("tuple<") {
1818 let mut types = Vec::new();
1819 let p = p.parse_while(|p| {
1820 let (inner_type, p) = parse_cql_type(p)?;
1821 types.push(inner_type);
1822
1823 if let Ok(p) = p.accept(",") {
1824 let p = p.skip_white();
1825 Ok((true, p))
1826 } else if let Ok(p) = p.accept(">") {
1827 Ok((false, p))
1828 } else {
1829 Err(p.error(ParseErrorCause::Other("expected \",\" or \">\"")))
1830 }
1831 })?;
1832
1833 Ok((PreColumnType::Tuple(types), p))
1834 } else if let Ok(p) = p.accept("vector<") {
1835 let (inner_type, p) = parse_cql_type(p)?;
1836
1837 let p = p.skip_white();
1838 let p = p.accept(",")?;
1839 let p = p.skip_white();
1840 let (size, p) = p.parse_u16()?;
1841 let p = p.skip_white();
1842 let p = p.accept(">")?;
1843
1844 let typ = PreColumnType::Vector {
1845 typ: Box::new(inner_type),
1846 dimensions: size,
1847 };
1848
1849 Ok((typ, p))
1850 } else if let Ok((typ, p)) = parse_native_type(p) {
1851 Ok((PreColumnType::Native(typ), p))
1852 } else if let Ok((name, p)) = parse_user_defined_type(p) {
1853 let typ = PreColumnType::UserDefinedType {
1854 frozen: false,
1855 name: name.to_string(),
1856 };
1857 Ok((typ, p))
1858 } else {
1859 Err(p.error(ParseErrorCause::Other("invalid cql type")))
1860 }
1861}
1862
1863fn parse_native_type(p: ParserState) -> ParseResult<(NativeType, ParserState)> {
1864 let (tok, p) = p.take_while(|c| c.is_alphanumeric() || c == '_');
1865 let typ = match tok {
1866 "ascii" => NativeType::Ascii,
1867 "boolean" => NativeType::Boolean,
1868 "blob" => NativeType::Blob,
1869 "counter" => NativeType::Counter,
1870 "date" => NativeType::Date,
1871 "decimal" => NativeType::Decimal,
1872 "double" => NativeType::Double,
1873 "duration" => NativeType::Duration,
1874 "float" => NativeType::Float,
1875 "int" => NativeType::Int,
1876 "bigint" => NativeType::BigInt,
1877 "text" => NativeType::Text,
1878 "timestamp" => NativeType::Timestamp,
1879 "inet" => NativeType::Inet,
1880 "smallint" => NativeType::SmallInt,
1881 "tinyint" => NativeType::TinyInt,
1882 "time" => NativeType::Time,
1883 "timeuuid" => NativeType::Timeuuid,
1884 "uuid" => NativeType::Uuid,
1885 "varint" => NativeType::Varint,
1886 _ => return Err(p.error(ParseErrorCause::Other("invalid native type"))),
1887 };
1888 Ok((typ, p))
1889}
1890
1891fn parse_user_defined_type(p: ParserState) -> ParseResult<(&str, ParserState)> {
1892 let (tok, p) = p.take_while(|c| c.is_alphanumeric() || c == '.' || c == '_' || c == '$');
1896 if tok.is_empty() {
1897 return Err(p.error(ParseErrorCause::Other("invalid user defined type")));
1898 }
1899 Ok((tok, p))
1900}
1901
1902fn freeze_type(typ: PreColumnType) -> PreColumnType {
1903 match typ {
1904 PreColumnType::Collection { typ: type_, .. } => PreColumnType::Collection {
1905 frozen: true,
1906 typ: type_,
1907 },
1908 PreColumnType::UserDefinedType { name, .. } => {
1909 PreColumnType::UserDefinedType { frozen: true, name }
1910 }
1911 other => other,
1912 }
1913}
1914
impl ControlConnection {
    /// Fetches per-table partitioner overrides from the Scylla-specific
    /// `system_schema.scylla_tables` table.
    ///
    /// Returns a map from (keyspace, table) to the partitioner name (`None`
    /// when the row has no partitioner set). If the whole query fails with
    /// `DbError::Invalid` — presumably because the server does not have this
    /// Scylla-specific table (e.g. Cassandra); TODO confirm — an empty map is
    /// returned instead of an error.
    async fn query_table_partitioners(
        &self,
    ) -> Result<PerKsTable<Option<String>>, MetadataFetchError> {
        // Tags any underlying error with the table being fetched.
        fn create_err(err: impl Into<MetadataFetchErrorKind>) -> MetadataFetchError {
            MetadataFetchError {
                error: err.into(),
                table: "system_schema.scylla_tables",
            }
        }

        let mut partitioner_query = Statement::new(
            "select keyspace_name, table_name, partitioner from system_schema.scylla_tables",
        );
        partitioner_query.set_page_size(METADATA_QUERY_PAGE_SIZE);

        // Lazily turn the pager into a row stream; errors at any stage are
        // mapped through `create_err`.
        let rows = self
            .query_iter(partitioner_query)
            .map(|pager_res| {
                let pager = pager_res.map_err(create_err)?;
                let stream = pager
                    .rows_stream::<(String, String, Option<String>)>()
                    .map_err(create_err)?
                    .map_err(create_err);
                Ok::<_, MetadataFetchError>(stream)
            })
            .into_stream()
            .try_flatten();

        let result = rows
            .map(|row_result| {
                let (keyspace_name, table_name, partitioner) = row_result?;
                Ok::<_, MetadataFetchError>(((keyspace_name, table_name), partitioner))
            })
            .try_collect::<HashMap<_, _>>()
            .await;

        match result {
            // `DbError::Invalid` from the request itself is tolerated and
            // mapped to "no partitioner overrides".
            Err(MetadataFetchError {
                error:
                    MetadataFetchErrorKind::NextRowError(NextRowError::NextPageError(
                        NextPageError::RequestFailure(RequestError::LastAttemptError(
                            RequestAttemptError::DbError(DbError::Invalid, _),
                        )),
                    )),
                ..
            }) => Ok(HashMap::new()),
            result => result,
        }
    }
}
1972
1973fn strategy_from_string_map(
1974 mut strategy_map: HashMap<String, String>,
1975) -> Result<Strategy, KeyspaceStrategyError> {
1976 let strategy_name: String = strategy_map
1977 .remove("class")
1978 .ok_or(KeyspaceStrategyError::MissingClassForStrategyDefinition)?;
1979
1980 let strategy: Strategy = match strategy_name.as_str() {
1981 "org.apache.cassandra.locator.SimpleStrategy" | "SimpleStrategy" => {
1982 let rep_factor_str: String = strategy_map
1983 .remove("replication_factor")
1984 .ok_or(KeyspaceStrategyError::MissingReplicationFactorForSimpleStrategy)?;
1985
1986 let replication_factor: usize = usize::from_str(&rep_factor_str)
1987 .map_err(KeyspaceStrategyError::ReplicationFactorParseError)?;
1988
1989 Strategy::SimpleStrategy { replication_factor }
1990 }
1991 "org.apache.cassandra.locator.NetworkTopologyStrategy" | "NetworkTopologyStrategy" => {
1992 let mut datacenter_repfactors: HashMap<String, usize> =
1993 HashMap::with_capacity(strategy_map.len());
1994
1995 for (key, value) in strategy_map.drain() {
1996 let rep_factor: usize = usize::from_str(&value).map_err(|_| {
1997 KeyspaceStrategyError::UnexpectedNetworkTopologyStrategyOption {
2001 key: key.clone(),
2002 value,
2003 }
2004 })?;
2005
2006 datacenter_repfactors.insert(key, rep_factor);
2007 }
2008
2009 Strategy::NetworkTopologyStrategy {
2010 datacenter_repfactors,
2011 }
2012 }
2013 "org.apache.cassandra.locator.LocalStrategy" | "LocalStrategy" => Strategy::LocalStrategy,
2014 _ => Strategy::Other {
2015 name: strategy_name,
2016 data: strategy_map,
2017 },
2018 };
2019
2020 Ok(strategy)
2021}
2022
#[cfg(test)]
mod tests {
    //! Table-driven tests for the CQL type-string parser
    //! (`map_string_to_cql_type`), covering natives, collections, freezing,
    //! tuples, vectors, UDT names and deep nesting.

    use crate::test_utils::setup_tracing;

    use super::*;

    #[test]
    fn test_cql_type_parsing() {
        setup_tracing();
        // Each case pairs an input type string with its expected parse tree.
        let test_cases = [
            ("bigint", PreColumnType::Native(NativeType::BigInt)),
            (
                "list<int>",
                PreColumnType::Collection {
                    frozen: false,
                    typ: PreCollectionType::List(Box::new(PreColumnType::Native(NativeType::Int))),
                },
            ),
            (
                "set<ascii>",
                PreColumnType::Collection {
                    frozen: false,
                    typ: PreCollectionType::Set(Box::new(PreColumnType::Native(NativeType::Ascii))),
                },
            ),
            (
                "map<blob, boolean>",
                PreColumnType::Collection {
                    frozen: false,
                    typ: PreCollectionType::Map(
                        Box::new(PreColumnType::Native(NativeType::Blob)),
                        Box::new(PreColumnType::Native(NativeType::Boolean)),
                    ),
                },
            ),
            (
                "frozen<map<text, text>>",
                PreColumnType::Collection {
                    frozen: true,
                    typ: PreCollectionType::Map(
                        Box::new(PreColumnType::Native(NativeType::Text)),
                        Box::new(PreColumnType::Native(NativeType::Text)),
                    ),
                },
            ),
            (
                "tuple<tinyint, smallint, int, bigint, varint>",
                PreColumnType::Tuple(vec![
                    PreColumnType::Native(NativeType::TinyInt),
                    PreColumnType::Native(NativeType::SmallInt),
                    PreColumnType::Native(NativeType::Int),
                    PreColumnType::Native(NativeType::BigInt),
                    PreColumnType::Native(NativeType::Varint),
                ]),
            ),
            (
                "vector<int, 5>",
                PreColumnType::Vector {
                    typ: Box::new(PreColumnType::Native(NativeType::Int)),
                    dimensions: 5,
                },
            ),
            (
                "vector<text, 1234>",
                PreColumnType::Vector {
                    typ: Box::new(PreColumnType::Native(NativeType::Text)),
                    dimensions: 1234,
                },
            ),
            // Not a native type name, so it falls back to a UDT name.
            (
                "com.scylladb.types.AwesomeType",
                PreColumnType::UserDefinedType {
                    frozen: false,
                    name: "com.scylladb.types.AwesomeType".to_string(),
                },
            ),
            (
                "frozen<ks.my_udt>",
                PreColumnType::UserDefinedType {
                    frozen: true,
                    name: "ks.my_udt".to_string(),
                },
            ),
            (
                "map<text, frozen<map<text, text>>>",
                PreColumnType::Collection {
                    frozen: false,
                    typ: PreCollectionType::Map(
                        Box::new(PreColumnType::Native(NativeType::Text)),
                        Box::new(PreColumnType::Collection {
                            frozen: true,
                            typ: PreCollectionType::Map(
                                Box::new(PreColumnType::Native(NativeType::Text)),
                                Box::new(PreColumnType::Native(NativeType::Text)),
                            ),
                        }),
                    ),
                },
            ),
            // Deeply nested mix of collections, tuples, freezing and a UDT.
            // (The `\` continuations splice the string pieces together.)
            (
                "map<\
                frozen<list<int>>, \
                set<\
                    list<\
                        tuple<\
                            list<list<text>>, \
                            map<text, map<ks.my_type, blob>>, \
                            frozen<set<set<int>>>\
                        >\
                    >\
                >\
            >",
                PreColumnType::Collection {
                    frozen: false,
                    typ: PreCollectionType::Map(
                        Box::new(PreColumnType::Collection {
                            frozen: true,
                            typ: PreCollectionType::List(Box::new(PreColumnType::Native(
                                NativeType::Int,
                            ))),
                        }),
                        Box::new(PreColumnType::Collection {
                            frozen: false,
                            typ: PreCollectionType::Set(Box::new(PreColumnType::Collection {
                                frozen: false,
                                typ: PreCollectionType::List(Box::new(PreColumnType::Tuple(vec![
                                    PreColumnType::Collection {
                                        frozen: false,
                                        typ: PreCollectionType::List(Box::new(
                                            PreColumnType::Collection {
                                                frozen: false,
                                                typ: PreCollectionType::List(Box::new(
                                                    PreColumnType::Native(NativeType::Text),
                                                )),
                                            },
                                        )),
                                    },
                                    PreColumnType::Collection {
                                        frozen: false,
                                        typ: PreCollectionType::Map(
                                            Box::new(PreColumnType::Native(NativeType::Text)),
                                            Box::new(PreColumnType::Collection {
                                                frozen: false,
                                                typ: PreCollectionType::Map(
                                                    Box::new(PreColumnType::UserDefinedType {
                                                        frozen: false,
                                                        name: "ks.my_type".to_string(),
                                                    }),
                                                    Box::new(PreColumnType::Native(
                                                        NativeType::Blob,
                                                    )),
                                                ),
                                            }),
                                        ),
                                    },
                                    PreColumnType::Collection {
                                        frozen: true,
                                        typ: PreCollectionType::Set(Box::new(
                                            PreColumnType::Collection {
                                                frozen: false,
                                                typ: PreCollectionType::Set(Box::new(
                                                    PreColumnType::Native(NativeType::Int),
                                                )),
                                            },
                                        )),
                                    },
                                ]))),
                            })),
                        }),
                    ),
                },
            ),
        ];

        for (s, expected) in test_cases {
            let parsed = map_string_to_cql_type(s).unwrap();
            assert_eq!(parsed, expected);
        }
    }
}