use crate::errors::{ClusterStateTokenError, ConnectionPoolError};
use crate::network::{Connection, PoolConfig, VerifiedKeyspaceName};
#[cfg(feature = "metrics")]
use crate::observability::metrics::Metrics;
use crate::policies::host_filter::HostFilter;
use crate::routing::locator::tablets::{RawTablet, Tablet, TabletsInfo};
use crate::routing::locator::ReplicaLocator;
use crate::routing::partitioner::{calculate_token_for_partition_key, PartitionerName};
use crate::routing::{Shard, Token};

use itertools::Itertools;
use scylla_cql::frame::response::result::TableSpec;
use scylla_cql::serialize::row::{RowSerializationContext, SerializeRow, SerializedValues};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tracing::{debug, warn};
use uuid::Uuid;

use super::metadata::{Keyspace, Metadata, Strategy};
use super::node::{Node, NodeRef};

#[derive(Clone)]
pub struct ClusterState {
    pub(crate) known_peers: HashMap<Uuid, Arc<Node>>,
    pub(crate) all_nodes: Vec<Arc<Node>>,

    pub(crate) keyspaces: HashMap<String, Keyspace>,

    pub(crate) locator: ReplicaLocator,
}

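/// A wrapper that provides a compact `Debug` representation of `ClusterState`:
/// instead of dumping the whole token ring, it prints only the ring's size.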
pub(crate) struct ClusterStateNeatDebug<'a>(pub(crate) &'a Arc<ClusterState>);

impl std::fmt::Debug for ClusterStateNeatDebug<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let cluster_state = &self.0;

        f.debug_struct("ClusterState")
            .field("known_peers", &cluster_state.known_peers)
            .field("ring", {
                struct RingSizePrinter(usize);
                impl std::fmt::Debug for RingSizePrinter {
                    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                        write!(f, "<size={}>", self.0)
                    }
                }
                &RingSizePrinter(cluster_state.locator.ring().len())
            })
            .field("keyspaces", &cluster_state.keyspaces.keys())
            .finish_non_exhaustive()
    }
}

impl ClusterState {
    pub(crate) async fn wait_until_all_pools_are_initialized(&self) {
        for node in self.locator.unique_nodes_in_global_ring().iter() {
            node.wait_until_pool_initialized().await;
        }
    }

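    /// Creates a new `ClusterState` from freshly fetched [`Metadata`].
    ///
    /// Reuses [`Node`] objects (and thus their connection pools) from
    /// `known_peers` where possible, and falls back to `old_keyspaces` for
    /// keyspaces whose new metadata failed to parse.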
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn new(
        metadata: Metadata,
        pool_config: &PoolConfig,
        known_peers: &HashMap<Uuid, Arc<Node>>,
        used_keyspace: &Option<VerifiedKeyspaceName>,
        host_filter: Option<&dyn HostFilter>,
        mut tablets: TabletsInfo,
        old_keyspaces: &HashMap<String, Keyspace>,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> Self {
        let mut new_known_peers: HashMap<Uuid, Arc<Node>> =
            HashMap::with_capacity(metadata.peers.len());
        let mut ring: Vec<(Token, Arc<Node>)> = Vec::new();

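        // Build the new peer map and token ring. An existing Node is reused
        // (keeping its connection pool) as long as its datacenter and rack are
        // unchanged; a changed IP inherits the old node's identity, and any
        // other change results in a brand-new Node.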
        for peer in metadata.peers {
            let peer_host_id = peer.host_id;
            let peer_address = peer.address;
            let peer_tokens;

            let node: Arc<Node> = match known_peers.get(&peer_host_id) {
                Some(node) if node.datacenter == peer.datacenter && node.rack == peer.rack => {
                    let (peer_endpoint, tokens) = peer.into_peer_endpoint_and_tokens();
                    peer_tokens = tokens;
                    if node.address == peer_address {
                        node.clone()
                    } else {
                        Arc::new(Node::inherit_with_ip_changed(node, peer_endpoint))
                    }
                }
                _ => {
                    let is_enabled = host_filter.map_or(true, |f| f.accept(&peer));
                    let (peer_endpoint, tokens) = peer.into_peer_endpoint_and_tokens();
                    peer_tokens = tokens;
                    Arc::new(Node::new(
                        peer_endpoint,
                        pool_config,
                        used_keyspace.clone(),
                        is_enabled,
                        #[cfg(feature = "metrics")]
                        Arc::clone(metrics),
                    ))
                }
            };

            new_known_peers.insert(peer_host_id, node.clone());

            for token in peer_tokens {
                ring.push((token, node.clone()));
            }
        }

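        // Keep the keyspaces whose metadata was fetched successfully. For a
        // keyspace that failed to parse, fall back to its previous metadata
        // (if any) so that a single bad keyspace does not disappear entirely.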
        let keyspaces: HashMap<String, Keyspace> = metadata
            .keyspaces
            .into_iter()
            .filter_map(|(ks_name, ks)| match ks {
                Ok(ks) => Some((ks_name, ks)),
                Err(e) => {
                    if let Some(old_ks) = old_keyspaces.get(&ks_name) {
                        warn!(
                            "Encountered an error while processing metadata \
                             of keyspace \"{ks_name}\": {e}. \
                             Re-using the older version of this keyspace metadata."
                        );
                        Some((ks_name, old_ks.clone()))
                    } else {
                        warn!(
                            "Encountered an error while processing metadata \
                             of keyspace \"{ks_name}\": {e}. \
                             No previous version of this keyspace metadata found, \
                             so it will not be present in ClusterState until the next refresh."
                        );
                        None
                    }
                }
            })
            .collect();

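        // Collect the nodes that disappeared from, or were recreated within,
        // the cluster, so that stale tablet replica information referring to
        // them can be cleaned up.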
        {
            let removed_nodes = {
                let mut removed_nodes = HashSet::new();
                for old_peer in known_peers {
                    if !new_known_peers.contains_key(old_peer.0) {
                        removed_nodes.insert(*old_peer.0);
                    }
                }

                removed_nodes
            };

            let table_predicate = |spec: &TableSpec| {
                if let Some(ks) = keyspaces.get(spec.ks_name()) {
                    ks.tables.contains_key(spec.table_name())
                } else {
                    false
                }
            };

            let recreated_nodes = {
                let mut recreated_nodes = HashMap::new();
                for (old_peer_id, old_peer_node) in known_peers {
                    if let Some(new_peer_node) = new_known_peers.get(old_peer_id) {
                        if !Arc::ptr_eq(old_peer_node, new_peer_node) {
                            recreated_nodes.insert(*old_peer_id, Arc::clone(new_peer_node));
                        }
                    }
                }

                recreated_nodes
            };

            tablets.perform_maintenance(
                &table_predicate,
                &removed_nodes,
                &new_known_peers,
                &recreated_nodes,
            )
        }

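        // Build the replica locator on the blocking thread pool: precomputing
        // replica sets for all replication strategies can be CPU-intensive for
        // large rings, so it should not stall the async executor.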
        let (locator, keyspaces) = tokio::task::spawn_blocking(move || {
            let keyspace_strategies = keyspaces.values().map(|ks| &ks.strategy);
            let locator = ReplicaLocator::new(ring.into_iter(), keyspace_strategies, tablets);
            (locator, keyspaces)
        })
        .await
        .unwrap();

        ClusterState {
            all_nodes: new_known_peers.values().cloned().collect(),
            known_peers: new_known_peers,
            keyspaces,
            locator,
        }
    }

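    /// Returns the metadata of the given keyspace, if it is known to the driver.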
    pub fn get_keyspace(&self, keyspace: impl AsRef<str>) -> Option<&Keyspace> {
        self.keyspaces.get(keyspace.as_ref())
    }

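    /// Returns an iterator over the names and metadata of all known keyspaces.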
    pub fn keyspaces_iter(&self) -> impl Iterator<Item = (&str, &Keyspace)> {
        self.keyspaces.iter().map(|(k, v)| (k.as_str(), v))
    }

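    /// Returns information about all nodes known to the driver.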
    pub fn get_nodes_info(&self) -> &[Arc<Node>] {
        &self.all_nodes
    }

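    /// Computes the token that owns the given partition key in `keyspace.table`.
    ///
    /// Fails if the table is not known, or if the partition key cannot be
    /// serialized against the table's primary-key column specification.
    ///
    /// A minimal usage sketch (the keyspace, table, and key value below are
    /// placeholder assumptions, not part of this crate):
    ///
    /// ```rust,ignore
    /// let token = cluster_state.compute_token("ks", "table", &("some_pk_value",))?;
    /// ```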
    pub fn compute_token(
        &self,
        keyspace: &str,
        table: &str,
        partition_key: &dyn SerializeRow,
    ) -> Result<Token, ClusterStateTokenError> {
        let Some(table) = self
            .keyspaces
            .get(keyspace)
            .and_then(|k| k.tables.get(table))
        else {
            return Err(ClusterStateTokenError::UnknownTable {
                keyspace: keyspace.to_owned(),
                table: table.to_owned(),
            });
        };
        let values = SerializedValues::from_serializable(
            &RowSerializationContext::from_specs(table.pk_column_specs.as_slice()),
            partition_key,
        )?;
        let partitioner = table
            .partitioner
            .as_deref()
            .and_then(PartitionerName::from_str)
            .unwrap_or_default();
        calculate_token_for_partition_key(&values, &partitioner)
            .map_err(ClusterStateTokenError::TokenCalculation)
    }

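    /// Returns the replica nodes (and shards) that own the given token in
    /// `keyspace.table`.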
    pub fn get_token_endpoints(
        &self,
        keyspace: &str,
        table: &str,
        token: Token,
    ) -> Vec<(Arc<Node>, Shard)> {
        let table_spec = TableSpec::borrowed(keyspace, table);
        self.get_token_endpoints_iter(&table_spec, token)
            .map(|(node, shard)| (node.clone(), shard))
            .collect()
    }

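    // Like `get_token_endpoints`, but borrows the nodes instead of cloning the
    // `Arc`s. Unknown keyspaces fall back to `Strategy::LocalStrategy`.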
    pub(crate) fn get_token_endpoints_iter(
        &self,
        table_spec: &TableSpec,
        token: Token,
    ) -> impl Iterator<Item = (NodeRef<'_>, Shard)> + Clone {
        let keyspace = self.keyspaces.get(table_spec.ks_name());
        let strategy = keyspace
            .map(|k| &k.strategy)
            .unwrap_or(&Strategy::LocalStrategy);
        let replica_set = self
            .replica_locator()
            .replicas_for_token(token, strategy, None, table_spec);

        replica_set.into_iter()
    }

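    /// Returns the replica nodes (and shards) that own the given partition key
    /// in `keyspace.table`: a convenience wrapper around [`Self::compute_token`]
    /// followed by [`Self::get_token_endpoints`].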
    pub fn get_endpoints(
        &self,
        keyspace: &str,
        table: &str,
        partition_key: &dyn SerializeRow,
    ) -> Result<Vec<(Arc<Node>, Shard)>, ClusterStateTokenError> {
        let token = self.compute_token(keyspace, table, partition_key)?;
        Ok(self.get_token_endpoints(keyspace, table, token))
    }

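    /// Access the replica locator built from the current cluster metadata.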
    pub fn replica_locator(&self) -> &ReplicaLocator {
        &self.locator
    }

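    // Returns an iterator over working connections to all shards of all known
    // nodes. Fails only if fetching working connections fails for every node
    // (i.e. every pool is broken).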
    pub(crate) fn iter_working_connections(
        &self,
    ) -> Result<impl Iterator<Item = Arc<Connection>> + '_, ConnectionPoolError> {
        assert!(!self.known_peers.is_empty());
        let mut peers_iter = self.known_peers.values();

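        // Take the first working pool (or, if every pool is broken, the first
        // error); `?` then propagates that error only when no node has a
        // usable pool.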
        let first_working_pool = peers_iter
            .by_ref()
            .map(|node| node.get_working_connections())
            .find_or_first(Result::is_ok)
            .expect("impossible: known_peers was asserted to be nonempty")?;

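        // For the remaining nodes, keep connections from working pools and
        // silently skip pools that are broken.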
        let remaining_pools_iter = peers_iter
            .map(|node| node.get_working_connections())
            .flatten_ok()
            .flatten();

        Ok(first_working_pool.into_iter().chain(remaining_pools_iter))
    }

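    // Parses the raw tablets and adds them to the replica locator. Tablet
    // replicas whose nodes are missing from `known_peers` are skipped (with a
    // debug log) until the next topology refresh.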
    pub(super) fn update_tablets(&mut self, raw_tablets: Vec<(TableSpec<'static>, RawTablet)>) {
        let replica_translator = |uuid: Uuid| self.known_peers.get(&uuid).cloned();

        for (table, raw_tablet) in raw_tablets.into_iter() {
            let tablet = match Tablet::from_raw_tablet(raw_tablet, replica_translator) {
                Ok(t) => t,
                Err((t, f)) => {
                    debug!(
                        "Nodes ({}) that are replicas for tablet {{ks: {}, table: {}, range: [{}, {}]}} \
                         are not present in the current ClusterState.known_peers. \
                         Skipping these replicas until the next topology refresh.",
                        f.iter().format(", "),
                        table.ks_name(),
                        table.table_name(),
                        t.range().0.value(),
                        t.range().1.value()
                    );
                    t
                }
            };
            self.locator.tablets.add_tablet(table, tablet);
        }
    }
}