scylla/cluster/state.rs

use crate::errors::{ClusterStateTokenError, ConnectionPoolError};
use crate::network::{Connection, PoolConfig, VerifiedKeyspaceName};
#[cfg(feature = "metrics")]
use crate::observability::metrics::Metrics;
use crate::policies::host_filter::HostFilter;
use crate::routing::locator::tablets::{RawTablet, Tablet, TabletsInfo};
use crate::routing::locator::ReplicaLocator;
use crate::routing::partitioner::{calculate_token_for_partition_key, PartitionerName};
use crate::routing::{Shard, Token};

use itertools::Itertools;
use scylla_cql::frame::response::result::TableSpec;
use scylla_cql::serialize::row::{RowSerializationContext, SerializeRow, SerializedValues};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tracing::{debug, warn};
use uuid::Uuid;

use super::metadata::{Keyspace, Metadata, Strategy};
use super::node::{Node, NodeRef};

#[derive(Clone)]
pub struct ClusterState {
    /// All nodes known to be part of the cluster, accessible by their host ID.
    /// Often referred to as "topology metadata".
    pub(crate) known_peers: HashMap<Uuid, Arc<Node>>, // Invariant: nonempty after Cluster::new()

    /// Contains the same set of nodes as `known_peers`.
    ///
    /// Introduced to fix the bug that zero-token nodes were missing from
    /// the `ClusterState::get_nodes_info()` slice, because the slice was borrowed
    /// from `ReplicaLocator`, which only contains nodes with some tokens assigned.
    // TODO: in 2.0, make `get_nodes_info()` return `Iterator` instead of a slice.
    // Then, remove this field.
    pub(crate) all_nodes: Vec<Arc<Node>>,

    /// All keyspaces in the cluster, accessible by their name.
    /// Often referred to as "schema metadata".
    pub(crate) keyspaces: HashMap<String, Keyspace>,

    /// The entity which provides a way to find the set of owning nodes (+ shards, in the case of ScyllaDB)
    /// for a given (token, replication strategy, table) tuple.
    /// It relies on both topology and schema metadata.
    pub(crate) locator: ReplicaLocator,
}

/// Enables printing the [ClusterState] struct in a neat way, skipping the clutter caused by
/// [ClusterState::ring] being large and the [Self::keyspaces] debug print being very verbose by default.
pub(crate) struct ClusterStateNeatDebug<'a>(pub(crate) &'a Arc<ClusterState>);
impl std::fmt::Debug for ClusterStateNeatDebug<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let cluster_state = &self.0;

        f.debug_struct("ClusterState")
            .field("known_peers", &cluster_state.known_peers)
            .field("ring", {
                struct RingSizePrinter(usize);
                impl std::fmt::Debug for RingSizePrinter {
                    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                        write!(f, "<size={}>", self.0)
                    }
                }
                &RingSizePrinter(cluster_state.locator.ring().len())
            })
            .field("keyspaces", &cluster_state.keyspaces.keys())
            .finish_non_exhaustive()
    }
}

impl ClusterState {
    pub(crate) async fn wait_until_all_pools_are_initialized(&self) {
        for node in self.locator.unique_nodes_in_global_ring().iter() {
            node.wait_until_pool_initialized().await;
        }
    }

    /// Creates a new ClusterState using information about the topology held in `metadata`.
    /// Uses the provided `known_peers` hashmap to recycle existing nodes if possible.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn new(
        metadata: Metadata,
        pool_config: &PoolConfig,
        known_peers: &HashMap<Uuid, Arc<Node>>,
        used_keyspace: &Option<VerifiedKeyspaceName>,
        host_filter: Option<&dyn HostFilter>,
        mut tablets: TabletsInfo,
        old_keyspaces: &HashMap<String, Keyspace>,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> Self {
        // Create the new, updated known_peers and ring.
        let mut new_known_peers: HashMap<Uuid, Arc<Node>> =
            HashMap::with_capacity(metadata.peers.len());
        let mut ring: Vec<(Token, Arc<Node>)> = Vec::new();

        for peer in metadata.peers {
            // Take the existing Arc<Node> if possible, otherwise create a new one.
            // Changing the rack/datacenter without changing the IP address seems improbable,
            // so in that case we simply create a new node and new connections.
            let peer_host_id = peer.host_id;
            let peer_address = peer.address;
            let peer_tokens;

            let node: Arc<Node> = match known_peers.get(&peer_host_id) {
                Some(node) if node.datacenter == peer.datacenter && node.rack == peer.rack => {
                    let (peer_endpoint, tokens) = peer.into_peer_endpoint_and_tokens();
                    peer_tokens = tokens;
                    if node.address == peer_address {
                        node.clone()
                    } else {
                        // If the IP changed, the Node struct is recreated, but the underlying pool is preserved and notified about the IP change.
                        Arc::new(Node::inherit_with_ip_changed(node, peer_endpoint))
                    }
                }
                _ => {
                    let is_enabled = host_filter.map_or(true, |f| f.accept(&peer));
                    let (peer_endpoint, tokens) = peer.into_peer_endpoint_and_tokens();
                    peer_tokens = tokens;
                    Arc::new(Node::new(
                        peer_endpoint,
                        pool_config,
                        used_keyspace.clone(),
                        is_enabled,
                        #[cfg(feature = "metrics")]
                        Arc::clone(metrics),
                    ))
                }
            };

            new_known_peers.insert(peer_host_id, node.clone());

            for token in peer_tokens {
                ring.push((token, node.clone()));
            }
        }

        // Keep successfully processed keyspaces; for those whose metadata failed to process,
        // fall back to the previously known version (if any).
        let keyspaces: HashMap<String, Keyspace> = metadata
            .keyspaces
            .into_iter()
            .filter_map(|(ks_name, ks)| match ks {
                Ok(ks) => Some((ks_name, ks)),
                Err(e) => {
                    if let Some(old_ks) = old_keyspaces.get(&ks_name) {
                        warn!(
                            "Encountered an error while processing \
                            metadata of keyspace \"{ks_name}\": {e}. \
                            Re-using an older version of this keyspace's metadata."
                        );
                        Some((ks_name, old_ks.clone()))
                    } else {
                        warn!(
                            "Encountered an error while processing metadata \
                            of keyspace \"{ks_name}\": {e}. \
                            No previous version of this keyspace's metadata found, so it will not be \
                            present in ClusterState until the next refresh."
                        );
                        None
                    }
                }
            })
            .collect();

        {
            // Collect host IDs of nodes that disappeared from the cluster since the last refresh.
            let removed_nodes = {
                let mut removed_nodes = HashSet::new();
                for old_peer in known_peers {
                    if !new_known_peers.contains_key(old_peer.0) {
                        removed_nodes.insert(*old_peer.0);
                    }
                }

                removed_nodes
            };

            // A table is considered still present iff its keyspace metadata was processed
            // successfully and contains that table.
            let table_predicate = |spec: &TableSpec| {
                if let Some(ks) = keyspaces.get(spec.ks_name()) {
                    ks.tables.contains_key(spec.table_name())
                } else {
                    false
                }
            };

            // Map host ID -> new Node handle for peers whose Node struct was recreated
            // during this refresh (e.g. due to an IP address change).
            let recreated_nodes = {
                let mut recreated_nodes = HashMap::new();
                for (old_peer_id, old_peer_node) in known_peers {
                    if let Some(new_peer_node) = new_known_peers.get(old_peer_id) {
                        if !Arc::ptr_eq(old_peer_node, new_peer_node) {
                            recreated_nodes.insert(*old_peer_id, Arc::clone(new_peer_node));
                        }
                    }
                }

                recreated_nodes
            };

            tablets.perform_maintenance(
                &table_predicate,
                &removed_nodes,
                &new_known_peers,
                &recreated_nodes,
            )
        }

        // Building the `ReplicaLocator` involves heavy computation over the whole ring,
        // so off-load it to a blocking thread instead of stalling the async executor.
        let (locator, keyspaces) = tokio::task::spawn_blocking(move || {
            let keyspace_strategies = keyspaces.values().map(|ks| &ks.strategy);
            let locator = ReplicaLocator::new(ring.into_iter(), keyspace_strategies, tablets);
            (locator, keyspaces)
        })
        .await
        .unwrap();

        ClusterState {
            all_nodes: new_known_peers.values().cloned().collect(),
            known_peers: new_known_peers,
            keyspaces,
            locator,
        }
    }

    /// Access keyspace details collected by the driver.
    pub fn get_keyspace(&self, keyspace: impl AsRef<str>) -> Option<&Keyspace> {
        self.keyspaces.get(keyspace.as_ref())
    }

    /// Returns an iterator over keyspaces.
    pub fn keyspaces_iter(&self) -> impl Iterator<Item = (&str, &Keyspace)> {
        self.keyspaces.iter().map(|(k, v)| (k.as_str(), v))
    }

    /// Access details about nodes known to the driver
    pub fn get_nodes_info(&self) -> &[Arc<Node>] {
        &self.all_nodes
    }

    /// Compute the token of a table partition key.
    ///
    /// The `partition_key` argument contains the values of all partition key
    /// columns. You can use either unnamed values such as a tuple (e.g. `(1, 5, 5)`)
    /// or named values (e.g. a struct that derives `SerializeRow`), just as you would
    /// when executing a request. No additional values are allowed besides values
    /// for partition key columns.
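    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest): it assumes a `ClusterState`
    /// already obtained from a live session, and a hypothetical `my_keyspace.my_table`
    /// whose partition key is a single text column.
    ///
    /// ```rust,ignore
    /// // A single-column partition key is passed as a one-element tuple.
    /// let token = cluster_state.compute_token("my_keyspace", "my_table", &("some_pk_value",))?;
    /// println!("Partition key hashes to token {}", token.value());
    /// ```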
    pub fn compute_token(
        &self,
        keyspace: &str,
        table: &str,
        partition_key: &dyn SerializeRow,
    ) -> Result<Token, ClusterStateTokenError> {
        let Some(table) = self
            .keyspaces
            .get(keyspace)
            .and_then(|k| k.tables.get(table))
        else {
            return Err(ClusterStateTokenError::UnknownTable {
                keyspace: keyspace.to_owned(),
                table: table.to_owned(),
            });
        };
        let values = SerializedValues::from_serializable(
            &RowSerializationContext::from_specs(table.pk_column_specs.as_slice()),
            partition_key,
        )?;
        let partitioner = table
            .partitioner
            .as_deref()
            .and_then(PartitionerName::from_str)
            .unwrap_or_default();
        calculate_token_for_partition_key(&values, &partitioner)
            .map_err(ClusterStateTokenError::TokenCalculation)
    }

    /// Access to replicas owning a given token
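    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), assuming a `ClusterState` from a live
    /// session and a token previously computed with [`Self::compute_token`]:
    ///
    /// ```rust,ignore
    /// for (node, shard) in cluster_state.get_token_endpoints("my_keyspace", "my_table", token) {
    ///     println!("replica: {:?} on shard {}", node.address, shard);
    /// }
    /// ```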
    pub fn get_token_endpoints(
        &self,
        keyspace: &str,
        table: &str,
        token: Token,
    ) -> Vec<(Arc<Node>, Shard)> {
        let table_spec = TableSpec::borrowed(keyspace, table);
        self.get_token_endpoints_iter(&table_spec, token)
            .map(|(node, shard)| (node.clone(), shard))
            .collect()
    }

    pub(crate) fn get_token_endpoints_iter(
        &self,
        table_spec: &TableSpec,
        token: Token,
    ) -> impl Iterator<Item = (NodeRef<'_>, Shard)> + Clone {
        let keyspace = self.keyspaces.get(table_spec.ks_name());
        let strategy = keyspace
            .map(|k| &k.strategy)
            .unwrap_or(&Strategy::LocalStrategy);
        let replica_set = self
            .replica_locator()
            .replicas_for_token(token, strategy, None, table_spec);

        replica_set.into_iter()
    }

    /// Access to replicas owning a given partition key (similar to `nodetool getendpoints`).
    ///
    /// The `partition_key` argument contains the values of all partition key
    /// columns. You can use either unnamed values such as a tuple (e.g. `(1, 5, 5)`)
    /// or named values (e.g. a struct that derives `SerializeRow`), just as you would
    /// when executing a request. No additional values are allowed besides values
    /// for partition key columns.
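    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), assuming a `ClusterState` from a live
    /// session and a hypothetical `my_keyspace.my_table` with a single-column partition key:
    ///
    /// ```rust,ignore
    /// let replicas = cluster_state.get_endpoints("my_keyspace", "my_table", &("some_pk_value",))?;
    /// for (node, shard) in replicas {
    ///     println!("replica: {:?} on shard {}", node.address, shard);
    /// }
    /// ```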
    pub fn get_endpoints(
        &self,
        keyspace: &str,
        table: &str,
        partition_key: &dyn SerializeRow,
    ) -> Result<Vec<(Arc<Node>, Shard)>, ClusterStateTokenError> {
        let token = self.compute_token(keyspace, table, partition_key)?;
        Ok(self.get_token_endpoints(keyspace, table, token))
    }

    /// Access replica location info
    pub fn replica_locator(&self) -> &ReplicaLocator {
        &self.locator
    }

    /// Returns a nonempty iterator of working connections to all shards.
    pub(crate) fn iter_working_connections(
        &self,
    ) -> Result<impl Iterator<Item = Arc<Connection>> + '_, ConnectionPoolError> {
        // The returned iterator is nonempty by the nonemptiness invariant of `self.known_peers`.
        assert!(!self.known_peers.is_empty());
        let mut peers_iter = self.known_peers.values();

        // First, try to find the first working pool of connections.
        // If none is found, return an error.
        let first_working_pool = peers_iter
            .by_ref()
            .map(|node| node.get_working_connections())
            .find_or_first(Result::is_ok)
            .expect("impossible: known_peers was asserted to be nonempty")?;

        let remaining_pools_iter = peers_iter
            .map(|node| node.get_working_connections())
            .flatten_ok()
            .flatten();

        Ok(first_working_pool.into_iter().chain(remaining_pools_iter))
        // By the invariant, `self.known_peers` is nonempty, so the returned iterator
        // is nonempty, too.
    }

    pub(super) fn update_tablets(&mut self, raw_tablets: Vec<(TableSpec<'static>, RawTablet)>) {
        let replica_translator = |uuid: Uuid| self.known_peers.get(&uuid).cloned();

        for (table, raw_tablet) in raw_tablets.into_iter() {
            // Should we skip tablets that belong to a keyspace not present in
            // self.keyspaces? The keyspace could have been, without the driver's knowledge:
            // 1. Dropped - in which case we'll remove its info soon (when refreshing
            // topology) anyway.
            // 2. Created - no harm in storing the info now.
            //
            // So I think we can safely skip checking keyspace presence.
            let tablet = match Tablet::from_raw_tablet(raw_tablet, replica_translator) {
                Ok(t) => t,
                Err((t, f)) => {
                    debug!("Nodes ({}) that are replicas for a tablet {{ks: {}, table: {}, range: [{}, {}]}} are not present in the current ClusterState.known_peers. \
                       Skipping these replicas until the next topology refresh",
                       f.iter().format(", "), table.ks_name(), table.table_name(), t.range().0.value(), t.range().1.value());
                    t
                }
            };
            self.locator.tablets.add_tablet(table, tablet);
        }
    }
}