use itertools::Itertools;
use tokio::net::lookup_host;
use tracing::warn;
use uuid::Uuid;
use crate::routing::{Shard, Sharder};
use crate::transport::connection::Connection;
use crate::transport::connection::VerifiedKeyspaceName;
use crate::transport::connection_pool::{NodeConnectionPool, PoolConfig};
use crate::transport::errors::{ConnectionPoolError, QueryError};
use std::fmt::Display;
use std::io;
use std::net::IpAddr;
use std::{
hash::{Hash, Hasher},
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use super::topology::{PeerEndpoint, UntranslatedEndpoint};
/// Socket address of a node, tagged with one of two kinds.
///
/// Both variants wrap a plain [`SocketAddr`]; the accessors below treat them
/// identically.
// NOTE(review): the variant names suggest they record whether address
// translation applies to the address — confirm against the translation code.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub enum NodeAddr {
    /// An address tagged as translatable.
    Translatable(SocketAddr),
    /// An address tagged as untranslatable.
    Untranslatable(SocketAddr),
}

impl NodeAddr {
    /// Unwraps the socket address, discarding the variant tag.
    pub(crate) fn into_inner(self) -> SocketAddr {
        match self {
            Self::Translatable(socket) => socket,
            Self::Untranslatable(socket) => socket,
        }
    }

    /// Gives mutable access to the wrapped socket address; the variant tag is
    /// left untouched.
    pub(crate) fn inner_mut(&mut self) -> &mut SocketAddr {
        match self {
            Self::Translatable(socket) => socket,
            Self::Untranslatable(socket) => socket,
        }
    }

    /// Returns the IP part of the wrapped socket address.
    pub fn ip(&self) -> IpAddr {
        let socket = self.into_inner();
        socket.ip()
    }

    /// Returns the port part of the wrapped socket address.
    pub fn port(&self) -> u16 {
        let socket = self.into_inner();
        socket.port()
    }
}

impl Display for NodeAddr {
    /// Prints the wrapped socket address; the variant tag is not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Goes through `format_args!` (not `Display::fmt` forwarding) so that
        // width/fill flags on `f` keep being ignored.
        f.write_fmt(format_args!("{}", self.into_inner()))
    }
}
/// A single node, together with its optional connection pool.
///
/// Two `Node`s compare equal (and hash identically) when their `host_id`s
/// match; no other field takes part in equality or hashing.
#[derive(Debug)]
pub struct Node {
/// Identifier of the host; the sole key used for equality and hashing.
pub host_id: Uuid,
/// Address of the node.
pub address: NodeAddr,
/// Datacenter name, if known.
pub datacenter: Option<String>,
/// Rack name, if known.
pub rack: Option<String>,
// `None` when the node was constructed as not enabled; pool-backed
// operations then fail with `ConnectionPoolError::NodeDisabledByHostFilter`.
pool: Option<NodeConnectionPool>,
// Flag read by `is_down` and written by `change_down_marker`.
down_marker: AtomicBool,
}
/// Borrowed handle to a [`Node`] stored inside an [`Arc`].
pub type NodeRef<'a> = &'a Arc<Node>;
impl Node {
pub(crate) fn new(
peer: PeerEndpoint,
pool_config: PoolConfig,
keyspace_name: Option<VerifiedKeyspaceName>,
enabled: bool,
) -> Self {
let host_id = peer.host_id;
let address = peer.address;
let datacenter = peer.datacenter.clone();
let rack = peer.rack.clone();
let (pool_empty_notifier, _) = tokio::sync::broadcast::channel(1);
let pool = enabled.then(|| {
NodeConnectionPool::new(
UntranslatedEndpoint::Peer(peer),
pool_config,
keyspace_name,
pool_empty_notifier,
)
});
Node {
host_id,
address,
datacenter,
rack,
pool,
down_marker: false.into(),
}
}
pub(crate) fn inherit_with_ip_changed(node: &Node, endpoint: PeerEndpoint) -> Self {
let address = endpoint.address;
if let Some(ref pool) = node.pool {
pool.update_endpoint(endpoint);
}
Self {
address,
down_marker: false.into(),
datacenter: node.datacenter.clone(),
rack: node.rack.clone(),
host_id: node.host_id,
pool: node.pool.clone(),
}
}
pub fn sharder(&self) -> Option<Sharder> {
self.pool.as_ref()?.sharder()
}
pub(crate) async fn connection_for_shard(
&self,
shard: Shard,
) -> Result<Arc<Connection>, ConnectionPoolError> {
self.get_pool()?.connection_for_shard(shard)
}
pub fn is_down(&self) -> bool {
self.down_marker.load(Ordering::Relaxed)
}
pub fn is_enabled(&self) -> bool {
self.pool.is_some()
}
pub(crate) fn change_down_marker(&self, is_down: bool) {
self.down_marker.store(is_down, Ordering::Relaxed);
}
pub(crate) async fn use_keyspace(
&self,
keyspace_name: VerifiedKeyspaceName,
) -> Result<(), QueryError> {
if let Some(pool) = &self.pool {
pool.use_keyspace(keyspace_name).await?;
}
Ok(())
}
pub(crate) fn get_working_connections(
&self,
) -> Result<Vec<Arc<Connection>>, ConnectionPoolError> {
self.get_pool()?.get_working_connections()
}
pub(crate) async fn wait_until_pool_initialized(&self) {
if let Some(pool) = &self.pool {
pool.wait_until_initialized().await;
}
}
fn get_pool(&self) -> Result<&NodeConnectionPool, ConnectionPoolError> {
self.pool
.as_ref()
.ok_or(ConnectionPoolError::NodeDisabledByHostFilter)
}
}
/// Nodes are compared by `host_id` alone; address, datacenter, rack, pool and
/// down marker are ignored.
impl PartialEq for Node {
    fn eq(&self, other: &Self) -> bool {
        let (mine, theirs) = (&self.host_id, &other.host_id);
        mine == theirs
    }
}
// Marker impl: `Node` equality is the `host_id` comparison from `PartialEq`.
impl Eq for Node {}
/// Hashes only `host_id`, matching the `PartialEq` impl so that equal nodes
/// hash identically.
impl Hash for Node {
    fn hash<H: Hasher>(&self, state: &mut H) {
        Hash::hash(&self.host_id, state);
    }
}
/// Public description of a node known up front, given either by name or by
/// socket address.
///
/// Converts into `InternalKnownNode` via `From`.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
#[non_exhaustive]
pub enum KnownNode {
/// A hostname. After conversion to `InternalKnownNode` it is resolved by
/// `resolve_hostname`, which tries the string as given and then retries
/// with port 9042.
Hostname(String),
/// An already-known socket address; no resolution is needed.
Address(SocketAddr),
}
/// Crate-internal counterpart of `KnownNode`, with an extra variant available
/// only under the `cloud` feature.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub(crate) enum InternalKnownNode {
/// A hostname that still has to be resolved.
Hostname(String),
/// An already-known socket address.
Address(SocketAddr),
/// A hostname paired with a datacenter name (see `CloudEndpoint`).
#[cfg(feature = "cloud")]
CloudEndpoint(CloudEndpoint),
}
/// Maps each public `KnownNode` variant onto the internal variant of the same
/// name, moving the payload across unchanged.
impl From<KnownNode> for InternalKnownNode {
    fn from(value: KnownNode) -> Self {
        match value {
            KnownNode::Address(socket_addr) => Self::Address(socket_addr),
            KnownNode::Hostname(name) => Self::Hostname(name),
        }
    }
}
/// A hostname together with the datacenter it belongs to; only compiled with
/// the `cloud` feature.
#[cfg(feature = "cloud")]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub(crate) struct CloudEndpoint {
/// Hostname to be resolved.
pub(crate) hostname: String,
/// Datacenter name carried over to the resolved contact point.
pub(crate) datacenter: String,
}
/// A contact point whose socket address is known, as produced by
/// `resolve_contact_points`.
#[derive(Debug, Clone)]
pub(crate) struct ResolvedContactPoint {
/// Socket address of the contact point.
pub(crate) address: SocketAddr,
/// Datacenter name; `resolve_contact_points` fills it only for cloud
/// endpoints, hence the `allow(unused)` when the `cloud` feature is off.
#[cfg_attr(not(feature = "cloud"), allow(unused))]
pub(crate) datacenter: Option<String>,
}
/// Resolves `hostname` to a single socket address.
///
/// The string is first looked up exactly as given. If that fails, the lookup
/// is retried with port 9042 appended; should the retry fail as well, the
/// error from the *first* attempt is returned.
///
/// Among the returned addresses the first IPv4 one is chosen; if none is
/// IPv4, the last address is used.
///
/// # Errors
///
/// Returns the lookup error described above, or an `io::ErrorKind::Other`
/// error when the lookup succeeds but yields no addresses.
pub(crate) async fn resolve_hostname(hostname: &str) -> Result<SocketAddr, io::Error> {
    // The two lookups yield different iterator types, hence the `Either` wrapper.
    let candidates = match lookup_host(hostname).await {
        Ok(found) => itertools::Either::Left(found),
        Err(first_err) => match lookup_host((hostname, 9042)).await {
            Ok(found) => itertools::Either::Right(found),
            // The retry's own error is discarded in favour of the original one.
            Err(_) => return Err(first_err),
        },
    };

    match candidates.find_or_last(|candidate| candidate.is_ipv4()) {
        Some(chosen) => Ok(chosen),
        None => Err(io::Error::new(
            io::ErrorKind::Other,
            format!("Empty address list returned by DNS for {}", hostname),
        )),
    }
}
/// Turns the configured known nodes into resolved contact points.
///
/// Returns a pair:
/// - the resolved contact points: nodes given by address first (in input
///   order), followed by every hostname that resolved successfully;
/// - the hostnames taken from `InternalKnownNode::Hostname` entries (cloud
///   endpoint hostnames are not included), whether or not they resolved.
///
/// All hostnames are resolved concurrently. A failed resolution is logged as
/// a warning and the hostname is left out of the contact points.
pub(crate) async fn resolve_contact_points(
    known_nodes: &[InternalKnownNode],
) -> (Vec<ResolvedContactPoint>, Vec<String>) {
    let mut contact_points: Vec<ResolvedContactPoint> = Vec::with_capacity(known_nodes.len());
    // Hostnames awaiting resolution, each with the datacenter to attach (if any).
    let mut pending: Vec<(&String, Option<String>)> = Vec::new();
    let mut plain_hostnames: Vec<String> = Vec::new();

    for known in known_nodes {
        match known {
            InternalKnownNode::Address(socket_addr) => {
                contact_points.push(ResolvedContactPoint {
                    address: *socket_addr,
                    datacenter: None,
                });
            }
            InternalKnownNode::Hostname(name) => {
                plain_hostnames.push(name.clone());
                pending.push((name, None));
            }
            #[cfg(feature = "cloud")]
            InternalKnownNode::CloudEndpoint(endpoint) => {
                pending.push((&endpoint.hostname, Some(endpoint.datacenter.clone())));
            }
        }
    }

    let lookups = pending.into_iter().map(|(name, datacenter)| async move {
        let outcome = resolve_hostname(name).await;
        if let Err(e) = &outcome {
            warn!("Hostname resolution failed for {}: {}", name, e);
        }
        outcome.ok().map(|address| ResolvedContactPoint {
            address,
            datacenter,
        })
    });

    // `flatten` drops the `None`s left behind by failed resolutions.
    contact_points.extend(
        futures::future::join_all(lookups)
            .await
            .into_iter()
            .flatten(),
    );

    (contact_points, plain_hostnames)
}
#[cfg(test)]
mod tests {
    use super::*;

    impl Node {
        /// Test-only constructor that builds a `Node` without a connection pool.
        ///
        /// - `id`: host id to use; a random v4 UUID is generated when `None`.
        /// - `address`: node address; defaults to the translatable address
        ///   `255.255.255.255:0` when `None`.
        /// - `datacenter`, `rack`: stored as given.
        ///
        /// The resulting node has `pool: None` (so `is_enabled()` is `false`)
        /// and its down marker set to `false`.
        pub(crate) fn new_for_test(
            id: Option<Uuid>,
            address: Option<NodeAddr>,
            datacenter: Option<String>,
            rack: Option<String>,
        ) -> Self {
            Self {
                // Lazily generate the random id: only when the caller gave none.
                host_id: id.unwrap_or_else(Uuid::new_v4),
                address: address.unwrap_or(NodeAddr::Translatable(SocketAddr::from((
                    [255, 255, 255, 255],
                    0,
                )))),
                datacenter,
                rack,
                pool: None,
                down_marker: false.into(),
            }
        }
    }
}