scylla/client/
session_builder.rs

1//! SessionBuilder provides an easy way to create new Sessions
2
3#[cfg(feature = "unstable-cloud")]
4use super::execution_profile::ExecutionProfile;
5use super::execution_profile::ExecutionProfileHandle;
6use super::session::{Session, SessionConfig};
7use super::{Compression, PoolSize, SelfIdentity, WriteCoalescingDelay};
8use crate::authentication::{AuthenticatorProvider, PlainTextAuthenticator};
9use crate::client::session::TlsContext;
10#[cfg(feature = "unstable-cloud")]
11use crate::cloud::{CloudConfig, CloudConfigError, CloudTlsProvider};
12use crate::errors::NewSessionError;
13use crate::policies::address_translator::AddressTranslator;
14use crate::policies::host_filter::HostFilter;
15use crate::policies::timestamp_generator::TimestampGenerator;
16use crate::routing::ShardAwarePortRange;
17use crate::statement::Consistency;
18use std::borrow::Borrow;
19use std::marker::PhantomData;
20use std::net::{IpAddr, SocketAddr};
21use std::num::NonZeroU32;
22#[cfg(feature = "unstable-cloud")]
23use std::path::Path;
24use std::sync::Arc;
25use std::time::Duration;
26use tracing::warn;
27
mod sealed {
    // `Sealed` lives in a private module on purpose: it is a sealed trait whose
    // whole point is that it cannot be named from outside, so the lint that
    // complains about unnameable types has to be silenced here.
    #[allow(unknown_lints)] // Rust 1.66 doesn't know this lint
    #[allow(unnameable_types)]
    pub trait Sealed {}
}
/// Type-level tag selecting which flavour of [`GenericSessionBuilder`] is being used.
///
/// The `sealed::Sealed` supertrait sits in a private module, so this trait can
/// only be implemented by the marker types declared in this file.
pub trait SessionBuilderKind: Clone + sealed::Sealed {}
36
37#[derive(Clone)]
38pub enum DefaultMode {}
39impl sealed::Sealed for DefaultMode {}
40impl SessionBuilderKind for DefaultMode {}
41
42pub type SessionBuilder = GenericSessionBuilder<DefaultMode>;
43
/// Marker for the Cloud builder flavour (only with the `unstable-cloud` feature).
///
/// The enum has no variants, so it can never be instantiated; it is used purely
/// as the `Kind` type parameter of [`GenericSessionBuilder`].
#[cfg(feature = "unstable-cloud")]
#[derive(Clone)]
pub enum CloudMode {}
#[cfg(feature = "unstable-cloud")]
impl SessionBuilderKind for CloudMode {}
#[cfg(feature = "unstable-cloud")]
impl sealed::Sealed for CloudMode {}

/// The builder flavour for Cloud sessions: [`GenericSessionBuilder`] tagged with [`CloudMode`].
#[cfg(feature = "unstable-cloud")]
pub type CloudSessionBuilder = GenericSessionBuilder<CloudMode>;
54
55/// SessionBuilder is used to create new Session instances
56/// # Example
57///
58/// ```
59/// # use scylla::client::session::Session;
60/// # use scylla::client::session_builder::SessionBuilder;
61/// # use scylla::client::Compression;
62/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
63/// let session: Session = SessionBuilder::new()
64///     .known_node("127.0.0.1:9042")
65///     .compression(Some(Compression::Snappy))
66///     .build()
67///     .await?;
68/// # Ok(())
69/// # }
70/// ```
71#[derive(Clone)]
72pub struct GenericSessionBuilder<Kind: SessionBuilderKind> {
73    pub config: SessionConfig,
74    kind: PhantomData<Kind>,
75}
76
77// NOTE: this `impl` block contains configuration options specific for **non-Cloud** [`Session`].
78// This means that if an option fits both non-Cloud and Cloud `Session`s, it should NOT be put
79// here, but rather in `impl<K> GenericSessionBuilder<K>` block.
80impl GenericSessionBuilder<DefaultMode> {
81    /// Creates new SessionBuilder with default configuration
82    /// # Default configuration
83    /// * Compression: None
84    ///
85    pub fn new() -> Self {
86        SessionBuilder {
87            config: SessionConfig::new(),
88            kind: PhantomData,
89        }
90    }
91
92    /// Add a known node with a hostname
93    /// # Examples
94    /// ```
95    /// # use scylla::client::session::Session;
96    /// # use scylla::client::session_builder::SessionBuilder;
97    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
98    /// let session: Session = SessionBuilder::new()
99    ///     .known_node("127.0.0.1:9042")
100    ///     .build()
101    ///     .await?;
102    /// # Ok(())
103    /// # }
104    /// ```
105    ///
106    /// ```
107    /// # use scylla::client::session::Session;
108    /// # use scylla::client::session_builder::SessionBuilder;
109    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
110    /// let session: Session = SessionBuilder::new()
111    ///     .known_node("db1.example.com")
112    ///     .build()
113    ///     .await?;
114    /// # Ok(())
115    /// # }
116    /// ```
117    pub fn known_node(mut self, hostname: impl AsRef<str>) -> Self {
118        self.config.add_known_node(hostname);
119        self
120    }
121
122    /// Add a known node with an IP address
123    /// # Example
124    /// ```
125    /// # use scylla::client::session::Session;
126    /// # use scylla::client::session_builder::SessionBuilder;
127    /// # use std::net::{SocketAddr, IpAddr, Ipv4Addr};
128    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
129    /// let session: Session = SessionBuilder::new()
130    ///     .known_node_addr(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9042))
131    ///     .build()
132    ///     .await?;
133    /// # Ok(())
134    /// # }
135    /// ```
136    pub fn known_node_addr(mut self, node_addr: SocketAddr) -> Self {
137        self.config.add_known_node_addr(node_addr);
138        self
139    }
140
141    /// Add a list of known nodes with hostnames
142    /// # Example
143    /// ```
144    /// # use scylla::client::session::Session;
145    /// # use scylla::client::session_builder::SessionBuilder;
146    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
147    /// let session: Session = SessionBuilder::new()
148    ///     .known_nodes(["127.0.0.1:9042", "db1.example.com"])
149    ///     .build()
150    ///     .await?;
151    /// # Ok(())
152    /// # }
153    /// ```
154    pub fn known_nodes(mut self, hostnames: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
155        self.config.add_known_nodes(hostnames);
156        self
157    }
158
159    /// Add a list of known nodes with IP addresses
160    /// # Example
161    /// ```
162    /// # use scylla::client::session::Session;
163    /// # use scylla::client::session_builder::SessionBuilder;
164    /// # use std::net::{SocketAddr, IpAddr, Ipv4Addr};
165    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
166    /// let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 3)), 9042);
167    /// let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 4)), 9042);
168    ///
169    /// let session: Session = SessionBuilder::new()
170    ///     .known_nodes_addr([addr1, addr2])
171    ///     .build()
172    ///     .await?;
173    /// # Ok(())
174    /// # }
175    /// ```
176    pub fn known_nodes_addr(
177        mut self,
178        node_addrs: impl IntoIterator<Item = impl Borrow<SocketAddr>>,
179    ) -> Self {
180        self.config.add_known_nodes_addr(node_addrs);
181        self
182    }
183
    /// Set username and password for plain text authentication.\
    /// Use this when the database server requires authentication.
186    ///
187    /// # Example
188    /// ```
189    /// # use scylla::client::session::Session;
190    /// # use scylla::client::session_builder::SessionBuilder;
191    /// # use scylla::client::Compression;
192    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
193    /// let session: Session = SessionBuilder::new()
194    ///     .known_node("127.0.0.1:9042")
195    ///     .use_keyspace("my_keyspace_name", false)
196    ///     .user("cassandra", "cassandra")
197    ///     .build()
198    ///     .await?;
199    /// # Ok(())
200    /// # }
201    /// ```
202    pub fn user(mut self, username: impl Into<String>, passwd: impl Into<String>) -> Self {
203        self.config.authenticator = Some(Arc::new(PlainTextAuthenticator::new(
204            username.into(),
205            passwd.into(),
206        )));
207        self
208    }
209
210    /// Set custom authenticator provider to create an authenticator instance during a session creation.
211    ///
212    /// # Example
213    /// ```
214    /// # use std::sync::Arc;
215    /// use bytes::Bytes;
216    /// # use scylla::client::session::Session;
217    /// # use scylla::client::session_builder::SessionBuilder;
218    /// use async_trait::async_trait;
219    /// use scylla::authentication::{AuthenticatorProvider, AuthenticatorSession, AuthError};
220    /// # use scylla::client::Compression;
221    ///
222    /// struct CustomAuthenticator;
223    ///
224    /// #[async_trait]
225    /// impl AuthenticatorSession for CustomAuthenticator {
226    ///     async fn evaluate_challenge(&mut self, token: Option<&[u8]>) -> Result<Option<Vec<u8>>, AuthError> {
227    ///         Ok(None)
228    ///     }
229    ///
230    ///     async fn success(&mut self, token: Option<&[u8]>) -> Result<(), AuthError> {
231    ///         Ok(())
232    ///     }
233    /// }
234    ///
235    /// struct CustomAuthenticatorProvider;
236    ///
237    /// #[async_trait]
238    /// impl AuthenticatorProvider for CustomAuthenticatorProvider {
239    ///     async fn start_authentication_session(&self, _authenticator_name: &str) -> Result<(Option<Vec<u8>>, Box<dyn AuthenticatorSession>), AuthError> {
240    ///         Ok((None, Box::new(CustomAuthenticator)))
241    ///     }
242    /// }
243    ///
244    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
245    /// let session: Session = SessionBuilder::new()
246    ///     .known_node("127.0.0.1:9042")
247    ///     .use_keyspace("my_keyspace_name", false)
248    ///     .user("cassandra", "cassandra")
249    ///     .authenticator_provider(Arc::new(CustomAuthenticatorProvider))
250    ///     .build()
251    ///     .await?;
252    /// # Ok(())
253    /// # }
254    /// ```
255    pub fn authenticator_provider(
256        mut self,
257        authenticator_provider: Arc<dyn AuthenticatorProvider>,
258    ) -> Self {
259        self.config.authenticator = Some(authenticator_provider);
260        self
261    }
262
263    /// Uses a custom address translator for peer addresses retrieved from the cluster.
264    /// By default, no translation is performed.
265    ///
266    /// # Example
267    /// ```
268    /// # use async_trait::async_trait;
269    /// # use std::net::SocketAddr;
270    /// # use std::sync::Arc;
271    /// # use scylla::client::session::Session;
272    /// # use scylla::client::session_builder::SessionBuilder;
273    /// # use scylla::errors::TranslationError;
274    /// # use scylla::policies::address_translator::{AddressTranslator, UntranslatedPeer};
275    /// struct IdentityTranslator;
276    ///
277    /// #[async_trait]
278    /// impl AddressTranslator for IdentityTranslator {
279    ///     async fn translate_address(
280    ///         &self,
281    ///         untranslated_peer: &UntranslatedPeer,
282    ///     ) -> Result<SocketAddr, TranslationError> {
283    ///         Ok(untranslated_peer.untranslated_address())
284    ///     }
285    /// }
286    ///
287    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
288    /// let session: Session = SessionBuilder::new()
289    ///     .known_node("127.0.0.1:9042")
290    ///     .address_translator(Arc::new(IdentityTranslator))
291    ///     .build()
292    ///     .await?;
293    /// # Ok(())
294    /// # }
295    /// ```
296    /// # Example
297    /// ```
298    /// # use std::net::SocketAddr;
299    /// # use std::sync::Arc;
300    /// # use std::collections::HashMap;
301    /// # use std::str::FromStr;
302    /// # use scylla::client::session::Session;
303    /// # use scylla::client::session_builder::SessionBuilder;
304    /// # use scylla::errors::TranslationError;
305    /// # use scylla::policies::address_translator::AddressTranslator;
306    /// #
307    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
308    /// let mut translation_rules = HashMap::new();
309    /// let addr_before_translation = SocketAddr::from_str("192.168.0.42:19042").unwrap();
310    /// let addr_after_translation = SocketAddr::from_str("157.123.12.42:23203").unwrap();
311    /// translation_rules.insert(addr_before_translation, addr_after_translation);
312    /// let session: Session = SessionBuilder::new()
313    ///     .known_node("127.0.0.1:9042")
314    ///     .address_translator(Arc::new(translation_rules))
315    ///     .build()
316    ///     .await?;
317    /// #    Ok(())
318    /// # }
319    /// ```
320    pub fn address_translator(mut self, translator: Arc<dyn AddressTranslator>) -> Self {
321        self.config.address_translator = Some(translator);
322        self
323    }
324
325    /// TLS feature
326    ///
327    /// Provide SessionBuilder with TlsContext that will be
328    /// used to create a TLS connection to the database.
329    /// If set to None TLS connection won't be used.
330    ///
331    /// Default is None.
332    ///
333    /// # Example
334    /// ```
335    /// # use std::fs;
336    /// # use std::path::PathBuf;
337    /// # use scylla::client::session::Session;
338    /// # use scylla::client::session_builder::SessionBuilder;
339    /// # use openssl::ssl::{SslContextBuilder, SslVerifyMode, SslMethod, SslFiletype};
340    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
341    /// let certdir = fs::canonicalize(PathBuf::from("./examples/certs/scylla.crt"))?;
342    /// let mut context_builder = SslContextBuilder::new(SslMethod::tls())?;
343    /// context_builder.set_certificate_file(certdir.as_path(), SslFiletype::PEM)?;
344    /// context_builder.set_verify(SslVerifyMode::NONE);
345    ///
346    /// let session: Session = SessionBuilder::new()
347    ///     .known_node("127.0.0.1:9042")
348    ///     .tls_context(Some(context_builder.build()))
349    ///     .build()
350    ///     .await?;
351    /// # Ok(())
352    /// # }
353    /// ```
354    pub fn tls_context(mut self, tls_context: Option<impl Into<TlsContext>>) -> Self {
355        self.config.tls_context = tls_context.map(|t| t.into());
356        self
357    }
358}
359
360// NOTE: this `impl` block contains configuration options specific for **Cloud** [`Session`].
361// This means that if an option fits both non-Cloud and Cloud `Session`s, it should NOT be put
362// here, but rather in `impl<K> GenericSessionBuilder<K>` block.
#[cfg(feature = "unstable-cloud")]
impl GenericSessionBuilder<CloudMode> {
    /// Creates a Cloud builder on top of an already loaded [`CloudConfig`].
    ///
    /// The default execution profile takes its consistency and serial
    /// consistency from the cloud config when the config provides them; all
    /// other settings start from [`SessionConfig::new`].
    pub fn from_config(cloud_config: CloudConfig) -> Self {
        let mut session_config = SessionConfig::new();

        // Apply each cloud-provided default only if it is present.
        let profile = ExecutionProfile::builder();
        let profile = match cloud_config.get_default_consistency() {
            Some(consistency) => profile.consistency(consistency),
            None => profile,
        };
        let profile = match cloud_config.get_default_serial_consistency() {
            Some(serial_consistency) => profile.serial_consistency(Some(serial_consistency)),
            None => profile,
        };

        session_config.default_execution_profile_handle = profile.build().into_handle();
        session_config.cloud_config = Some(Arc::new(cloud_config));

        Self {
            config: session_config,
            kind: PhantomData,
        }
    }

    /// Creates a Cloud builder from the Scylla Cloud Config yaml at the given path.
    ///
    /// # Errors
    /// Returns the [`CloudConfigError`] produced while reading the yaml file.
    pub fn new(
        cloud_config: impl AsRef<Path>,
        tls_provider: CloudTlsProvider,
    ) -> Result<Self, CloudConfigError> {
        let parsed_config = CloudConfig::read_from_yaml(cloud_config, tls_provider)?;
        Ok(Self::from_config(parsed_config))
    }
}
395
// This block contains configuration options that make sense both for Cloud and non-Cloud
// `Session`s. If an option fits only one of them, it should be put in a specialised block.
398impl<K: SessionBuilderKind> GenericSessionBuilder<K> {
399    /// Sets the local ip address all TCP sockets are bound to.
400    ///
401    /// By default, this option is set to `None`, which is equivalent to:
402    /// - `INADDR_ANY` for IPv4 ([`Ipv4Addr::UNSPECIFIED`][std::net::Ipv4Addr::UNSPECIFIED])
403    /// - `in6addr_any` for IPv6 ([`Ipv6Addr::UNSPECIFIED`][std::net::Ipv6Addr::UNSPECIFIED])
404    ///
405    /// # Example
406    /// ```
407    /// # use scylla::client::session::Session;
408    /// # use scylla::client::session_builder::SessionBuilder;
409    /// # use std::net::Ipv4Addr;
410    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
411    /// let session: Session = SessionBuilder::new()
412    ///     .known_node("127.0.0.1:9042")
413    ///     .local_ip_address(Some(Ipv4Addr::new(192, 168, 0, 1)))
414    ///     .build()
415    ///     .await?;
416    /// # Ok(())
417    /// # }
418    /// ```
419    pub fn local_ip_address(mut self, local_ip_address: Option<impl Into<IpAddr>>) -> Self {
420        self.config.local_ip_address = local_ip_address.map(Into::into);
421        self
422    }
423
424    /// Specifies the local port range used for shard-aware connections.
425    ///
426    /// A possible use case is when you want to have multiple [`Session`] objects and do not want
427    /// them to compete for the ports within the same range. It is then advised to assign
428    /// mutually non-overlapping port ranges to each session object.
429    ///
430    /// By default this option is set to [`ShardAwarePortRange::EPHEMERAL_PORT_RANGE`].
431    ///
432    /// For details, see [`ShardAwarePortRange`] documentation.
433    ///
434    /// # Example
435    /// ```
436    /// # use scylla::client::session::Session;
437    /// # use scylla::client::session_builder::SessionBuilder;
438    /// # use scylla::routing::ShardAwarePortRange;
439    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
440    /// let session: Session = SessionBuilder::new()
441    ///     .known_node("127.0.0.1:9042")
442    ///     .shard_aware_local_port_range(ShardAwarePortRange::new(49200..=50000)?)
443    ///     .build()
444    ///     .await?;
445    /// # Ok(())
446    /// # }
447    /// ```
448    pub fn shard_aware_local_port_range(mut self, port_range: ShardAwarePortRange) -> Self {
449        self.config.shard_aware_local_port_range = port_range;
450        self
451    }
452
453    /// Set preferred Compression algorithm.
454    /// The default is no compression.
    /// If it is not supported by the database server, Session will fall back to no compression.
456    ///
457    /// # Example
458    /// ```
459    /// # use scylla::client::session::Session;
460    /// # use scylla::client::session_builder::SessionBuilder;
461    /// # use scylla::client::Compression;
462    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
463    /// let session: Session = SessionBuilder::new()
464    ///     .known_node("127.0.0.1:9042")
465    ///     .compression(Some(Compression::Snappy))
466    ///     .build()
467    ///     .await?;
468    /// # Ok(())
469    /// # }
470    /// ```
471    pub fn compression(mut self, compression: Option<Compression>) -> Self {
472        self.config.compression = compression;
473        self
474    }
475
    /// Set the delay for the schema agreement check, i.e. how often the driver should ask if schema is in agreement.
    /// The default is 200 milliseconds.
478    ///
479    /// # Example
480    /// ```
481    /// # use scylla::client::session::Session;
482    /// # use scylla::client::session_builder::SessionBuilder;
483    /// # use std::time::Duration;
484    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
485    /// let session: Session = SessionBuilder::new()
486    ///     .known_node("127.0.0.1:9042")
487    ///     .schema_agreement_interval(Duration::from_secs(5))
488    ///     .build()
489    ///     .await?;
490    /// # Ok(())
491    /// # }
492    /// ```
493    pub fn schema_agreement_interval(mut self, timeout: Duration) -> Self {
494        self.config.schema_agreement_interval = timeout;
495        self
496    }
497
498    /// Set the default execution profile using its handle
499    ///
500    /// # Example
501    /// ```
502    /// # use scylla::statement::Consistency;
503    /// # use scylla::client::execution_profile::ExecutionProfile;
504    /// # use scylla::client::session::Session;
505    /// # use scylla::client::session_builder::SessionBuilder;
506    /// # use std::time::Duration;
507    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
508    /// let execution_profile = ExecutionProfile::builder()
509    ///     .consistency(Consistency::All)
510    ///     .request_timeout(Some(Duration::from_secs(2)))
511    ///     .build();
512    /// let session: Session = SessionBuilder::new()
513    ///     .known_node("127.0.0.1:9042")
514    ///     .default_execution_profile_handle(execution_profile.into_handle())
515    ///     .build()
516    ///     .await?;
517    /// # Ok(())
518    /// # }
519    /// ```
520    pub fn default_execution_profile_handle(
521        mut self,
522        profile_handle: ExecutionProfileHandle,
523    ) -> Self {
524        self.config.default_execution_profile_handle = profile_handle;
525        self
526    }
527
528    /// Set the nodelay TCP flag.
529    /// The default is true.
530    ///
531    /// # Example
532    /// ```
533    /// # use scylla::client::session::Session;
534    /// # use scylla::client::session_builder::SessionBuilder;
535    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
536    /// let session: Session = SessionBuilder::new()
537    ///     .known_node("127.0.0.1:9042")
538    ///     .tcp_nodelay(true)
539    ///     .build()
540    ///     .await?;
541    /// # Ok(())
542    /// # }
543    /// ```
544    pub fn tcp_nodelay(mut self, nodelay: bool) -> Self {
545        self.config.tcp_nodelay = nodelay;
546        self
547    }
548
549    /// Set the TCP keepalive interval.
550    /// The default is `None`, which implies that no keepalive messages
551    /// are sent **on TCP layer** when a connection is idle.
552    /// Note: CQL-layer keepalives are configured separately,
553    /// with `Self::keepalive_interval`.
554    ///
555    /// # Example
556    /// ```
557    /// # use scylla::client::session::Session;
558    /// # use scylla::client::session_builder::SessionBuilder;
559    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
560    /// let session: Session = SessionBuilder::new()
561    ///     .known_node("127.0.0.1:9042")
562    ///     .tcp_keepalive_interval(std::time::Duration::from_secs(42))
563    ///     .build()
564    ///     .await?;
565    /// # Ok(())
566    /// # }
567    /// ```
568    pub fn tcp_keepalive_interval(mut self, interval: Duration) -> Self {
569        if interval <= Duration::from_secs(1) {
570            warn!(
571                "Setting the TCP keepalive interval to low values ({:?}) is not recommended as it can have a negative impact on performance. Consider setting it above 1 second.",
572                interval
573            );
574        }
575
576        self.config.tcp_keepalive_interval = Some(interval);
577        self
578    }
579
580    /// Set keyspace to be used on all connections.\
581    /// Each connection will send `"USE <keyspace_name>"` before sending any requests.\
582    /// This can be later changed with [`crate::client::session::Session::use_keyspace`]
583    ///
584    /// # Example
585    /// ```
586    /// # use scylla::client::session::Session;
587    /// # use scylla::client::session_builder::SessionBuilder;
588    /// # use scylla::client::Compression;
589    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
590    /// let session: Session = SessionBuilder::new()
591    ///     .known_node("127.0.0.1:9042")
592    ///     .use_keyspace("my_keyspace_name", false)
593    ///     .build()
594    ///     .await?;
595    /// # Ok(())
596    /// # }
597    /// ```
598    pub fn use_keyspace(mut self, keyspace_name: impl Into<String>, case_sensitive: bool) -> Self {
599        self.config.used_keyspace = Some(keyspace_name.into());
600        self.config.keyspace_case_sensitive = case_sensitive;
601        self
602    }
603
604    /// Builds the Session after setting all the options.
605    ///
606    /// # Example
607    /// ```
608    /// # use scylla::client::session::Session;
609    /// # use scylla::client::session_builder::SessionBuilder;
610    /// # use scylla::client::Compression;
611    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
612    /// let session: Session = SessionBuilder::new()
613    ///     .known_node("127.0.0.1:9042")
614    ///     .compression(Some(Compression::Snappy))
615    ///     .build() // Turns SessionBuilder into Session
616    ///     .await?;
617    /// # Ok(())
618    /// # }
619    /// ```
620    pub async fn build(&self) -> Result<Session, NewSessionError> {
621        Session::connect(self.config.clone()).await
622    }
623
    /// Changes the connection timeout.
    /// The default is 5 seconds.
    /// If it is higher than the underlying OS's default connection timeout, it has no effect.
627    ///
628    /// # Example
629    /// ```
630    /// # use scylla::client::session::Session;
631    /// # use scylla::client::session_builder::SessionBuilder;
632    /// # use std::time::Duration;
633    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
634    /// let session: Session = SessionBuilder::new()
635    ///     .known_node("127.0.0.1:9042")
636    ///     .connection_timeout(Duration::from_secs(30))
637    ///     .build() // Turns SessionBuilder into Session
638    ///     .await?;
639    /// # Ok(())
640    /// # }
641    /// ```
642    pub fn connection_timeout(mut self, duration: Duration) -> Self {
643        self.config.connect_timeout = duration;
644        self
645    }
646
647    /// Sets the per-node connection pool size.
648    /// The default is one connection per shard, which is the recommended setting for Scylla.
649    ///
650    /// # Example
651    /// ```
652    /// # use scylla::client::session::Session;
653    /// # use scylla::client::session_builder::SessionBuilder;
654    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
655    /// use std::num::NonZeroUsize;
656    /// use scylla::client::PoolSize;
657    ///
658    /// // This session will establish 4 connections to each node.
659    /// // For Scylla clusters, this number will be divided across shards
660    /// let session: Session = SessionBuilder::new()
661    ///     .known_node("127.0.0.1:9042")
662    ///     .pool_size(PoolSize::PerHost(NonZeroUsize::new(4).unwrap()))
663    ///     .build()
664    ///     .await?;
665    /// # Ok(())
666    /// # }
667    /// ```
668    pub fn pool_size(mut self, size: PoolSize) -> Self {
669        self.config.connection_pool_size = size;
670        self
671    }
672
673    /// If true, prevents the driver from connecting to the shard-aware port, even if the node supports it.
674    ///
675    /// _This is a Scylla-specific option_. It has no effect on Cassandra clusters.
676    ///
677    /// By default, connecting to the shard-aware port is __allowed__ and, in general, this setting
678    /// _should not be changed_. The shard-aware port (19042 or 19142) makes the process of
679    /// establishing connection per shard more robust compared to the regular transport port
680    /// (9042 or 9142). With the shard-aware port, the driver is able to choose which shard
681    /// will be assigned to the connection.
682    ///
683    /// In order to be able to use the shard-aware port effectively, the port needs to be
684    /// reachable and not behind a NAT which changes source ports (the driver uses the source port
685    /// to tell Scylla which shard to assign). However, the driver is designed to behave in a robust
686    /// way if those conditions are not met - if the driver fails to connect to the port or gets
687    /// a connection to the wrong shard, it will re-attempt the connection to the regular transport port.
688    ///
689    /// The only cost of misconfigured shard-aware port should be a slightly longer reconnection time.
690    /// If it is unacceptable to you or suspect that it causes you some other problems,
691    /// you can use this option to disable the shard-aware port feature completely.
692    /// However, __you should use it as a last resort__. Before you do that, we strongly recommend
693    /// that you consider fixing the network issues.
694    ///
695    /// # Example
696    /// ```
697    /// # use scylla::client::session::Session;
698    /// # use scylla::client::session_builder::SessionBuilder;
699    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
700    /// let session: Session = SessionBuilder::new()
701    ///     .known_node("127.0.0.1:9042")
702    ///     .disallow_shard_aware_port(true)
703    ///     .build()
704    ///     .await?;
705    /// # Ok(())
706    /// # }
707    /// ```
708    pub fn disallow_shard_aware_port(mut self, disallow: bool) -> Self {
709        self.config.disallow_shard_aware_port = disallow;
710        self
711    }
712
713    /// Set the timestamp generator that will generate timestamps on the client-side.
714    ///
715    /// # Example
716    /// ```
717    /// # use scylla::client::session::Session;
718    /// # use scylla::client::session_builder::SessionBuilder;
719    /// # use scylla::policies::timestamp_generator::SimpleTimestampGenerator;
720    /// # use std::sync::Arc;
721    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
722    /// let session: Session = SessionBuilder::new()
723    ///     .known_node("127.0.0.1:9042")
724    ///     .timestamp_generator(Arc::new(SimpleTimestampGenerator::new()))
725    ///     .build()
726    ///     .await?;
727    /// # Ok(())
728    /// # }
729    /// ```
730    pub fn timestamp_generator(mut self, timestamp_generator: Arc<dyn TimestampGenerator>) -> Self {
731        self.config.timestamp_generator = Some(timestamp_generator);
732        self
733    }
734
735    /// Set the keyspaces to be fetched, to retrieve their strategy, and schema metadata if enabled
736    /// No keyspaces, the default value, means all the keyspaces will be fetched.
737    ///
738    /// # Example
739    /// ```
740    /// # use scylla::client::session::Session;
741    /// # use scylla::client::session_builder::SessionBuilder;
742    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
743    /// let session: Session = SessionBuilder::new()
744    ///     .known_node("127.0.0.1:9042")
745    ///     .keyspaces_to_fetch(["my_keyspace"])
746    ///     .build()
747    ///     .await?;
748    /// # Ok(())
749    /// # }
750    /// ```
751    pub fn keyspaces_to_fetch(
752        mut self,
753        keyspaces: impl IntoIterator<Item = impl Into<String>>,
754    ) -> Self {
755        self.config.keyspaces_to_fetch = keyspaces.into_iter().map(Into::into).collect();
756        self
757    }
758
759    /// Set the fetch schema metadata flag.
760    /// The default is true.
761    ///
762    /// # Example
763    /// ```
764    /// # use scylla::client::session::Session;
765    /// # use scylla::client::session_builder::SessionBuilder;
766    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
767    /// let session: Session = SessionBuilder::new()
768    ///     .known_node("127.0.0.1:9042")
769    ///     .fetch_schema_metadata(true)
770    ///     .build()
771    ///     .await?;
772    /// # Ok(())
773    /// # }
774    /// ```
775    pub fn fetch_schema_metadata(mut self, fetch: bool) -> Self {
776        self.config.fetch_schema_metadata = fetch;
777        self
778    }
779
    /// Set the server-side timeout for metadata queries.
    /// The default is `Some(Duration::from_secs(2))`, which means that
    /// all metadata queries are given a 2-second timeout
    /// no matter what timeout is set as the cluster default.
    /// This prevents timeouts of schema queries when the schema is large
    /// and the default timeout is configured as tight.
786    ///
787    /// # Example
788    /// ```
789    /// # use scylla::client::session::Session;
790    /// # use scylla::client::session_builder::SessionBuilder;
791    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
792    /// let session: Session = SessionBuilder::new()
793    ///     .known_node("127.0.0.1:9042")
794    ///     .metadata_request_serverside_timeout(std::time::Duration::from_secs(5))
795    ///     .build()
796    ///     .await?;
797    /// # Ok(())
798    /// # }
799    /// ```
800    pub fn metadata_request_serverside_timeout(mut self, timeout: Duration) -> Self {
801        self.config.metadata_request_serverside_timeout = Some(timeout);
802        self
803    }
804
805    /// Set the keepalive interval.
806    /// The default is `Some(Duration::from_secs(30))`, which corresponds
807    /// to keepalive CQL messages being sent every 30 seconds.
808    /// Note: this configures CQL-layer keepalives. See also:
809    /// `Self::tcp_keepalive_interval`.
810    ///
811    /// # Example
812    /// ```
813    /// # use scylla::client::session::Session;
814    /// # use scylla::client::session_builder::SessionBuilder;
815    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
816    /// let session: Session = SessionBuilder::new()
817    ///     .known_node("127.0.0.1:9042")
818    ///     .keepalive_interval(std::time::Duration::from_secs(42))
819    ///     .build()
820    ///     .await?;
821    /// # Ok(())
822    /// # }
823    /// ```
824    pub fn keepalive_interval(mut self, interval: Duration) -> Self {
825        if interval <= Duration::from_secs(1) {
826            warn!(
827                "Setting the keepalive interval to low values ({:?}) is not recommended as it can have a negative impact on performance. Consider setting it above 1 second.",
828                interval
829            );
830        }
831
832        self.config.keepalive_interval = Some(interval);
833        self
834    }
835
836    /// Set the keepalive timeout.
837    /// The default is `Some(Duration::from_secs(30))`. It means that
838    /// the connection will be closed if time between sending a keepalive
839    /// and receiving a response to any keepalive (not necessarily the same -
840    /// it may be one sent later) exceeds 30 seconds.
841    ///
842    /// # Example
843    /// ```
844    /// # use scylla::client::session::Session;
845    /// # use scylla::client::session_builder::SessionBuilder;
846    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
847    /// let session: Session = SessionBuilder::new()
848    ///     .known_node("127.0.0.1:9042")
849    ///     .keepalive_timeout(std::time::Duration::from_secs(42))
850    ///     .build()
851    ///     .await?;
852    /// # Ok(())
853    /// # }
854    /// ```
855    pub fn keepalive_timeout(mut self, timeout: Duration) -> Self {
856        if timeout <= Duration::from_secs(1) {
857            warn!(
858                "Setting the keepalive timeout to low values ({:?}) is not recommended as it may aggressively close connections. Consider setting it above 5 seconds.",
859                timeout
860            );
861        }
862
863        self.config.keepalive_timeout = Some(timeout);
864        self
865    }
866
867    /// Sets the timeout for waiting for schema agreement.
868    /// By default, the timeout is 60 seconds.
869    ///
870    /// # Example
871    /// ```
872    /// # use scylla::client::session::Session;
873    /// # use scylla::client::session_builder::SessionBuilder;
874    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
875    /// let session: Session = SessionBuilder::new()
876    ///     .known_node("127.0.0.1:9042")
877    ///     .schema_agreement_timeout(std::time::Duration::from_secs(120))
878    ///     .build()
879    ///     .await?;
880    /// # Ok(())
881    /// # }
882    /// ```
883    pub fn schema_agreement_timeout(mut self, timeout: Duration) -> Self {
884        self.config.schema_agreement_timeout = timeout;
885        self
886    }
887
888    /// Controls automatic waiting for schema agreement after a schema-altering
889    /// statement is sent. By default, it is enabled.
890    ///
891    /// # Example
892    /// ```
893    /// # use scylla::client::session::Session;
894    /// # use scylla::client::session_builder::SessionBuilder;
895    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
896    /// let session: Session = SessionBuilder::new()
897    ///     .known_node("127.0.0.1:9042")
898    ///     .auto_await_schema_agreement(false)
899    ///     .build()
900    ///     .await?;
901    /// # Ok(())
902    /// # }
903    /// ```
904    pub fn auto_await_schema_agreement(mut self, enabled: bool) -> Self {
905        self.config.schema_agreement_automatic_waiting = enabled;
906        self
907    }
908
909    /// Sets the host filter. The host filter decides whether any connections
910    /// should be opened to the node or not. The driver will also avoid
911    /// those nodes when re-establishing the control connection.
912    ///
913    /// See the [host filter](crate::policies::host_filter) module for a list
914    /// of pre-defined filters. It is also possible to provide a custom filter
915    /// by implementing the HostFilter trait.
916    ///
917    /// # Example
918    /// ```
919    /// # use async_trait::async_trait;
920    /// # use std::net::SocketAddr;
921    /// # use std::sync::Arc;
922    /// # use scylla::client::session::Session;
923    /// # use scylla::client::session_builder::SessionBuilder;
924    /// # use scylla::errors::TranslationError;
925    /// # use scylla::policies::address_translator::AddressTranslator;
926    /// # use scylla::policies::host_filter::DcHostFilter;
927    ///
928    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
929    /// // The session will only connect to nodes from "my-local-dc"
930    /// let session: Session = SessionBuilder::new()
931    ///     .known_node("127.0.0.1:9042")
932    ///     .host_filter(Arc::new(DcHostFilter::new("my-local-dc".to_string())))
933    ///     .build()
934    ///     .await?;
935    /// # Ok(())
936    /// # }
937    /// ```
938    pub fn host_filter(mut self, filter: Arc<dyn HostFilter>) -> Self {
939        self.config.host_filter = Some(filter);
940        self
941    }
942
943    /// Set the refresh metadata on schema agreement flag.
944    /// The default is true.
945    ///
946    /// # Example
947    /// ```
948    /// # use scylla::client::session::Session;
949    /// # use scylla::client::session_builder::SessionBuilder;
950    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
951    /// let session: Session = SessionBuilder::new()
952    ///     .known_node("127.0.0.1:9042")
953    ///     .refresh_metadata_on_auto_schema_agreement(true)
954    ///     .build()
955    ///     .await?;
956    /// # Ok(())
957    /// # }
958    /// ```
959    pub fn refresh_metadata_on_auto_schema_agreement(mut self, refresh_metadata: bool) -> Self {
960        self.config.refresh_metadata_on_auto_schema_agreement = refresh_metadata;
961        self
962    }
963
964    /// Set the number of attempts to fetch [TracingInfo](crate::observability::tracing::TracingInfo)
965    /// in [`Session::get_tracing_info`](crate::client::session::Session::get_tracing_info).
966    /// The default is 5 attempts.
967    ///
968    /// Tracing info might not be available immediately on queried node - that's why
969    /// the driver performs a few attempts with sleeps in between.
970    ///
971    /// Cassandra users may want to increase this value - the default is good
972    /// for Scylla, but Cassandra sometimes needs more time for the data to
973    /// appear in tracing table.
974    ///
975    /// # Example
976    /// ```
977    /// # use scylla::client::session::Session;
978    /// # use scylla::client::session_builder::SessionBuilder;
979    /// # use std::num::NonZeroU32;
980    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
981    /// let session: Session = SessionBuilder::new()
982    ///     .known_node("127.0.0.1:9042")
983    ///     .tracing_info_fetch_attempts(NonZeroU32::new(10).unwrap())
984    ///     .build()
985    ///     .await?;
986    /// # Ok(())
987    /// # }
988    /// ```
989    pub fn tracing_info_fetch_attempts(mut self, attempts: NonZeroU32) -> Self {
990        self.config.tracing_info_fetch_attempts = attempts;
991        self
992    }
993
994    /// Set the delay between attempts to fetch [TracingInfo](crate::observability::tracing::TracingInfo)
995    /// in [`Session::get_tracing_info`](crate::client::session::Session::get_tracing_info).
996    /// The default is 3 milliseconds.
997    ///
998    /// Tracing info might not be available immediately on queried node - that's why
999    /// the driver performs a few attempts with sleeps in between.
1000    ///
1001    /// Cassandra users may want to increase this value - the default is good
1002    /// for Scylla, but Cassandra sometimes needs more time for the data to
1003    /// appear in tracing table.
1004    ///
1005    /// # Example
1006    /// ```
1007    /// # use scylla::client::session::Session;
1008    /// # use scylla::client::session_builder::SessionBuilder;
1009    /// # use std::time::Duration;
1010    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1011    /// let session: Session = SessionBuilder::new()
1012    ///     .known_node("127.0.0.1:9042")
1013    ///     .tracing_info_fetch_interval(Duration::from_millis(50))
1014    ///     .build()
1015    ///     .await?;
1016    /// # Ok(())
1017    /// # }
1018    /// ```
1019    pub fn tracing_info_fetch_interval(mut self, interval: Duration) -> Self {
1020        self.config.tracing_info_fetch_interval = interval;
1021        self
1022    }
1023
1024    /// Set the consistency level of fetching [TracingInfo](crate::observability::tracing::TracingInfo)
1025    /// in [`Session::get_tracing_info`](crate::client::session::Session::get_tracing_info).
1026    /// The default is [`Consistency::One`].
1027    ///
1028    /// # Example
1029    /// ```
1030    /// # use scylla::statement::Consistency;
1031    /// # use scylla::client::session::Session;
1032    /// # use scylla::client::session_builder::SessionBuilder;
1033    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1034    /// let session: Session = SessionBuilder::new()
1035    ///     .known_node("127.0.0.1:9042")
1036    ///     .tracing_info_fetch_consistency(Consistency::One)
1037    ///     .build()
1038    ///     .await?;
1039    /// # Ok(())
1040    /// # }
1041    /// ```
1042    pub fn tracing_info_fetch_consistency(mut self, consistency: Consistency) -> Self {
1043        self.config.tracing_info_fetch_consistency = consistency;
1044        self
1045    }
1046
1047    /// If true, the driver will inject a delay controlled by [SessionBuilder::write_coalescing_delay()]
1048    /// before flushing data to the socket.
1049    /// This gives the driver an opportunity to collect more write requests
1050    /// and write them in a single syscall, increasing the efficiency.
1051    ///
1052    /// However, this optimization may worsen latency if the rate of requests
1053    /// issued by the application is low, but otherwise the application is
1054    /// heavily loaded with other tasks on the same tokio executor.
1055    /// Please do performance measurements before committing to disabling
1056    /// this option.
1057    ///
1058    /// This option is true by default.
1059    ///
1060    /// # Example
1061    /// ```
1062    /// # use scylla::client::session::Session;
1063    /// # use scylla::client::session_builder::SessionBuilder;
1064    /// # use scylla::client::Compression;
1065    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1066    /// let session: Session = SessionBuilder::new()
1067    ///     .known_node("127.0.0.1:9042")
1068    ///     .write_coalescing(false) // Enabled by default
1069    ///     .build()
1070    ///     .await?;
1071    /// # Ok(())
1072    /// # }
1073    /// ```
1074    pub fn write_coalescing(mut self, enable: bool) -> Self {
1075        self.config.enable_write_coalescing = enable;
1076        self
1077    }
1078
1079    /// Controls the write coalescing delay (if enabled).
1080    ///
1081    /// This option has no effect if [`SessionBuilder::write_coalescing()`] is set to false.
1082    ///
1083    /// This option is [`WriteCoalescingDelay::SmallNondeterministic`] by default.
1084    ///
1085    /// # Example
1086    /// ```
1087    /// # use scylla::client::session::Session;
1088    /// # use scylla::client::session_builder::SessionBuilder;
1089    /// # use scylla::client::WriteCoalescingDelay;
1090    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1091    /// let session: Session = SessionBuilder::new()
1092    ///     .known_node("127.0.0.1:9042")
1093    ///     .write_coalescing_delay(WriteCoalescingDelay::SmallNondeterministic)
1094    ///     .build()
1095    ///     .await?;
1096    /// # Ok(())
1097    /// # }
1098    /// ```
1099    pub fn write_coalescing_delay(mut self, delay: WriteCoalescingDelay) -> Self {
1100        self.config.write_coalescing_delay = delay;
1101        self
1102    }
1103
1104    /// Set the interval at which the driver refreshes the cluster metadata which contains information
1105    /// about the cluster topology as well as the cluster schema.
1106    ///
1107    /// The default is 60 seconds.
1108    ///
1109    /// In the given example, we have set the duration value to 20 seconds, which
1110    /// means that the metadata is refreshed every 20 seconds.
1111    /// # Example
1112    /// ```
1113    /// # use scylla::client::session::Session;
1114    /// # use scylla::client::session_builder::SessionBuilder;
1115    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1116    ///     let session: Session = SessionBuilder::new()
1117    ///         .known_node("127.0.0.1:9042")
1118    ///         .cluster_metadata_refresh_interval(std::time::Duration::from_secs(20))
1119    ///         .build()
1120    ///         .await?;
1121    /// #   Ok(())
1122    /// # }
1123    /// ```
1124    pub fn cluster_metadata_refresh_interval(mut self, interval: Duration) -> Self {
1125        self.config.cluster_metadata_refresh_interval = interval;
1126        self
1127    }
1128
1129    /// Set the custom identity of the driver/application/instance,
1130    /// to be sent as options in STARTUP message.
1131    ///
1132    /// By default driver name and version are sent;
1133    /// application name and version and client id are not sent.
1134    ///
1135    /// # Example
1136    /// ```
1137    /// # use scylla::client::session::Session;
1138    /// # use scylla::client::session_builder::SessionBuilder;
1139    /// # use scylla::client::SelfIdentity;
1140    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1141    ///     let (app_major, app_minor, app_patch) = (2, 1, 3);
1142    ///     let app_version = format!("{app_major}.{app_minor}.{app_patch}");
1143    ///
1144    ///     let session: Session = SessionBuilder::new()
1145    ///         .known_node("127.0.0.1:9042")
1146    ///         .custom_identity(
1147    ///             SelfIdentity::new()
1148    ///                 .with_custom_driver_version("0.13.0-custom_build_17")
1149    ///                 .with_application_name("my-app")
1150    ///                 .with_application_version(app_version)
1151    ///         )
1152    ///         .build()
1153    ///         .await?;
1154    /// #   Ok(())
1155    /// # }
1156    /// ```
1157    pub fn custom_identity(mut self, identity: SelfIdentity<'static>) -> Self {
1158        self.config.identity = identity;
1159        self
1160    }
1161}
1162
1163/// Creates a [`SessionBuilder`] with default configuration, same as [`SessionBuilder::new`]
1164impl Default for SessionBuilder {
1165    fn default() -> Self {
1166        SessionBuilder::new()
1167    }
1168}
1169
1170#[cfg(test)]
1171mod tests {
1172    use scylla_cql::frame::types::SerialConsistency;
1173    use scylla_cql::Consistency;
1174
1175    use super::super::Compression;
1176    use super::SessionBuilder;
1177    use crate::client::execution_profile::{defaults, ExecutionProfile};
1178    use crate::cluster::node::KnownNode;
1179    use crate::test_utils::setup_tracing;
1180    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
1181    use std::time::Duration;
1182
1183    #[test]
1184    fn default_session_builder() {
1185        setup_tracing();
1186        let builder = SessionBuilder::new();
1187
1188        assert!(builder.config.known_nodes.is_empty());
1189        assert_eq!(builder.config.compression, None);
1190    }
1191
1192    #[test]
1193    fn add_known_node() {
1194        setup_tracing();
1195        let mut builder = SessionBuilder::new();
1196
1197        builder = builder.known_node("test_hostname");
1198
1199        assert_eq!(
1200            builder.config.known_nodes,
1201            vec![KnownNode::Hostname("test_hostname".into())]
1202        );
1203        assert_eq!(builder.config.compression, None);
1204    }
1205
1206    #[test]
1207    fn add_known_node_addr() {
1208        setup_tracing();
1209        let mut builder = SessionBuilder::new();
1210
1211        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 3)), 1357);
1212        builder = builder.known_node_addr(addr);
1213
1214        assert_eq!(builder.config.known_nodes, vec![KnownNode::Address(addr)]);
1215        assert_eq!(builder.config.compression, None);
1216    }
1217
1218    #[test]
1219    fn add_known_nodes() {
1220        setup_tracing();
1221        let mut builder = SessionBuilder::new();
1222
1223        builder = builder.known_nodes(["test_hostname1", "test_hostname2"]);
1224
1225        assert_eq!(
1226            builder.config.known_nodes,
1227            vec![
1228                KnownNode::Hostname("test_hostname1".into()),
1229                KnownNode::Hostname("test_hostname2".into())
1230            ]
1231        );
1232        assert_eq!(builder.config.compression, None);
1233    }
1234
1235    #[test]
1236    fn add_known_nodes_addr() {
1237        setup_tracing();
1238        let mut builder = SessionBuilder::new();
1239
1240        let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 3)), 1357);
1241        let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 4)), 9090);
1242
1243        builder = builder.known_nodes_addr([addr1, addr2]);
1244
1245        assert_eq!(
1246            builder.config.known_nodes,
1247            vec![KnownNode::Address(addr1), KnownNode::Address(addr2)]
1248        );
1249        assert_eq!(builder.config.compression, None);
1250    }
1251
1252    #[test]
1253    fn compression() {
1254        setup_tracing();
1255        let mut builder = SessionBuilder::new();
1256        assert_eq!(builder.config.compression, None);
1257
1258        builder = builder.compression(Some(Compression::Lz4));
1259        assert_eq!(builder.config.compression, Some(Compression::Lz4));
1260
1261        builder = builder.compression(Some(Compression::Snappy));
1262        assert_eq!(builder.config.compression, Some(Compression::Snappy));
1263
1264        builder = builder.compression(None);
1265        assert_eq!(builder.config.compression, None);
1266    }
1267
1268    #[test]
1269    fn tcp_nodelay() {
1270        setup_tracing();
1271        let mut builder = SessionBuilder::new();
1272        assert!(builder.config.tcp_nodelay);
1273
1274        builder = builder.tcp_nodelay(false);
1275        assert!(!builder.config.tcp_nodelay);
1276
1277        builder = builder.tcp_nodelay(true);
1278        assert!(builder.config.tcp_nodelay);
1279    }
1280
1281    #[test]
1282    fn use_keyspace() {
1283        setup_tracing();
1284        let mut builder = SessionBuilder::new();
1285        assert_eq!(builder.config.used_keyspace, None);
1286        assert!(!builder.config.keyspace_case_sensitive);
1287
1288        builder = builder.use_keyspace("ks_name_1", true);
1289        assert_eq!(builder.config.used_keyspace, Some("ks_name_1".to_string()));
1290        assert!(builder.config.keyspace_case_sensitive);
1291
1292        builder = builder.use_keyspace("ks_name_2", false);
1293        assert_eq!(builder.config.used_keyspace, Some("ks_name_2".to_string()));
1294        assert!(!builder.config.keyspace_case_sensitive);
1295    }
1296
1297    #[test]
1298    fn connection_timeout() {
1299        setup_tracing();
1300        let mut builder = SessionBuilder::new();
1301        assert_eq!(
1302            builder.config.connect_timeout,
1303            std::time::Duration::from_secs(5)
1304        );
1305
1306        builder = builder.connection_timeout(std::time::Duration::from_secs(10));
1307        assert_eq!(
1308            builder.config.connect_timeout,
1309            std::time::Duration::from_secs(10)
1310        );
1311    }
1312
1313    #[test]
1314    fn fetch_schema_metadata() {
1315        setup_tracing();
1316        let mut builder = SessionBuilder::new();
1317        assert!(builder.config.fetch_schema_metadata);
1318
1319        builder = builder.fetch_schema_metadata(false);
1320        assert!(!builder.config.fetch_schema_metadata);
1321
1322        builder = builder.fetch_schema_metadata(true);
1323        assert!(builder.config.fetch_schema_metadata);
1324    }
1325
1326    // LatencyAwarePolicy, which is used in the test, requires presence of Tokio runtime.
1327    #[tokio::test]
1328    async fn execution_profile() {
1329        setup_tracing();
1330        let default_builder = SessionBuilder::new();
1331        let default_execution_profile = default_builder
1332            .config
1333            .default_execution_profile_handle
1334            .access();
1335        assert_eq!(
1336            default_execution_profile.consistency,
1337            defaults::consistency()
1338        );
1339        assert_eq!(
1340            default_execution_profile.serial_consistency,
1341            defaults::serial_consistency()
1342        );
1343        assert_eq!(
1344            default_execution_profile.request_timeout,
1345            defaults::request_timeout()
1346        );
1347        assert_eq!(
1348            default_execution_profile.load_balancing_policy.name(),
1349            defaults::load_balancing_policy().name()
1350        );
1351
1352        let custom_consistency = Consistency::Any;
1353        let custom_serial_consistency = Some(SerialConsistency::Serial);
1354        let custom_timeout = Some(Duration::from_secs(1));
1355        let execution_profile_handle = ExecutionProfile::builder()
1356            .consistency(custom_consistency)
1357            .serial_consistency(custom_serial_consistency)
1358            .request_timeout(custom_timeout)
1359            .build()
1360            .into_handle();
1361        let builder_with_profile =
1362            default_builder.default_execution_profile_handle(execution_profile_handle.clone());
1363        let execution_profile = execution_profile_handle.access();
1364
1365        let profile_in_builder = builder_with_profile
1366            .config
1367            .default_execution_profile_handle
1368            .access();
1369        assert_eq!(
1370            profile_in_builder.consistency,
1371            execution_profile.consistency
1372        );
1373        assert_eq!(
1374            profile_in_builder.serial_consistency,
1375            execution_profile.serial_consistency
1376        );
1377        assert_eq!(
1378            profile_in_builder.request_timeout,
1379            execution_profile.request_timeout
1380        );
1381        assert_eq!(
1382            profile_in_builder.load_balancing_policy.name(),
1383            execution_profile.load_balancing_policy.name()
1384        );
1385    }
1386
1387    #[test]
1388    fn cluster_metadata_refresh_interval() {
1389        setup_tracing();
1390        let builder = SessionBuilder::new();
1391        assert_eq!(
1392            builder.config.cluster_metadata_refresh_interval,
1393            std::time::Duration::from_secs(60)
1394        );
1395    }
1396
1397    #[test]
1398    fn all_features() {
1399        setup_tracing();
1400        let mut builder = SessionBuilder::new();
1401
1402        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 3)), 8465);
1403        let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 3)), 1357);
1404        let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 4)), 9090);
1405
1406        builder = builder.known_node("hostname_test");
1407        builder = builder.known_node_addr(addr);
1408        builder = builder.known_nodes(["hostname_test1", "hostname_test2"]);
1409        builder = builder.known_nodes_addr([addr1, addr2]);
1410        builder = builder.compression(Some(Compression::Snappy));
1411        builder = builder.tcp_nodelay(true);
1412        builder = builder.use_keyspace("ks_name", true);
1413        builder = builder.fetch_schema_metadata(false);
1414        builder = builder.cluster_metadata_refresh_interval(Duration::from_secs(1));
1415
1416        assert_eq!(
1417            builder.config.known_nodes,
1418            vec![
1419                KnownNode::Hostname("hostname_test".into()),
1420                KnownNode::Address(addr),
1421                KnownNode::Hostname("hostname_test1".into()),
1422                KnownNode::Hostname("hostname_test2".into()),
1423                KnownNode::Address(addr1),
1424                KnownNode::Address(addr2),
1425            ]
1426        );
1427
1428        assert_eq!(builder.config.compression, Some(Compression::Snappy));
1429        assert!(builder.config.tcp_nodelay);
1430        assert_eq!(
1431            builder.config.cluster_metadata_refresh_interval,
1432            Duration::from_secs(1)
1433        );
1434
1435        assert_eq!(builder.config.used_keyspace, Some("ks_name".to_string()));
1436
1437        assert!(builder.config.keyspace_case_sensitive);
1438        assert!(!builder.config.fetch_schema_metadata);
1439    }
1440
1441    // This is to assert that #705 does not break the API (i.e. it merely extends it).
1442    fn _check_known_nodes_compatibility(
1443        hostnames: &[impl AsRef<str>],
1444        host_addresses: &[SocketAddr],
1445    ) {
1446        let mut sb: SessionBuilder = SessionBuilder::new();
1447        sb = sb.known_nodes(hostnames);
1448        sb = sb.known_nodes_addr(host_addresses);
1449
1450        let mut config = sb.config;
1451        config.add_known_nodes(hostnames);
1452        config.add_known_nodes_addr(host_addresses);
1453    }
1454}