scylla/cluster/node.rs

use itertools::Itertools;
use tokio::net::lookup_host;
use tracing::warn;
use uuid::Uuid;

use crate::errors::{ConnectionPoolError, UseKeyspaceError};
use crate::network::Connection;
use crate::network::VerifiedKeyspaceName;
use crate::network::{NodeConnectionPool, PoolConfig};
#[cfg(feature = "metrics")]
use crate::observability::metrics::Metrics;
use crate::routing::{Shard, Sharder};

use std::fmt::Display;
use std::io;
use std::net::IpAddr;
use std::{
    hash::{Hash, Hasher},
    net::SocketAddr,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};

use crate::cluster::metadata::{PeerEndpoint, UntranslatedEndpoint};

/// This enum is introduced to support address translation only upon opening a connection,
/// as well as to cope with a bug present in older Cassandra and Scylla releases.
/// The bug involves misconfiguration of rpc_address and/or broadcast_rpc_address
/// in system.local to 0.0.0.0. Mitigation involves replacing the faulty address
/// with the connection's address, but then that address must not be subject to `AddressTranslator`,
/// so we carry that information using this enum. Address translation is never performed
/// on the `Untranslatable` variant.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub enum NodeAddr {
    /// Fetched in Metadata with `query_peers()` (broadcast by a node itself).
    Translatable(SocketAddr),
    /// Built from the control connection's address upon `query_peers()` in order to mitigate the bug described above.
    Untranslatable(SocketAddr),
}

impl NodeAddr {
    pub(crate) fn into_inner(self) -> SocketAddr {
        match self {
            NodeAddr::Translatable(addr) | NodeAddr::Untranslatable(addr) => addr,
        }
    }
    pub(crate) fn inner_mut(&mut self) -> &mut SocketAddr {
        match self {
            NodeAddr::Translatable(addr) | NodeAddr::Untranslatable(addr) => addr,
        }
    }
    pub fn ip(&self) -> IpAddr {
        self.into_inner().ip()
    }
    pub fn port(&self) -> u16 {
        self.into_inner().port()
    }
}

impl Display for NodeAddr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.into_inner())
    }
}
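
// A minimal sketch of the invariants documented above: both variants wrap a
// plain `SocketAddr`, and `ip()`/`port()`/`Display` all delegate to that
// inner address.
#[cfg(test)]
mod node_addr_tests {
    use super::NodeAddr;
    use std::net::SocketAddr;

    #[test]
    fn both_variants_expose_the_same_inner_address() {
        let addr: SocketAddr = "127.0.0.1:9042".parse().unwrap();
        for node_addr in [NodeAddr::Translatable(addr), NodeAddr::Untranslatable(addr)] {
            assert_eq!(node_addr.into_inner(), addr);
            assert_eq!(node_addr.ip(), addr.ip());
            assert_eq!(node_addr.port(), 9042);
            assert_eq!(node_addr.to_string(), "127.0.0.1:9042");
        }
    }
}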

/// Node represents a cluster node along with its data and connections
///
/// Note: if a Node changes its broadcast address, then it is no longer
/// represented by the same instance of the Node struct; instead,
/// a new instance is created (for implementation reasons).
#[derive(Debug)]
pub struct Node {
    pub host_id: Uuid,
    pub address: NodeAddr,
    pub datacenter: Option<String>,
    pub rack: Option<String>,

    // If the node is filtered out by the host filter, this will be None
    pool: Option<NodeConnectionPool>,

    down_marker: AtomicBool,
}

/// A way that Nodes are often passed and accessed in the driver's code.
pub type NodeRef<'a> = &'a Arc<Node>;

impl Node {
    /// Creates a new node which starts connecting in the background.
    pub(crate) fn new(
        peer: PeerEndpoint,
        pool_config: &PoolConfig,
        keyspace_name: Option<VerifiedKeyspaceName>,
        enabled: bool,
        #[cfg(feature = "metrics")] metrics: Arc<Metrics>,
    ) -> Self {
        let host_id = peer.host_id;
        let address = peer.address;
        let datacenter = peer.datacenter.clone();
        let rack = peer.rack.clone();

        // We aren't interested in the fact that the pool becomes empty, so we immediately drop the receiving part.
        let (pool_empty_notifier, _) = tokio::sync::broadcast::channel(1);
        let pool = enabled.then(|| {
            NodeConnectionPool::new(
                UntranslatedEndpoint::Peer(peer),
                pool_config,
                keyspace_name,
                pool_empty_notifier,
                #[cfg(feature = "metrics")]
                metrics,
            )
        });

        Node {
            host_id,
            address,
            datacenter,
            rack,
            pool,
            down_marker: false.into(),
        }
    }

    /// Recreates a Node after it changes its IP, preserving the pool.
    ///
    /// All settings except address are inherited from `node`.
    /// The underlying pool is preserved and notified about the IP change.
    ///
    /// # Arguments
    ///
    /// - `node` - previous definition of that node
    /// - `endpoint` - updated definition of that node, containing the new address
    pub(crate) fn inherit_with_ip_changed(node: &Node, endpoint: PeerEndpoint) -> Self {
        let address = endpoint.address;
        if let Some(ref pool) = node.pool {
            pool.update_endpoint(endpoint);
        }
        Self {
            address,
            down_marker: false.into(),
            datacenter: node.datacenter.clone(),
            rack: node.rack.clone(),
            host_id: node.host_id,
            pool: node.pool.clone(),
        }
    }

    pub fn sharder(&self) -> Option<Sharder> {
        self.pool.as_ref()?.sharder()
    }

    /// Get a connection targeting the given shard.
    /// If such a connection is broken, get any random connection to this `Node`.
    pub(crate) async fn connection_for_shard(
        &self,
        shard: Shard,
    ) -> Result<Arc<Connection>, ConnectionPoolError> {
        self.get_pool()?.connection_for_shard(shard)
    }

    /// Is the node down according to CQL events?
    /// This status is unreliable and should not be used.
    /// See [Node::is_connected] for a better way of checking node availability.
    // TODO: When control connection is broken, we should mark
    // all nodes as being up.
    #[allow(unused)]
    pub(crate) fn is_down(&self) -> bool {
        self.down_marker.load(Ordering::Relaxed)
    }

    /// Returns true if the driver has any open connections in the pool for this
    /// node.
    pub fn is_connected(&self) -> bool {
        let Ok(pool) = self.get_pool() else {
            return false;
        };
        pool.is_connected()
    }

    /// Returns a boolean which indicates whether this node is enabled.
    /// Only enabled nodes will have connections open. For disabled nodes,
    /// no connections will be opened.
    pub fn is_enabled(&self) -> bool {
        self.pool.is_some()
    }

    pub(crate) fn change_down_marker(&self, is_down: bool) {
        self.down_marker.store(is_down, Ordering::Relaxed);
    }

    pub(crate) async fn use_keyspace(
        &self,
        keyspace_name: VerifiedKeyspaceName,
    ) -> Result<(), UseKeyspaceError> {
        if let Some(pool) = &self.pool {
            pool.use_keyspace(keyspace_name).await?;
        }
        Ok(())
    }

    pub(crate) fn get_working_connections(
        &self,
    ) -> Result<Vec<Arc<Connection>>, ConnectionPoolError> {
        self.get_pool()?.get_working_connections()
    }

    pub(crate) async fn wait_until_pool_initialized(&self) {
        if let Some(pool) = &self.pool {
            pool.wait_until_initialized().await;
        }
    }

    fn get_pool(&self) -> Result<&NodeConnectionPool, ConnectionPoolError> {
        self.pool
            .as_ref()
            .ok_or(ConnectionPoolError::NodeDisabledByHostFilter)
    }
}

impl PartialEq for Node {
    fn eq(&self, other: &Self) -> bool {
        self.host_id == other.host_id
    }
}

impl Eq for Node {}

impl Hash for Node {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.host_id.hash(state);
    }
}

/// Describes a database server known on `Session` startup.
///
/// The name derives from SessionBuilder's `known_node()` family of methods.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
#[non_exhaustive]
pub enum KnownNode {
    Hostname(String),
    Address(SocketAddr),
}

/// Describes a database server known on `Session` startup.
/// It is similar to [KnownNode], but also includes the `CloudEndpoint` variant,
/// which is created by the driver if the session is configured to connect to
/// a serverless cluster.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub(crate) enum InternalKnownNode {
    Hostname(String),
    Address(SocketAddr),
    #[cfg(feature = "unstable-cloud")]
    CloudEndpoint(CloudEndpoint),
}

impl From<KnownNode> for InternalKnownNode {
    fn from(value: KnownNode) -> Self {
        match value {
            KnownNode::Hostname(s) => InternalKnownNode::Hostname(s),
            KnownNode::Address(s) => InternalKnownNode::Address(s),
        }
    }
}
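
// A minimal sketch of the conversion above: public `KnownNode` variants map
// one-to-one onto their `InternalKnownNode` counterparts.
#[cfg(test)]
mod known_node_tests {
    use super::*;

    #[test]
    fn known_node_maps_onto_internal_variant() {
        let node = KnownNode::Hostname("db.example.com".to_owned());
        let internal: InternalKnownNode = node.into();
        assert_eq!(
            internal,
            InternalKnownNode::Hostname("db.example.com".to_owned())
        );
    }
}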

/// Describes a database server in the serverless Scylla Cloud.
#[cfg(feature = "unstable-cloud")]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub(crate) struct CloudEndpoint {
    pub(crate) hostname: String,
    pub(crate) datacenter: String,
}

/// Describes a database server known on Session startup, with already resolved address.
#[derive(Debug, Clone)]
pub(crate) struct ResolvedContactPoint {
    pub(crate) address: SocketAddr,
    #[cfg_attr(not(feature = "unstable-cloud"), allow(unused))]
    pub(crate) datacenter: Option<String>,
}

// Resolve the given hostname using a DNS lookup if necessary.
// The resolution may return multiple IPs and the function returns one of them.
// It prefers to return IPv4s first, and only if there are none, IPv6s.
pub(crate) async fn resolve_hostname(hostname: &str) -> Result<SocketAddr, io::Error> {
    let addrs = match lookup_host(hostname).await {
        Ok(addrs) => itertools::Either::Left(addrs),
        // The hostname may lack a port, in which case the first lookup fails.
        // Retry with the default CQL port, but propagate the original error if the retry fails too.
        Err(e) => {
            let addrs = lookup_host((hostname, 9042)).await.or(Err(e))?;
            itertools::Either::Right(addrs)
        }
    };

    addrs
        .find_or_last(|addr| matches!(addr, SocketAddr::V4(_)))
        .ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::Other,
                format!("Empty address list returned by DNS for {}", hostname),
            )
        })
}
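
// A minimal sketch of the IPv4-first selection used above: `find_or_last`
// yields the first address satisfying the predicate, or falls back to the
// last one when none does.
#[cfg(test)]
mod hostname_resolution_tests {
    use itertools::Itertools;
    use std::net::SocketAddr;

    #[test]
    fn ipv4_addresses_are_preferred_over_ipv6() {
        let v6: SocketAddr = "[::1]:9042".parse().unwrap();
        let v4: SocketAddr = "127.0.0.1:9042".parse().unwrap();

        // An IPv4 address wins even when it appears after IPv6 ones.
        let picked = [v6, v4]
            .into_iter()
            .find_or_last(|addr| matches!(addr, SocketAddr::V4(_)));
        assert_eq!(picked, Some(v4));

        // With IPv6 addresses only, the last one is returned.
        let picked = [v6]
            .into_iter()
            .find_or_last(|addr| matches!(addr, SocketAddr::V4(_)));
        assert_eq!(picked, Some(v6));
    }
}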

/// Transforms the given [`InternalKnownNode`]s into [`ResolvedContactPoint`]s.
///
/// In case of a hostname, resolves it using a DNS lookup.
/// In case of a plain IP address, parses it and uses it directly.
pub(crate) async fn resolve_contact_points(
    known_nodes: &[InternalKnownNode],
) -> (Vec<ResolvedContactPoint>, Vec<String>) {
    // Find IP addresses of all known nodes passed in the config
    let mut initial_peers: Vec<ResolvedContactPoint> = Vec::with_capacity(known_nodes.len());

    let mut to_resolve: Vec<(&String, Option<String>)> = Vec::new();
    let mut hostnames: Vec<String> = Vec::new();

    for node in known_nodes.iter() {
        match node {
            InternalKnownNode::Hostname(hostname) => {
                to_resolve.push((hostname, None));
                hostnames.push(hostname.clone());
            }
            InternalKnownNode::Address(address) => initial_peers.push(ResolvedContactPoint {
                address: *address,
                datacenter: None,
            }),
            #[cfg(feature = "unstable-cloud")]
            InternalKnownNode::CloudEndpoint(CloudEndpoint {
                hostname,
                datacenter,
            }) => to_resolve.push((hostname, Some(datacenter.clone()))),
        };
    }
    let resolve_futures = to_resolve
        .into_iter()
        .map(|(hostname, datacenter)| async move {
            match resolve_hostname(hostname).await {
                Ok(address) => Some(ResolvedContactPoint {
                    address,
                    datacenter,
                }),
                Err(e) => {
                    warn!("Hostname resolution failed for {}: {}", hostname, &e);
                    None
                }
            }
        });
    let resolved: Vec<_> = futures::future::join_all(resolve_futures).await;
    initial_peers.extend(resolved.into_iter().flatten());

    (initial_peers, hostnames)
}
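
// A minimal sketch of the pass-through path above: `Address` variants are used
// as-is without any DNS lookup, so this is deterministic. Assumes tokio's test
// macro is available to this crate's tests.
#[cfg(test)]
mod contact_point_tests {
    use super::*;

    #[tokio::test]
    async fn plain_addresses_skip_dns_resolution() {
        let addr: SocketAddr = "10.0.0.1:9042".parse().unwrap();
        let known_nodes = [InternalKnownNode::Address(addr)];

        let (contact_points, hostnames) = resolve_contact_points(&known_nodes).await;

        assert_eq!(contact_points.len(), 1);
        assert_eq!(contact_points[0].address, addr);
        assert!(hostnames.is_empty());
    }
}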

#[cfg(test)]
mod tests {
    use super::*;

    impl Node {
        pub(crate) fn new_for_test(
            id: Option<Uuid>,
            address: Option<NodeAddr>,
            datacenter: Option<String>,
            rack: Option<String>,
        ) -> Self {
            Self {
                host_id: id.unwrap_or(Uuid::new_v4()),
                address: address.unwrap_or(NodeAddr::Translatable(SocketAddr::from((
                    [255, 255, 255, 255],
                    0,
                )))),
                datacenter,
                rack,
                pool: None,
                down_marker: false.into(),
            }
        }
    }
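
    // A minimal sketch of the identity semantics defined above: `Node`s compare
    // and hash by `host_id` alone, so instances with different addresses are equal.
    #[test]
    fn nodes_with_equal_host_ids_are_equal() {
        let host_id = Uuid::new_v4();
        let node_a = Node::new_for_test(Some(host_id), None, Some("dc1".to_owned()), None);
        let node_b = Node::new_for_test(
            Some(host_id),
            Some(NodeAddr::Untranslatable(SocketAddr::from((
                [127, 0, 0, 1],
                9042,
            )))),
            None,
            None,
        );
        assert_eq!(node_a, node_b);
    }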
}