1use itertools::Itertools;
2use tokio::net::lookup_host;
3use tracing::warn;
4use uuid::Uuid;
5
6use crate::errors::{ConnectionPoolError, UseKeyspaceError};
7use crate::network::Connection;
8use crate::network::VerifiedKeyspaceName;
9use crate::network::{NodeConnectionPool, PoolConfig};
10#[cfg(feature = "metrics")]
11use crate::observability::metrics::Metrics;
12use crate::routing::{Shard, Sharder};
14
15use std::fmt::Display;
16use std::io;
17use std::net::IpAddr;
18use std::{
19 hash::{Hash, Hasher},
20 net::SocketAddr,
21 sync::{
22 atomic::{AtomicBool, Ordering},
23 Arc,
24 },
25};
26
27use crate::cluster::metadata::{PeerEndpoint, UntranslatedEndpoint};
28
29#[non_exhaustive]
37#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
38pub enum NodeAddr {
39 Translatable(SocketAddr),
41 Untranslatable(SocketAddr),
43}
44
45impl NodeAddr {
46 pub(crate) fn into_inner(self) -> SocketAddr {
47 match self {
48 NodeAddr::Translatable(addr) | NodeAddr::Untranslatable(addr) => addr,
49 }
50 }
51 pub(crate) fn inner_mut(&mut self) -> &mut SocketAddr {
52 match self {
53 NodeAddr::Translatable(addr) | NodeAddr::Untranslatable(addr) => addr,
54 }
55 }
56 pub fn ip(&self) -> IpAddr {
57 self.into_inner().ip()
58 }
59 pub fn port(&self) -> u16 {
60 self.into_inner().port()
61 }
62}
63
64impl Display for NodeAddr {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 write!(f, "{}", self.into_inner())
67 }
68}
69
70#[derive(Debug)]
76pub struct Node {
77 pub host_id: Uuid,
78 pub address: NodeAddr,
79 pub datacenter: Option<String>,
80 pub rack: Option<String>,
81
82 pool: Option<NodeConnectionPool>,
84
85 down_marker: AtomicBool,
86}
87
88pub type NodeRef<'a> = &'a Arc<Node>;
90
91impl Node {
92 pub(crate) fn new(
94 peer: PeerEndpoint,
95 pool_config: &PoolConfig,
96 keyspace_name: Option<VerifiedKeyspaceName>,
97 enabled: bool,
98 #[cfg(feature = "metrics")] metrics: Arc<Metrics>,
99 ) -> Self {
100 let host_id = peer.host_id;
101 let address = peer.address;
102 let datacenter = peer.datacenter.clone();
103 let rack = peer.rack.clone();
104
105 let (pool_empty_notifier, _) = tokio::sync::broadcast::channel(1);
107 let pool = enabled.then(|| {
108 NodeConnectionPool::new(
109 UntranslatedEndpoint::Peer(peer),
110 pool_config,
111 keyspace_name,
112 pool_empty_notifier,
113 #[cfg(feature = "metrics")]
114 metrics,
115 )
116 });
117
118 Node {
119 host_id,
120 address,
121 datacenter,
122 rack,
123 pool,
124 down_marker: false.into(),
125 }
126 }
127
128 pub(crate) fn inherit_with_ip_changed(node: &Node, endpoint: PeerEndpoint) -> Self {
137 let address = endpoint.address;
138 if let Some(ref pool) = node.pool {
139 pool.update_endpoint(endpoint);
140 }
141 Self {
142 address,
143 down_marker: false.into(),
144 datacenter: node.datacenter.clone(),
145 rack: node.rack.clone(),
146 host_id: node.host_id,
147 pool: node.pool.clone(),
148 }
149 }
150
151 pub fn sharder(&self) -> Option<Sharder> {
152 self.pool.as_ref()?.sharder()
153 }
154
155 pub(crate) async fn connection_for_shard(
158 &self,
159 shard: Shard,
160 ) -> Result<Arc<Connection>, ConnectionPoolError> {
161 self.get_pool()?.connection_for_shard(shard)
162 }
163
164 #[allow(unused)]
170 pub(crate) fn is_down(&self) -> bool {
171 self.down_marker.load(Ordering::Relaxed)
172 }
173
174 pub fn is_connected(&self) -> bool {
177 let Ok(pool) = self.get_pool() else {
178 return false;
179 };
180 pool.is_connected()
181 }
182
183 pub fn is_enabled(&self) -> bool {
187 self.pool.is_some()
188 }
189
190 pub(crate) fn change_down_marker(&self, is_down: bool) {
191 self.down_marker.store(is_down, Ordering::Relaxed);
192 }
193
194 pub(crate) async fn use_keyspace(
195 &self,
196 keyspace_name: VerifiedKeyspaceName,
197 ) -> Result<(), UseKeyspaceError> {
198 if let Some(pool) = &self.pool {
199 pool.use_keyspace(keyspace_name).await?;
200 }
201 Ok(())
202 }
203
204 pub(crate) fn get_working_connections(
205 &self,
206 ) -> Result<Vec<Arc<Connection>>, ConnectionPoolError> {
207 self.get_pool()?.get_working_connections()
208 }
209
210 pub(crate) async fn wait_until_pool_initialized(&self) {
211 if let Some(pool) = &self.pool {
212 pool.wait_until_initialized().await;
213 }
214 }
215
216 fn get_pool(&self) -> Result<&NodeConnectionPool, ConnectionPoolError> {
217 self.pool
218 .as_ref()
219 .ok_or(ConnectionPoolError::NodeDisabledByHostFilter)
220 }
221}
222
223impl PartialEq for Node {
224 fn eq(&self, other: &Self) -> bool {
225 self.host_id == other.host_id
226 }
227}
228
229impl Eq for Node {}
230
231impl Hash for Node {
232 fn hash<H: Hasher>(&self, state: &mut H) {
233 self.host_id.hash(state);
234 }
235}
236
237#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
241#[non_exhaustive]
242pub enum KnownNode {
243 Hostname(String),
244 Address(SocketAddr),
245}
246
247#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
252pub(crate) enum InternalKnownNode {
253 Hostname(String),
254 Address(SocketAddr),
255 #[cfg(feature = "unstable-cloud")]
256 CloudEndpoint(CloudEndpoint),
257}
258
259impl From<KnownNode> for InternalKnownNode {
260 fn from(value: KnownNode) -> Self {
261 match value {
262 KnownNode::Hostname(s) => InternalKnownNode::Hostname(s),
263 KnownNode::Address(s) => InternalKnownNode::Address(s),
264 }
265 }
266}
267
268#[cfg(feature = "unstable-cloud")]
270#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
271pub(crate) struct CloudEndpoint {
272 pub(crate) hostname: String,
273 pub(crate) datacenter: String,
274}
275
276#[derive(Debug, Clone)]
278pub(crate) struct ResolvedContactPoint {
279 pub(crate) address: SocketAddr,
280 #[cfg_attr(not(feature = "unstable-cloud"), allow(unused))]
281 pub(crate) datacenter: Option<String>,
282}
283
284pub(crate) async fn resolve_hostname(hostname: &str) -> Result<SocketAddr, io::Error> {
288 let addrs = match lookup_host(hostname).await {
289 Ok(addrs) => itertools::Either::Left(addrs),
290 Err(e) => {
292 let addrs = lookup_host((hostname, 9042)).await.or(Err(e))?;
293 itertools::Either::Right(addrs)
294 }
295 };
296
297 addrs
298 .find_or_last(|addr| matches!(addr, SocketAddr::V4(_)))
299 .ok_or_else(|| {
300 io::Error::new(
301 io::ErrorKind::Other,
302 format!("Empty address list returned by DNS for {}", hostname),
303 )
304 })
305}
306
307pub(crate) async fn resolve_contact_points(
312 known_nodes: &[InternalKnownNode],
313) -> (Vec<ResolvedContactPoint>, Vec<String>) {
314 let mut initial_peers: Vec<ResolvedContactPoint> = Vec::with_capacity(known_nodes.len());
316
317 let mut to_resolve: Vec<(&String, Option<String>)> = Vec::new();
318 let mut hostnames: Vec<String> = Vec::new();
319
320 for node in known_nodes.iter() {
321 match node {
322 InternalKnownNode::Hostname(hostname) => {
323 to_resolve.push((hostname, None));
324 hostnames.push(hostname.clone());
325 }
326 InternalKnownNode::Address(address) => initial_peers.push(ResolvedContactPoint {
327 address: *address,
328 datacenter: None,
329 }),
330 #[cfg(feature = "unstable-cloud")]
331 InternalKnownNode::CloudEndpoint(CloudEndpoint {
332 hostname,
333 datacenter,
334 }) => to_resolve.push((hostname, Some(datacenter.clone()))),
335 };
336 }
337 let resolve_futures = to_resolve
338 .into_iter()
339 .map(|(hostname, datacenter)| async move {
340 match resolve_hostname(hostname).await {
341 Ok(address) => Some(ResolvedContactPoint {
342 address,
343 datacenter,
344 }),
345 Err(e) => {
346 warn!("Hostname resolution failed for {}: {}", hostname, &e);
347 None
348 }
349 }
350 });
351 let resolved: Vec<_> = futures::future::join_all(resolve_futures).await;
352 initial_peers.extend(resolved.into_iter().flatten());
353
354 (initial_peers, hostnames)
355}
356
357#[cfg(test)]
358mod tests {
359 use super::*;
360
361 impl Node {
362 pub(crate) fn new_for_test(
363 id: Option<Uuid>,
364 address: Option<NodeAddr>,
365 datacenter: Option<String>,
366 rack: Option<String>,
367 ) -> Self {
368 Self {
369 host_id: id.unwrap_or(Uuid::new_v4()),
370 address: address.unwrap_or(NodeAddr::Translatable(SocketAddr::from((
371 [255, 255, 255, 255],
372 0,
373 )))),
374 datacenter,
375 rack,
376 pool: None,
377 down_marker: false.into(),
378 }
379 }
380 }
381}