scylla/client/session_builder.rs
1//! SessionBuilder provides an easy way to create new Sessions
2
3#[cfg(feature = "unstable-cloud")]
4use super::execution_profile::ExecutionProfile;
5use super::execution_profile::ExecutionProfileHandle;
6use super::session::{Session, SessionConfig};
7use super::{Compression, PoolSize, SelfIdentity, WriteCoalescingDelay};
8use crate::authentication::{AuthenticatorProvider, PlainTextAuthenticator};
9use crate::client::session::TlsContext;
10#[cfg(feature = "unstable-cloud")]
11use crate::cloud::{CloudConfig, CloudConfigError, CloudTlsProvider};
12use crate::errors::NewSessionError;
13use crate::policies::address_translator::AddressTranslator;
14use crate::policies::host_filter::HostFilter;
15use crate::policies::timestamp_generator::TimestampGenerator;
16use crate::routing::ShardAwarePortRange;
17use crate::statement::Consistency;
18use std::borrow::Borrow;
19use std::marker::PhantomData;
20use std::net::{IpAddr, SocketAddr};
21use std::num::NonZeroU32;
22#[cfg(feature = "unstable-cloud")]
23use std::path::Path;
24use std::sync::Arc;
25use std::time::Duration;
26use tracing::warn;
27
28mod sealed {
29 // This is a sealed trait - its whole purpose is to be unnameable.
30 // This means we need to disable the check.
31 #[allow(unknown_lints)] // Rust 1.66 doesn't know this lint
32 #[allow(unnameable_types)]
33 pub trait Sealed {}
34}
35pub trait SessionBuilderKind: sealed::Sealed + Clone {}
36
37#[derive(Clone)]
38pub enum DefaultMode {}
39impl sealed::Sealed for DefaultMode {}
40impl SessionBuilderKind for DefaultMode {}
41
42pub type SessionBuilder = GenericSessionBuilder<DefaultMode>;
43
44#[cfg(feature = "unstable-cloud")]
45#[derive(Clone)]
46pub enum CloudMode {}
47#[cfg(feature = "unstable-cloud")]
48impl sealed::Sealed for CloudMode {}
49#[cfg(feature = "unstable-cloud")]
50impl SessionBuilderKind for CloudMode {}
51
52#[cfg(feature = "unstable-cloud")]
53pub type CloudSessionBuilder = GenericSessionBuilder<CloudMode>;
54
55/// SessionBuilder is used to create new Session instances
56/// # Example
57///
58/// ```
59/// # use scylla::client::session::Session;
60/// # use scylla::client::session_builder::SessionBuilder;
61/// # use scylla::client::Compression;
62/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
63/// let session: Session = SessionBuilder::new()
64/// .known_node("127.0.0.1:9042")
65/// .compression(Some(Compression::Snappy))
66/// .build()
67/// .await?;
68/// # Ok(())
69/// # }
70/// ```
71#[derive(Clone)]
72pub struct GenericSessionBuilder<Kind: SessionBuilderKind> {
73 pub config: SessionConfig,
74 kind: PhantomData<Kind>,
75}
76
77// NOTE: this `impl` block contains configuration options specific for **non-Cloud** [`Session`].
78// This means that if an option fits both non-Cloud and Cloud `Session`s, it should NOT be put
79// here, but rather in `impl<K> GenericSessionBuilder<K>` block.
80impl GenericSessionBuilder<DefaultMode> {
81 /// Creates new SessionBuilder with default configuration
82 /// # Default configuration
83 /// * Compression: None
84 ///
85 pub fn new() -> Self {
86 SessionBuilder {
87 config: SessionConfig::new(),
88 kind: PhantomData,
89 }
90 }
91
92 /// Add a known node with a hostname
93 /// # Examples
94 /// ```
95 /// # use scylla::client::session::Session;
96 /// # use scylla::client::session_builder::SessionBuilder;
97 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
98 /// let session: Session = SessionBuilder::new()
99 /// .known_node("127.0.0.1:9042")
100 /// .build()
101 /// .await?;
102 /// # Ok(())
103 /// # }
104 /// ```
105 ///
106 /// ```
107 /// # use scylla::client::session::Session;
108 /// # use scylla::client::session_builder::SessionBuilder;
109 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
110 /// let session: Session = SessionBuilder::new()
111 /// .known_node("db1.example.com")
112 /// .build()
113 /// .await?;
114 /// # Ok(())
115 /// # }
116 /// ```
117 pub fn known_node(mut self, hostname: impl AsRef<str>) -> Self {
118 self.config.add_known_node(hostname);
119 self
120 }
121
122 /// Add a known node with an IP address
123 /// # Example
124 /// ```
125 /// # use scylla::client::session::Session;
126 /// # use scylla::client::session_builder::SessionBuilder;
127 /// # use std::net::{SocketAddr, IpAddr, Ipv4Addr};
128 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
129 /// let session: Session = SessionBuilder::new()
130 /// .known_node_addr(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9042))
131 /// .build()
132 /// .await?;
133 /// # Ok(())
134 /// # }
135 /// ```
136 pub fn known_node_addr(mut self, node_addr: SocketAddr) -> Self {
137 self.config.add_known_node_addr(node_addr);
138 self
139 }
140
141 /// Add a list of known nodes with hostnames
142 /// # Example
143 /// ```
144 /// # use scylla::client::session::Session;
145 /// # use scylla::client::session_builder::SessionBuilder;
146 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
147 /// let session: Session = SessionBuilder::new()
148 /// .known_nodes(["127.0.0.1:9042", "db1.example.com"])
149 /// .build()
150 /// .await?;
151 /// # Ok(())
152 /// # }
153 /// ```
154 pub fn known_nodes(mut self, hostnames: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
155 self.config.add_known_nodes(hostnames);
156 self
157 }
158
159 /// Add a list of known nodes with IP addresses
160 /// # Example
161 /// ```
162 /// # use scylla::client::session::Session;
163 /// # use scylla::client::session_builder::SessionBuilder;
164 /// # use std::net::{SocketAddr, IpAddr, Ipv4Addr};
165 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
166 /// let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 3)), 9042);
167 /// let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 4)), 9042);
168 ///
169 /// let session: Session = SessionBuilder::new()
170 /// .known_nodes_addr([addr1, addr2])
171 /// .build()
172 /// .await?;
173 /// # Ok(())
174 /// # }
175 /// ```
176 pub fn known_nodes_addr(
177 mut self,
178 node_addrs: impl IntoIterator<Item = impl Borrow<SocketAddr>>,
179 ) -> Self {
180 self.config.add_known_nodes_addr(node_addrs);
181 self
182 }
183
184 /// Set username and password for plain text authentication.\
185 /// If the database server will require authentication\
186 ///
187 /// # Example
188 /// ```
189 /// # use scylla::client::session::Session;
190 /// # use scylla::client::session_builder::SessionBuilder;
191 /// # use scylla::client::Compression;
192 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
193 /// let session: Session = SessionBuilder::new()
194 /// .known_node("127.0.0.1:9042")
195 /// .use_keyspace("my_keyspace_name", false)
196 /// .user("cassandra", "cassandra")
197 /// .build()
198 /// .await?;
199 /// # Ok(())
200 /// # }
201 /// ```
202 pub fn user(mut self, username: impl Into<String>, passwd: impl Into<String>) -> Self {
203 self.config.authenticator = Some(Arc::new(PlainTextAuthenticator::new(
204 username.into(),
205 passwd.into(),
206 )));
207 self
208 }
209
210 /// Set custom authenticator provider to create an authenticator instance during a session creation.
211 ///
212 /// # Example
213 /// ```
214 /// # use std::sync::Arc;
215 /// use bytes::Bytes;
216 /// # use scylla::client::session::Session;
217 /// # use scylla::client::session_builder::SessionBuilder;
218 /// use async_trait::async_trait;
219 /// use scylla::authentication::{AuthenticatorProvider, AuthenticatorSession, AuthError};
220 /// # use scylla::client::Compression;
221 ///
222 /// struct CustomAuthenticator;
223 ///
224 /// #[async_trait]
225 /// impl AuthenticatorSession for CustomAuthenticator {
226 /// async fn evaluate_challenge(&mut self, token: Option<&[u8]>) -> Result<Option<Vec<u8>>, AuthError> {
227 /// Ok(None)
228 /// }
229 ///
230 /// async fn success(&mut self, token: Option<&[u8]>) -> Result<(), AuthError> {
231 /// Ok(())
232 /// }
233 /// }
234 ///
235 /// struct CustomAuthenticatorProvider;
236 ///
237 /// #[async_trait]
238 /// impl AuthenticatorProvider for CustomAuthenticatorProvider {
239 /// async fn start_authentication_session(&self, _authenticator_name: &str) -> Result<(Option<Vec<u8>>, Box<dyn AuthenticatorSession>), AuthError> {
240 /// Ok((None, Box::new(CustomAuthenticator)))
241 /// }
242 /// }
243 ///
244 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
245 /// let session: Session = SessionBuilder::new()
246 /// .known_node("127.0.0.1:9042")
247 /// .use_keyspace("my_keyspace_name", false)
248 /// .user("cassandra", "cassandra")
249 /// .authenticator_provider(Arc::new(CustomAuthenticatorProvider))
250 /// .build()
251 /// .await?;
252 /// # Ok(())
253 /// # }
254 /// ```
255 pub fn authenticator_provider(
256 mut self,
257 authenticator_provider: Arc<dyn AuthenticatorProvider>,
258 ) -> Self {
259 self.config.authenticator = Some(authenticator_provider);
260 self
261 }
262
263 /// Uses a custom address translator for peer addresses retrieved from the cluster.
264 /// By default, no translation is performed.
265 ///
266 /// # Example
267 /// ```
268 /// # use async_trait::async_trait;
269 /// # use std::net::SocketAddr;
270 /// # use std::sync::Arc;
271 /// # use scylla::client::session::Session;
272 /// # use scylla::client::session_builder::SessionBuilder;
273 /// # use scylla::errors::TranslationError;
274 /// # use scylla::policies::address_translator::{AddressTranslator, UntranslatedPeer};
275 /// struct IdentityTranslator;
276 ///
277 /// #[async_trait]
278 /// impl AddressTranslator for IdentityTranslator {
279 /// async fn translate_address(
280 /// &self,
281 /// untranslated_peer: &UntranslatedPeer,
282 /// ) -> Result<SocketAddr, TranslationError> {
283 /// Ok(untranslated_peer.untranslated_address())
284 /// }
285 /// }
286 ///
287 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
288 /// let session: Session = SessionBuilder::new()
289 /// .known_node("127.0.0.1:9042")
290 /// .address_translator(Arc::new(IdentityTranslator))
291 /// .build()
292 /// .await?;
293 /// # Ok(())
294 /// # }
295 /// ```
296 /// # Example
297 /// ```
298 /// # use std::net::SocketAddr;
299 /// # use std::sync::Arc;
300 /// # use std::collections::HashMap;
301 /// # use std::str::FromStr;
302 /// # use scylla::client::session::Session;
303 /// # use scylla::client::session_builder::SessionBuilder;
304 /// # use scylla::errors::TranslationError;
305 /// # use scylla::policies::address_translator::AddressTranslator;
306 /// #
307 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
308 /// let mut translation_rules = HashMap::new();
309 /// let addr_before_translation = SocketAddr::from_str("192.168.0.42:19042").unwrap();
310 /// let addr_after_translation = SocketAddr::from_str("157.123.12.42:23203").unwrap();
311 /// translation_rules.insert(addr_before_translation, addr_after_translation);
312 /// let session: Session = SessionBuilder::new()
313 /// .known_node("127.0.0.1:9042")
314 /// .address_translator(Arc::new(translation_rules))
315 /// .build()
316 /// .await?;
317 /// # Ok(())
318 /// # }
319 /// ```
320 pub fn address_translator(mut self, translator: Arc<dyn AddressTranslator>) -> Self {
321 self.config.address_translator = Some(translator);
322 self
323 }
324
325 /// TLS feature
326 ///
327 /// Provide SessionBuilder with TlsContext that will be
328 /// used to create a TLS connection to the database.
329 /// If set to None TLS connection won't be used.
330 ///
331 /// Default is None.
332 ///
333 /// # Example
334 /// ```
335 /// # use std::fs;
336 /// # use std::path::PathBuf;
337 /// # use scylla::client::session::Session;
338 /// # use scylla::client::session_builder::SessionBuilder;
339 /// # use openssl::ssl::{SslContextBuilder, SslVerifyMode, SslMethod, SslFiletype};
340 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
341 /// let certdir = fs::canonicalize(PathBuf::from("./examples/certs/scylla.crt"))?;
342 /// let mut context_builder = SslContextBuilder::new(SslMethod::tls())?;
343 /// context_builder.set_certificate_file(certdir.as_path(), SslFiletype::PEM)?;
344 /// context_builder.set_verify(SslVerifyMode::NONE);
345 ///
346 /// let session: Session = SessionBuilder::new()
347 /// .known_node("127.0.0.1:9042")
348 /// .tls_context(Some(context_builder.build()))
349 /// .build()
350 /// .await?;
351 /// # Ok(())
352 /// # }
353 /// ```
354 pub fn tls_context(mut self, tls_context: Option<impl Into<TlsContext>>) -> Self {
355 self.config.tls_context = tls_context.map(|t| t.into());
356 self
357 }
358}
359
360// NOTE: this `impl` block contains configuration options specific for **Cloud** [`Session`].
361// This means that if an option fits both non-Cloud and Cloud `Session`s, it should NOT be put
362// here, but rather in `impl<K> GenericSessionBuilder<K>` block.
363#[cfg(feature = "unstable-cloud")]
364impl CloudSessionBuilder {
365 /// Creates a new SessionBuilder with default configuration, based
366 /// on the provided [`CloudConfig`].
367 pub fn from_config(cloud_config: CloudConfig) -> Self {
368 let mut config = SessionConfig::new();
369 let mut exec_profile_builder = ExecutionProfile::builder();
370 if let Some(default_consistency) = cloud_config.get_default_consistency() {
371 exec_profile_builder = exec_profile_builder.consistency(default_consistency);
372 }
373 if let Some(default_serial_consistency) = cloud_config.get_default_serial_consistency() {
374 exec_profile_builder =
375 exec_profile_builder.serial_consistency(Some(default_serial_consistency));
376 }
377 config.default_execution_profile_handle = exec_profile_builder.build().into_handle();
378 config.cloud_config = Some(Arc::new(cloud_config));
379 CloudSessionBuilder {
380 config,
381 kind: PhantomData,
382 }
383 }
384
385 /// Creates a new SessionBuilder with default configuration,
386 /// based on provided path to Scylla Cloud Config yaml.
387 pub fn new(
388 cloud_config: impl AsRef<Path>,
389 tls_provider: CloudTlsProvider,
390 ) -> Result<Self, CloudConfigError> {
391 let cloud_config = CloudConfig::read_from_yaml(cloud_config, tls_provider)?;
392 Ok(Self::from_config(cloud_config))
393 }
394}
395
396// This block contains configuration options that make sense both for Cloud and non-Cloud
397// `Session`s. If an option fit only one of them, it should be put in a specialised block.
398impl<K: SessionBuilderKind> GenericSessionBuilder<K> {
399 /// Sets the local ip address all TCP sockets are bound to.
400 ///
401 /// By default, this option is set to `None`, which is equivalent to:
402 /// - `INADDR_ANY` for IPv4 ([`Ipv4Addr::UNSPECIFIED`][std::net::Ipv4Addr::UNSPECIFIED])
403 /// - `in6addr_any` for IPv6 ([`Ipv6Addr::UNSPECIFIED`][std::net::Ipv6Addr::UNSPECIFIED])
404 ///
405 /// # Example
406 /// ```
407 /// # use scylla::client::session::Session;
408 /// # use scylla::client::session_builder::SessionBuilder;
409 /// # use std::net::Ipv4Addr;
410 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
411 /// let session: Session = SessionBuilder::new()
412 /// .known_node("127.0.0.1:9042")
413 /// .local_ip_address(Some(Ipv4Addr::new(192, 168, 0, 1)))
414 /// .build()
415 /// .await?;
416 /// # Ok(())
417 /// # }
418 /// ```
419 pub fn local_ip_address(mut self, local_ip_address: Option<impl Into<IpAddr>>) -> Self {
420 self.config.local_ip_address = local_ip_address.map(Into::into);
421 self
422 }
423
424 /// Specifies the local port range used for shard-aware connections.
425 ///
426 /// A possible use case is when you want to have multiple [`Session`] objects and do not want
427 /// them to compete for the ports within the same range. It is then advised to assign
428 /// mutually non-overlapping port ranges to each session object.
429 ///
430 /// By default this option is set to [`ShardAwarePortRange::EPHEMERAL_PORT_RANGE`].
431 ///
432 /// For details, see [`ShardAwarePortRange`] documentation.
433 ///
434 /// # Example
435 /// ```
436 /// # use scylla::client::session::Session;
437 /// # use scylla::client::session_builder::SessionBuilder;
438 /// # use scylla::routing::ShardAwarePortRange;
439 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
440 /// let session: Session = SessionBuilder::new()
441 /// .known_node("127.0.0.1:9042")
442 /// .shard_aware_local_port_range(ShardAwarePortRange::new(49200..=50000)?)
443 /// .build()
444 /// .await?;
445 /// # Ok(())
446 /// # }
447 /// ```
448 pub fn shard_aware_local_port_range(mut self, port_range: ShardAwarePortRange) -> Self {
449 self.config.shard_aware_local_port_range = port_range;
450 self
451 }
452
453 /// Set preferred Compression algorithm.
454 /// The default is no compression.
455 /// If it is not supported by database server Session will fall back to no encryption.
456 ///
457 /// # Example
458 /// ```
459 /// # use scylla::client::session::Session;
460 /// # use scylla::client::session_builder::SessionBuilder;
461 /// # use scylla::client::Compression;
462 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
463 /// let session: Session = SessionBuilder::new()
464 /// .known_node("127.0.0.1:9042")
465 /// .compression(Some(Compression::Snappy))
466 /// .build()
467 /// .await?;
468 /// # Ok(())
469 /// # }
470 /// ```
471 pub fn compression(mut self, compression: Option<Compression>) -> Self {
472 self.config.compression = compression;
473 self
474 }
475
476 /// Set the delay for schema agreement check. How often driver should ask if schema is in agreement
477 /// The default is 200 milliseconds.
478 ///
479 /// # Example
480 /// ```
481 /// # use scylla::client::session::Session;
482 /// # use scylla::client::session_builder::SessionBuilder;
483 /// # use std::time::Duration;
484 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
485 /// let session: Session = SessionBuilder::new()
486 /// .known_node("127.0.0.1:9042")
487 /// .schema_agreement_interval(Duration::from_secs(5))
488 /// .build()
489 /// .await?;
490 /// # Ok(())
491 /// # }
492 /// ```
493 pub fn schema_agreement_interval(mut self, timeout: Duration) -> Self {
494 self.config.schema_agreement_interval = timeout;
495 self
496 }
497
498 /// Set the default execution profile using its handle
499 ///
500 /// # Example
501 /// ```
502 /// # use scylla::statement::Consistency;
503 /// # use scylla::client::execution_profile::ExecutionProfile;
504 /// # use scylla::client::session::Session;
505 /// # use scylla::client::session_builder::SessionBuilder;
506 /// # use std::time::Duration;
507 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
508 /// let execution_profile = ExecutionProfile::builder()
509 /// .consistency(Consistency::All)
510 /// .request_timeout(Some(Duration::from_secs(2)))
511 /// .build();
512 /// let session: Session = SessionBuilder::new()
513 /// .known_node("127.0.0.1:9042")
514 /// .default_execution_profile_handle(execution_profile.into_handle())
515 /// .build()
516 /// .await?;
517 /// # Ok(())
518 /// # }
519 /// ```
520 pub fn default_execution_profile_handle(
521 mut self,
522 profile_handle: ExecutionProfileHandle,
523 ) -> Self {
524 self.config.default_execution_profile_handle = profile_handle;
525 self
526 }
527
528 /// Set the nodelay TCP flag.
529 /// The default is true.
530 ///
531 /// # Example
532 /// ```
533 /// # use scylla::client::session::Session;
534 /// # use scylla::client::session_builder::SessionBuilder;
535 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
536 /// let session: Session = SessionBuilder::new()
537 /// .known_node("127.0.0.1:9042")
538 /// .tcp_nodelay(true)
539 /// .build()
540 /// .await?;
541 /// # Ok(())
542 /// # }
543 /// ```
544 pub fn tcp_nodelay(mut self, nodelay: bool) -> Self {
545 self.config.tcp_nodelay = nodelay;
546 self
547 }
548
549 /// Set the TCP keepalive interval.
550 /// The default is `None`, which implies that no keepalive messages
551 /// are sent **on TCP layer** when a connection is idle.
552 /// Note: CQL-layer keepalives are configured separately,
553 /// with `Self::keepalive_interval`.
554 ///
555 /// # Example
556 /// ```
557 /// # use scylla::client::session::Session;
558 /// # use scylla::client::session_builder::SessionBuilder;
559 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
560 /// let session: Session = SessionBuilder::new()
561 /// .known_node("127.0.0.1:9042")
562 /// .tcp_keepalive_interval(std::time::Duration::from_secs(42))
563 /// .build()
564 /// .await?;
565 /// # Ok(())
566 /// # }
567 /// ```
568 pub fn tcp_keepalive_interval(mut self, interval: Duration) -> Self {
569 if interval <= Duration::from_secs(1) {
570 warn!(
571 "Setting the TCP keepalive interval to low values ({:?}) is not recommended as it can have a negative impact on performance. Consider setting it above 1 second.",
572 interval
573 );
574 }
575
576 self.config.tcp_keepalive_interval = Some(interval);
577 self
578 }
579
580 /// Set keyspace to be used on all connections.\
581 /// Each connection will send `"USE <keyspace_name>"` before sending any requests.\
582 /// This can be later changed with [`crate::client::session::Session::use_keyspace`]
583 ///
584 /// # Example
585 /// ```
586 /// # use scylla::client::session::Session;
587 /// # use scylla::client::session_builder::SessionBuilder;
588 /// # use scylla::client::Compression;
589 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
590 /// let session: Session = SessionBuilder::new()
591 /// .known_node("127.0.0.1:9042")
592 /// .use_keyspace("my_keyspace_name", false)
593 /// .build()
594 /// .await?;
595 /// # Ok(())
596 /// # }
597 /// ```
598 pub fn use_keyspace(mut self, keyspace_name: impl Into<String>, case_sensitive: bool) -> Self {
599 self.config.used_keyspace = Some(keyspace_name.into());
600 self.config.keyspace_case_sensitive = case_sensitive;
601 self
602 }
603
604 /// Builds the Session after setting all the options.
605 ///
606 /// # Example
607 /// ```
608 /// # use scylla::client::session::Session;
609 /// # use scylla::client::session_builder::SessionBuilder;
610 /// # use scylla::client::Compression;
611 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
612 /// let session: Session = SessionBuilder::new()
613 /// .known_node("127.0.0.1:9042")
614 /// .compression(Some(Compression::Snappy))
615 /// .build() // Turns SessionBuilder into Session
616 /// .await?;
617 /// # Ok(())
618 /// # }
619 /// ```
620 pub async fn build(&self) -> Result<Session, NewSessionError> {
621 Session::connect(self.config.clone()).await
622 }
623
624 /// Changes connection timeout
625 /// The default is 5 seconds.
626 /// If it's higher than underlying os's default connection timeout it won't effect.
627 ///
628 /// # Example
629 /// ```
630 /// # use scylla::client::session::Session;
631 /// # use scylla::client::session_builder::SessionBuilder;
632 /// # use std::time::Duration;
633 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
634 /// let session: Session = SessionBuilder::new()
635 /// .known_node("127.0.0.1:9042")
636 /// .connection_timeout(Duration::from_secs(30))
637 /// .build() // Turns SessionBuilder into Session
638 /// .await?;
639 /// # Ok(())
640 /// # }
641 /// ```
642 pub fn connection_timeout(mut self, duration: Duration) -> Self {
643 self.config.connect_timeout = duration;
644 self
645 }
646
647 /// Sets the per-node connection pool size.
648 /// The default is one connection per shard, which is the recommended setting for Scylla.
649 ///
650 /// # Example
651 /// ```
652 /// # use scylla::client::session::Session;
653 /// # use scylla::client::session_builder::SessionBuilder;
654 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
655 /// use std::num::NonZeroUsize;
656 /// use scylla::client::PoolSize;
657 ///
658 /// // This session will establish 4 connections to each node.
659 /// // For Scylla clusters, this number will be divided across shards
660 /// let session: Session = SessionBuilder::new()
661 /// .known_node("127.0.0.1:9042")
662 /// .pool_size(PoolSize::PerHost(NonZeroUsize::new(4).unwrap()))
663 /// .build()
664 /// .await?;
665 /// # Ok(())
666 /// # }
667 /// ```
668 pub fn pool_size(mut self, size: PoolSize) -> Self {
669 self.config.connection_pool_size = size;
670 self
671 }
672
673 /// If true, prevents the driver from connecting to the shard-aware port, even if the node supports it.
674 ///
675 /// _This is a Scylla-specific option_. It has no effect on Cassandra clusters.
676 ///
677 /// By default, connecting to the shard-aware port is __allowed__ and, in general, this setting
678 /// _should not be changed_. The shard-aware port (19042 or 19142) makes the process of
679 /// establishing connection per shard more robust compared to the regular transport port
680 /// (9042 or 9142). With the shard-aware port, the driver is able to choose which shard
681 /// will be assigned to the connection.
682 ///
683 /// In order to be able to use the shard-aware port effectively, the port needs to be
684 /// reachable and not behind a NAT which changes source ports (the driver uses the source port
685 /// to tell Scylla which shard to assign). However, the driver is designed to behave in a robust
686 /// way if those conditions are not met - if the driver fails to connect to the port or gets
687 /// a connection to the wrong shard, it will re-attempt the connection to the regular transport port.
688 ///
689 /// The only cost of misconfigured shard-aware port should be a slightly longer reconnection time.
690 /// If it is unacceptable to you or suspect that it causes you some other problems,
691 /// you can use this option to disable the shard-aware port feature completely.
692 /// However, __you should use it as a last resort__. Before you do that, we strongly recommend
693 /// that you consider fixing the network issues.
694 ///
695 /// # Example
696 /// ```
697 /// # use scylla::client::session::Session;
698 /// # use scylla::client::session_builder::SessionBuilder;
699 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
700 /// let session: Session = SessionBuilder::new()
701 /// .known_node("127.0.0.1:9042")
702 /// .disallow_shard_aware_port(true)
703 /// .build()
704 /// .await?;
705 /// # Ok(())
706 /// # }
707 /// ```
708 pub fn disallow_shard_aware_port(mut self, disallow: bool) -> Self {
709 self.config.disallow_shard_aware_port = disallow;
710 self
711 }
712
713 /// Set the timestamp generator that will generate timestamps on the client-side.
714 ///
715 /// # Example
716 /// ```
717 /// # use scylla::client::session::Session;
718 /// # use scylla::client::session_builder::SessionBuilder;
719 /// # use scylla::policies::timestamp_generator::SimpleTimestampGenerator;
720 /// # use std::sync::Arc;
721 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
722 /// let session: Session = SessionBuilder::new()
723 /// .known_node("127.0.0.1:9042")
724 /// .timestamp_generator(Arc::new(SimpleTimestampGenerator::new()))
725 /// .build()
726 /// .await?;
727 /// # Ok(())
728 /// # }
729 /// ```
730 pub fn timestamp_generator(mut self, timestamp_generator: Arc<dyn TimestampGenerator>) -> Self {
731 self.config.timestamp_generator = Some(timestamp_generator);
732 self
733 }
734
735 /// Set the keyspaces to be fetched, to retrieve their strategy, and schema metadata if enabled
736 /// No keyspaces, the default value, means all the keyspaces will be fetched.
737 ///
738 /// # Example
739 /// ```
740 /// # use scylla::client::session::Session;
741 /// # use scylla::client::session_builder::SessionBuilder;
742 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
743 /// let session: Session = SessionBuilder::new()
744 /// .known_node("127.0.0.1:9042")
745 /// .keyspaces_to_fetch(["my_keyspace"])
746 /// .build()
747 /// .await?;
748 /// # Ok(())
749 /// # }
750 /// ```
751 pub fn keyspaces_to_fetch(
752 mut self,
753 keyspaces: impl IntoIterator<Item = impl Into<String>>,
754 ) -> Self {
755 self.config.keyspaces_to_fetch = keyspaces.into_iter().map(Into::into).collect();
756 self
757 }
758
759 /// Set the fetch schema metadata flag.
760 /// The default is true.
761 ///
762 /// # Example
763 /// ```
764 /// # use scylla::client::session::Session;
765 /// # use scylla::client::session_builder::SessionBuilder;
766 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
767 /// let session: Session = SessionBuilder::new()
768 /// .known_node("127.0.0.1:9042")
769 /// .fetch_schema_metadata(true)
770 /// .build()
771 /// .await?;
772 /// # Ok(())
773 /// # }
774 /// ```
775 pub fn fetch_schema_metadata(mut self, fetch: bool) -> Self {
776 self.config.fetch_schema_metadata = fetch;
777 self
778 }
779
780 /// Set the server-side timeout for metadata queries.
781 /// The default is `Some(Duration::from_secs(2))`. It means that
782 /// the all metadata queries will be set the 2 seconds timeout
783 /// no matter what timeout is set as a cluster default.
784 /// This prevents timeouts of schema queries when the schema is large
785 /// and the default timeout is configured as tight.
786 ///
787 /// # Example
788 /// ```
789 /// # use scylla::client::session::Session;
790 /// # use scylla::client::session_builder::SessionBuilder;
791 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
792 /// let session: Session = SessionBuilder::new()
793 /// .known_node("127.0.0.1:9042")
794 /// .metadata_request_serverside_timeout(std::time::Duration::from_secs(5))
795 /// .build()
796 /// .await?;
797 /// # Ok(())
798 /// # }
799 /// ```
800 pub fn metadata_request_serverside_timeout(mut self, timeout: Duration) -> Self {
801 self.config.metadata_request_serverside_timeout = Some(timeout);
802 self
803 }
804
805 /// Set the keepalive interval.
806 /// The default is `Some(Duration::from_secs(30))`, which corresponds
807 /// to keepalive CQL messages being sent every 30 seconds.
808 /// Note: this configures CQL-layer keepalives. See also:
809 /// `Self::tcp_keepalive_interval`.
810 ///
811 /// # Example
812 /// ```
813 /// # use scylla::client::session::Session;
814 /// # use scylla::client::session_builder::SessionBuilder;
815 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
816 /// let session: Session = SessionBuilder::new()
817 /// .known_node("127.0.0.1:9042")
818 /// .keepalive_interval(std::time::Duration::from_secs(42))
819 /// .build()
820 /// .await?;
821 /// # Ok(())
822 /// # }
823 /// ```
824 pub fn keepalive_interval(mut self, interval: Duration) -> Self {
825 if interval <= Duration::from_secs(1) {
826 warn!(
827 "Setting the keepalive interval to low values ({:?}) is not recommended as it can have a negative impact on performance. Consider setting it above 1 second.",
828 interval
829 );
830 }
831
832 self.config.keepalive_interval = Some(interval);
833 self
834 }
835
836 /// Set the keepalive timeout.
837 /// The default is `Some(Duration::from_secs(30))`. It means that
838 /// the connection will be closed if time between sending a keepalive
839 /// and receiving a response to any keepalive (not necessarily the same -
840 /// it may be one sent later) exceeds 30 seconds.
841 ///
842 /// # Example
843 /// ```
844 /// # use scylla::client::session::Session;
845 /// # use scylla::client::session_builder::SessionBuilder;
846 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
847 /// let session: Session = SessionBuilder::new()
848 /// .known_node("127.0.0.1:9042")
849 /// .keepalive_timeout(std::time::Duration::from_secs(42))
850 /// .build()
851 /// .await?;
852 /// # Ok(())
853 /// # }
854 /// ```
855 pub fn keepalive_timeout(mut self, timeout: Duration) -> Self {
856 if timeout <= Duration::from_secs(1) {
857 warn!(
858 "Setting the keepalive timeout to low values ({:?}) is not recommended as it may aggressively close connections. Consider setting it above 5 seconds.",
859 timeout
860 );
861 }
862
863 self.config.keepalive_timeout = Some(timeout);
864 self
865 }
866
867 /// Sets the timeout for waiting for schema agreement.
868 /// By default, the timeout is 60 seconds.
869 ///
870 /// # Example
871 /// ```
872 /// # use scylla::client::session::Session;
873 /// # use scylla::client::session_builder::SessionBuilder;
874 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
875 /// let session: Session = SessionBuilder::new()
876 /// .known_node("127.0.0.1:9042")
877 /// .schema_agreement_timeout(std::time::Duration::from_secs(120))
878 /// .build()
879 /// .await?;
880 /// # Ok(())
881 /// # }
882 /// ```
883 pub fn schema_agreement_timeout(mut self, timeout: Duration) -> Self {
884 self.config.schema_agreement_timeout = timeout;
885 self
886 }
887
888 /// Controls automatic waiting for schema agreement after a schema-altering
889 /// statement is sent. By default, it is enabled.
890 ///
891 /// # Example
892 /// ```
893 /// # use scylla::client::session::Session;
894 /// # use scylla::client::session_builder::SessionBuilder;
895 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
896 /// let session: Session = SessionBuilder::new()
897 /// .known_node("127.0.0.1:9042")
898 /// .auto_await_schema_agreement(false)
899 /// .build()
900 /// .await?;
901 /// # Ok(())
902 /// # }
903 /// ```
904 pub fn auto_await_schema_agreement(mut self, enabled: bool) -> Self {
905 self.config.schema_agreement_automatic_waiting = enabled;
906 self
907 }
908
909 /// Sets the host filter. The host filter decides whether any connections
910 /// should be opened to the node or not. The driver will also avoid
911 /// those nodes when re-establishing the control connection.
912 ///
913 /// See the [host filter](crate::policies::host_filter) module for a list
914 /// of pre-defined filters. It is also possible to provide a custom filter
915 /// by implementing the HostFilter trait.
916 ///
917 /// # Example
918 /// ```
919 /// # use async_trait::async_trait;
920 /// # use std::net::SocketAddr;
921 /// # use std::sync::Arc;
922 /// # use scylla::client::session::Session;
923 /// # use scylla::client::session_builder::SessionBuilder;
924 /// # use scylla::errors::TranslationError;
925 /// # use scylla::policies::address_translator::AddressTranslator;
926 /// # use scylla::policies::host_filter::DcHostFilter;
927 ///
928 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
929 /// // The session will only connect to nodes from "my-local-dc"
930 /// let session: Session = SessionBuilder::new()
931 /// .known_node("127.0.0.1:9042")
932 /// .host_filter(Arc::new(DcHostFilter::new("my-local-dc".to_string())))
933 /// .build()
934 /// .await?;
935 /// # Ok(())
936 /// # }
937 /// ```
938 pub fn host_filter(mut self, filter: Arc<dyn HostFilter>) -> Self {
939 self.config.host_filter = Some(filter);
940 self
941 }
942
943 /// Set the refresh metadata on schema agreement flag.
944 /// The default is true.
945 ///
946 /// # Example
947 /// ```
948 /// # use scylla::client::session::Session;
949 /// # use scylla::client::session_builder::SessionBuilder;
950 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
951 /// let session: Session = SessionBuilder::new()
952 /// .known_node("127.0.0.1:9042")
953 /// .refresh_metadata_on_auto_schema_agreement(true)
954 /// .build()
955 /// .await?;
956 /// # Ok(())
957 /// # }
958 /// ```
959 pub fn refresh_metadata_on_auto_schema_agreement(mut self, refresh_metadata: bool) -> Self {
960 self.config.refresh_metadata_on_auto_schema_agreement = refresh_metadata;
961 self
962 }
963
964 /// Set the number of attempts to fetch [TracingInfo](crate::observability::tracing::TracingInfo)
965 /// in [`Session::get_tracing_info`](crate::client::session::Session::get_tracing_info).
966 /// The default is 5 attempts.
967 ///
968 /// Tracing info might not be available immediately on queried node - that's why
969 /// the driver performs a few attempts with sleeps in between.
970 ///
971 /// Cassandra users may want to increase this value - the default is good
972 /// for Scylla, but Cassandra sometimes needs more time for the data to
973 /// appear in tracing table.
974 ///
975 /// # Example
976 /// ```
977 /// # use scylla::client::session::Session;
978 /// # use scylla::client::session_builder::SessionBuilder;
979 /// # use std::num::NonZeroU32;
980 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
981 /// let session: Session = SessionBuilder::new()
982 /// .known_node("127.0.0.1:9042")
983 /// .tracing_info_fetch_attempts(NonZeroU32::new(10).unwrap())
984 /// .build()
985 /// .await?;
986 /// # Ok(())
987 /// # }
988 /// ```
989 pub fn tracing_info_fetch_attempts(mut self, attempts: NonZeroU32) -> Self {
990 self.config.tracing_info_fetch_attempts = attempts;
991 self
992 }
993
994 /// Set the delay between attempts to fetch [TracingInfo](crate::observability::tracing::TracingInfo)
995 /// in [`Session::get_tracing_info`](crate::client::session::Session::get_tracing_info).
996 /// The default is 3 milliseconds.
997 ///
998 /// Tracing info might not be available immediately on queried node - that's why
999 /// the driver performs a few attempts with sleeps in between.
1000 ///
1001 /// Cassandra users may want to increase this value - the default is good
1002 /// for Scylla, but Cassandra sometimes needs more time for the data to
1003 /// appear in tracing table.
1004 ///
1005 /// # Example
1006 /// ```
1007 /// # use scylla::client::session::Session;
1008 /// # use scylla::client::session_builder::SessionBuilder;
1009 /// # use std::time::Duration;
1010 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1011 /// let session: Session = SessionBuilder::new()
1012 /// .known_node("127.0.0.1:9042")
1013 /// .tracing_info_fetch_interval(Duration::from_millis(50))
1014 /// .build()
1015 /// .await?;
1016 /// # Ok(())
1017 /// # }
1018 /// ```
1019 pub fn tracing_info_fetch_interval(mut self, interval: Duration) -> Self {
1020 self.config.tracing_info_fetch_interval = interval;
1021 self
1022 }
1023
1024 /// Set the consistency level of fetching [TracingInfo](crate::observability::tracing::TracingInfo)
1025 /// in [`Session::get_tracing_info`](crate::client::session::Session::get_tracing_info).
1026 /// The default is [`Consistency::One`].
1027 ///
1028 /// # Example
1029 /// ```
1030 /// # use scylla::statement::Consistency;
1031 /// # use scylla::client::session::Session;
1032 /// # use scylla::client::session_builder::SessionBuilder;
1033 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1034 /// let session: Session = SessionBuilder::new()
1035 /// .known_node("127.0.0.1:9042")
1036 /// .tracing_info_fetch_consistency(Consistency::One)
1037 /// .build()
1038 /// .await?;
1039 /// # Ok(())
1040 /// # }
1041 /// ```
1042 pub fn tracing_info_fetch_consistency(mut self, consistency: Consistency) -> Self {
1043 self.config.tracing_info_fetch_consistency = consistency;
1044 self
1045 }
1046
1047 /// If true, the driver will inject a delay controlled by [SessionBuilder::write_coalescing_delay()]
1048 /// before flushing data to the socket.
1049 /// This gives the driver an opportunity to collect more write requests
1050 /// and write them in a single syscall, increasing the efficiency.
1051 ///
1052 /// However, this optimization may worsen latency if the rate of requests
1053 /// issued by the application is low, but otherwise the application is
1054 /// heavily loaded with other tasks on the same tokio executor.
1055 /// Please do performance measurements before committing to disabling
1056 /// this option.
1057 ///
1058 /// This option is true by default.
1059 ///
1060 /// # Example
1061 /// ```
1062 /// # use scylla::client::session::Session;
1063 /// # use scylla::client::session_builder::SessionBuilder;
1064 /// # use scylla::client::Compression;
1065 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1066 /// let session: Session = SessionBuilder::new()
1067 /// .known_node("127.0.0.1:9042")
1068 /// .write_coalescing(false) // Enabled by default
1069 /// .build()
1070 /// .await?;
1071 /// # Ok(())
1072 /// # }
1073 /// ```
1074 pub fn write_coalescing(mut self, enable: bool) -> Self {
1075 self.config.enable_write_coalescing = enable;
1076 self
1077 }
1078
1079 /// Controls the write coalescing delay (if enabled).
1080 ///
1081 /// This option has no effect if [`SessionBuilder::write_coalescing()`] is set to false.
1082 ///
1083 /// This option is [`WriteCoalescingDelay::SmallNondeterministic`] by default.
1084 ///
1085 /// # Example
1086 /// ```
1087 /// # use scylla::client::session::Session;
1088 /// # use scylla::client::session_builder::SessionBuilder;
1089 /// # use scylla::client::WriteCoalescingDelay;
1090 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1091 /// let session: Session = SessionBuilder::new()
1092 /// .known_node("127.0.0.1:9042")
1093 /// .write_coalescing_delay(WriteCoalescingDelay::SmallNondeterministic)
1094 /// .build()
1095 /// .await?;
1096 /// # Ok(())
1097 /// # }
1098 /// ```
1099 pub fn write_coalescing_delay(mut self, delay: WriteCoalescingDelay) -> Self {
1100 self.config.write_coalescing_delay = delay;
1101 self
1102 }
1103
1104 /// Set the interval at which the driver refreshes the cluster metadata which contains information
1105 /// about the cluster topology as well as the cluster schema.
1106 ///
1107 /// The default is 60 seconds.
1108 ///
1109 /// In the given example, we have set the duration value to 20 seconds, which
1110 /// means that the metadata is refreshed every 20 seconds.
1111 /// # Example
1112 /// ```
1113 /// # use scylla::client::session::Session;
1114 /// # use scylla::client::session_builder::SessionBuilder;
1115 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1116 /// let session: Session = SessionBuilder::new()
1117 /// .known_node("127.0.0.1:9042")
1118 /// .cluster_metadata_refresh_interval(std::time::Duration::from_secs(20))
1119 /// .build()
1120 /// .await?;
1121 /// # Ok(())
1122 /// # }
1123 /// ```
1124 pub fn cluster_metadata_refresh_interval(mut self, interval: Duration) -> Self {
1125 self.config.cluster_metadata_refresh_interval = interval;
1126 self
1127 }
1128
1129 /// Set the custom identity of the driver/application/instance,
1130 /// to be sent as options in STARTUP message.
1131 ///
1132 /// By default driver name and version are sent;
1133 /// application name and version and client id are not sent.
1134 ///
1135 /// # Example
1136 /// ```
1137 /// # use scylla::client::session::Session;
1138 /// # use scylla::client::session_builder::SessionBuilder;
1139 /// # use scylla::client::SelfIdentity;
1140 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1141 /// let (app_major, app_minor, app_patch) = (2, 1, 3);
1142 /// let app_version = format!("{app_major}.{app_minor}.{app_patch}");
1143 ///
1144 /// let session: Session = SessionBuilder::new()
1145 /// .known_node("127.0.0.1:9042")
1146 /// .custom_identity(
1147 /// SelfIdentity::new()
1148 /// .with_custom_driver_version("0.13.0-custom_build_17")
1149 /// .with_application_name("my-app")
1150 /// .with_application_version(app_version)
1151 /// )
1152 /// .build()
1153 /// .await?;
1154 /// # Ok(())
1155 /// # }
1156 /// ```
1157 pub fn custom_identity(mut self, identity: SelfIdentity<'static>) -> Self {
1158 self.config.identity = identity;
1159 self
1160 }
1161}
1162
1163/// Creates a [`SessionBuilder`] with default configuration, same as [`SessionBuilder::new`]
1164impl Default for SessionBuilder {
1165 fn default() -> Self {
1166 SessionBuilder::new()
1167 }
1168}
1169
1170#[cfg(test)]
1171mod tests {
1172 use scylla_cql::frame::types::SerialConsistency;
1173 use scylla_cql::Consistency;
1174
1175 use super::super::Compression;
1176 use super::SessionBuilder;
1177 use crate::client::execution_profile::{defaults, ExecutionProfile};
1178 use crate::cluster::node::KnownNode;
1179 use crate::test_utils::setup_tracing;
1180 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
1181 use std::time::Duration;
1182
1183 #[test]
1184 fn default_session_builder() {
1185 setup_tracing();
1186 let builder = SessionBuilder::new();
1187
1188 assert!(builder.config.known_nodes.is_empty());
1189 assert_eq!(builder.config.compression, None);
1190 }
1191
1192 #[test]
1193 fn add_known_node() {
1194 setup_tracing();
1195 let mut builder = SessionBuilder::new();
1196
1197 builder = builder.known_node("test_hostname");
1198
1199 assert_eq!(
1200 builder.config.known_nodes,
1201 vec![KnownNode::Hostname("test_hostname".into())]
1202 );
1203 assert_eq!(builder.config.compression, None);
1204 }
1205
1206 #[test]
1207 fn add_known_node_addr() {
1208 setup_tracing();
1209 let mut builder = SessionBuilder::new();
1210
1211 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 3)), 1357);
1212 builder = builder.known_node_addr(addr);
1213
1214 assert_eq!(builder.config.known_nodes, vec![KnownNode::Address(addr)]);
1215 assert_eq!(builder.config.compression, None);
1216 }
1217
1218 #[test]
1219 fn add_known_nodes() {
1220 setup_tracing();
1221 let mut builder = SessionBuilder::new();
1222
1223 builder = builder.known_nodes(["test_hostname1", "test_hostname2"]);
1224
1225 assert_eq!(
1226 builder.config.known_nodes,
1227 vec![
1228 KnownNode::Hostname("test_hostname1".into()),
1229 KnownNode::Hostname("test_hostname2".into())
1230 ]
1231 );
1232 assert_eq!(builder.config.compression, None);
1233 }
1234
1235 #[test]
1236 fn add_known_nodes_addr() {
1237 setup_tracing();
1238 let mut builder = SessionBuilder::new();
1239
1240 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 3)), 1357);
1241 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 4)), 9090);
1242
1243 builder = builder.known_nodes_addr([addr1, addr2]);
1244
1245 assert_eq!(
1246 builder.config.known_nodes,
1247 vec![KnownNode::Address(addr1), KnownNode::Address(addr2)]
1248 );
1249 assert_eq!(builder.config.compression, None);
1250 }
1251
1252 #[test]
1253 fn compression() {
1254 setup_tracing();
1255 let mut builder = SessionBuilder::new();
1256 assert_eq!(builder.config.compression, None);
1257
1258 builder = builder.compression(Some(Compression::Lz4));
1259 assert_eq!(builder.config.compression, Some(Compression::Lz4));
1260
1261 builder = builder.compression(Some(Compression::Snappy));
1262 assert_eq!(builder.config.compression, Some(Compression::Snappy));
1263
1264 builder = builder.compression(None);
1265 assert_eq!(builder.config.compression, None);
1266 }
1267
1268 #[test]
1269 fn tcp_nodelay() {
1270 setup_tracing();
1271 let mut builder = SessionBuilder::new();
1272 assert!(builder.config.tcp_nodelay);
1273
1274 builder = builder.tcp_nodelay(false);
1275 assert!(!builder.config.tcp_nodelay);
1276
1277 builder = builder.tcp_nodelay(true);
1278 assert!(builder.config.tcp_nodelay);
1279 }
1280
1281 #[test]
1282 fn use_keyspace() {
1283 setup_tracing();
1284 let mut builder = SessionBuilder::new();
1285 assert_eq!(builder.config.used_keyspace, None);
1286 assert!(!builder.config.keyspace_case_sensitive);
1287
1288 builder = builder.use_keyspace("ks_name_1", true);
1289 assert_eq!(builder.config.used_keyspace, Some("ks_name_1".to_string()));
1290 assert!(builder.config.keyspace_case_sensitive);
1291
1292 builder = builder.use_keyspace("ks_name_2", false);
1293 assert_eq!(builder.config.used_keyspace, Some("ks_name_2".to_string()));
1294 assert!(!builder.config.keyspace_case_sensitive);
1295 }
1296
1297 #[test]
1298 fn connection_timeout() {
1299 setup_tracing();
1300 let mut builder = SessionBuilder::new();
1301 assert_eq!(
1302 builder.config.connect_timeout,
1303 std::time::Duration::from_secs(5)
1304 );
1305
1306 builder = builder.connection_timeout(std::time::Duration::from_secs(10));
1307 assert_eq!(
1308 builder.config.connect_timeout,
1309 std::time::Duration::from_secs(10)
1310 );
1311 }
1312
1313 #[test]
1314 fn fetch_schema_metadata() {
1315 setup_tracing();
1316 let mut builder = SessionBuilder::new();
1317 assert!(builder.config.fetch_schema_metadata);
1318
1319 builder = builder.fetch_schema_metadata(false);
1320 assert!(!builder.config.fetch_schema_metadata);
1321
1322 builder = builder.fetch_schema_metadata(true);
1323 assert!(builder.config.fetch_schema_metadata);
1324 }
1325
1326 // LatencyAwarePolicy, which is used in the test, requires presence of Tokio runtime.
1327 #[tokio::test]
1328 async fn execution_profile() {
1329 setup_tracing();
1330 let default_builder = SessionBuilder::new();
1331 let default_execution_profile = default_builder
1332 .config
1333 .default_execution_profile_handle
1334 .access();
1335 assert_eq!(
1336 default_execution_profile.consistency,
1337 defaults::consistency()
1338 );
1339 assert_eq!(
1340 default_execution_profile.serial_consistency,
1341 defaults::serial_consistency()
1342 );
1343 assert_eq!(
1344 default_execution_profile.request_timeout,
1345 defaults::request_timeout()
1346 );
1347 assert_eq!(
1348 default_execution_profile.load_balancing_policy.name(),
1349 defaults::load_balancing_policy().name()
1350 );
1351
1352 let custom_consistency = Consistency::Any;
1353 let custom_serial_consistency = Some(SerialConsistency::Serial);
1354 let custom_timeout = Some(Duration::from_secs(1));
1355 let execution_profile_handle = ExecutionProfile::builder()
1356 .consistency(custom_consistency)
1357 .serial_consistency(custom_serial_consistency)
1358 .request_timeout(custom_timeout)
1359 .build()
1360 .into_handle();
1361 let builder_with_profile =
1362 default_builder.default_execution_profile_handle(execution_profile_handle.clone());
1363 let execution_profile = execution_profile_handle.access();
1364
1365 let profile_in_builder = builder_with_profile
1366 .config
1367 .default_execution_profile_handle
1368 .access();
1369 assert_eq!(
1370 profile_in_builder.consistency,
1371 execution_profile.consistency
1372 );
1373 assert_eq!(
1374 profile_in_builder.serial_consistency,
1375 execution_profile.serial_consistency
1376 );
1377 assert_eq!(
1378 profile_in_builder.request_timeout,
1379 execution_profile.request_timeout
1380 );
1381 assert_eq!(
1382 profile_in_builder.load_balancing_policy.name(),
1383 execution_profile.load_balancing_policy.name()
1384 );
1385 }
1386
1387 #[test]
1388 fn cluster_metadata_refresh_interval() {
1389 setup_tracing();
1390 let builder = SessionBuilder::new();
1391 assert_eq!(
1392 builder.config.cluster_metadata_refresh_interval,
1393 std::time::Duration::from_secs(60)
1394 );
1395 }
1396
1397 #[test]
1398 fn all_features() {
1399 setup_tracing();
1400 let mut builder = SessionBuilder::new();
1401
1402 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 3)), 8465);
1403 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 3)), 1357);
1404 let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 4)), 9090);
1405
1406 builder = builder.known_node("hostname_test");
1407 builder = builder.known_node_addr(addr);
1408 builder = builder.known_nodes(["hostname_test1", "hostname_test2"]);
1409 builder = builder.known_nodes_addr([addr1, addr2]);
1410 builder = builder.compression(Some(Compression::Snappy));
1411 builder = builder.tcp_nodelay(true);
1412 builder = builder.use_keyspace("ks_name", true);
1413 builder = builder.fetch_schema_metadata(false);
1414 builder = builder.cluster_metadata_refresh_interval(Duration::from_secs(1));
1415
1416 assert_eq!(
1417 builder.config.known_nodes,
1418 vec![
1419 KnownNode::Hostname("hostname_test".into()),
1420 KnownNode::Address(addr),
1421 KnownNode::Hostname("hostname_test1".into()),
1422 KnownNode::Hostname("hostname_test2".into()),
1423 KnownNode::Address(addr1),
1424 KnownNode::Address(addr2),
1425 ]
1426 );
1427
1428 assert_eq!(builder.config.compression, Some(Compression::Snappy));
1429 assert!(builder.config.tcp_nodelay);
1430 assert_eq!(
1431 builder.config.cluster_metadata_refresh_interval,
1432 Duration::from_secs(1)
1433 );
1434
1435 assert_eq!(builder.config.used_keyspace, Some("ks_name".to_string()));
1436
1437 assert!(builder.config.keyspace_case_sensitive);
1438 assert!(!builder.config.fetch_schema_metadata);
1439 }
1440
1441 // This is to assert that #705 does not break the API (i.e. it merely extends it).
1442 fn _check_known_nodes_compatibility(
1443 hostnames: &[impl AsRef<str>],
1444 host_addresses: &[SocketAddr],
1445 ) {
1446 let mut sb: SessionBuilder = SessionBuilder::new();
1447 sb = sb.known_nodes(hostnames);
1448 sb = sb.known_nodes_addr(host_addresses);
1449
1450 let mut config = sb.config;
1451 config.add_known_nodes(hostnames);
1452 config.add_known_nodes_addr(host_addresses);
1453 }
1454}