scylla/client/session.rs
//! `Session` is the main object used in the driver.\
//! It manages all connections to the cluster and allows executing CQL requests.
3
4use super::execution_profile::{ExecutionProfile, ExecutionProfileHandle, ExecutionProfileInner};
5use super::pager::{PreparedIteratorConfig, QueryPager};
6use super::{Compression, PoolSize, SelfIdentity, WriteCoalescingDelay};
7use crate::authentication::AuthenticatorProvider;
8#[cfg(feature = "unstable-cloud")]
9use crate::cloud::CloudConfig;
10#[cfg(feature = "unstable-cloud")]
11use crate::cluster::node::CloudEndpoint;
12use crate::cluster::node::{InternalKnownNode, KnownNode, NodeRef};
13use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState};
14use crate::errors::{
15 BadQuery, ExecutionError, MetadataError, NewSessionError, PagerExecutionError, PrepareError,
16 RequestAttemptError, RequestError, SchemaAgreementError, TracingError, UseKeyspaceError,
17};
18use crate::frame::response::result;
19use crate::network::tls::TlsProvider;
20use crate::network::{Connection, ConnectionConfig, PoolConfig, VerifiedKeyspaceName};
21use crate::observability::driver_tracing::RequestSpan;
22use crate::observability::history::{self, HistoryListener};
23#[cfg(feature = "metrics")]
24use crate::observability::metrics::Metrics;
25use crate::observability::tracing::TracingInfo;
26use crate::policies::address_translator::AddressTranslator;
27use crate::policies::host_filter::HostFilter;
28use crate::policies::load_balancing::{self, RoutingInfo};
29use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
30use crate::policies::speculative_execution;
31use crate::policies::timestamp_generator::TimestampGenerator;
32use crate::response::query_result::{MaybeFirstRowError, QueryResult, RowsError};
33use crate::response::{NonErrorQueryResponse, PagingState, PagingStateResponse, QueryResponse};
34use crate::routing::partitioner::PartitionerName;
35use crate::routing::{Shard, ShardAwarePortRange};
36use crate::statement::batch::batch_values;
37use crate::statement::batch::{Batch, BatchStatement};
38use crate::statement::prepared::{PartitionKeyError, PreparedStatement};
39use crate::statement::unprepared::Statement;
40use crate::statement::{Consistency, PageSize, StatementConfig};
41use arc_swap::ArcSwapOption;
42use futures::future::join_all;
43use futures::future::try_join_all;
44use itertools::Itertools;
45use scylla_cql::frame::response::NonErrorResponse;
46use scylla_cql::serialize::batch::BatchValues;
47use scylla_cql::serialize::row::{SerializeRow, SerializedValues};
48use std::borrow::Borrow;
49use std::future::Future;
50use std::net::{IpAddr, SocketAddr};
51use std::num::NonZeroU32;
52use std::sync::Arc;
53use std::time::Duration;
54use tokio::time::timeout;
55#[cfg(feature = "unstable-cloud")]
56use tracing::warn;
57use tracing::{debug, error, trace, trace_span, Instrument};
58use uuid::Uuid;
59
/// Size of the channel used for tablet information.
///
/// NOTE(review): purpose inferred from the name only; the channel itself is
/// created outside this part of the file — confirm against its usage.
pub(crate) const TABLET_CHANNEL_SIZE: usize = 8_192;

/// Page size used for tracing-related queries.
///
/// NOTE(review): purpose inferred from the name only — confirm against its usage.
const TRACING_QUERY_PAGE_SIZE: i32 = 1_024;
63
64/// `Session` manages connections to the cluster and allows to execute CQL requests.
65pub struct Session {
66 cluster: Cluster,
67 default_execution_profile_handle: ExecutionProfileHandle,
68 schema_agreement_interval: Duration,
69 #[cfg(feature = "metrics")]
70 metrics: Arc<Metrics>,
71 schema_agreement_timeout: Duration,
72 schema_agreement_automatic_waiting: bool,
73 refresh_metadata_on_auto_schema_agreement: bool,
74 keyspace_name: Arc<ArcSwapOption<String>>,
75 tracing_info_fetch_attempts: NonZeroU32,
76 tracing_info_fetch_interval: Duration,
77 tracing_info_fetch_consistency: Consistency,
78}
79
80/// This implementation deliberately omits some details from Cluster in order
81/// to avoid cluttering the print with much information of little usability.
82impl std::fmt::Debug for Session {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 let mut d = f.debug_struct("Session");
85 d.field("cluster", &ClusterNeatDebug(&self.cluster))
86 .field(
87 "default_execution_profile_handle",
88 &self.default_execution_profile_handle,
89 )
90 .field("schema_agreement_interval", &self.schema_agreement_interval);
91
92 #[cfg(feature = "metrics")]
93 d.field("metrics", &self.metrics);
94
95 d.field(
96 "auto_await_schema_agreement_timeout",
97 &self.schema_agreement_timeout,
98 )
99 .field(
100 "schema_agreement_automatic_waiting",
101 &self.schema_agreement_automatic_waiting,
102 )
103 .field(
104 "refresh_metadata_on_auto_schema_agreement",
105 &self.refresh_metadata_on_auto_schema_agreement,
106 )
107 .field("keyspace_name", &self.keyspace_name)
108 .field(
109 "tracing_info_fetch_attempts",
110 &self.tracing_info_fetch_attempts,
111 )
112 .field(
113 "tracing_info_fetch_interval",
114 &self.tracing_info_fetch_interval,
115 )
116 .field(
117 "tracing_info_fetch_consistency",
118 &self.tracing_info_fetch_consistency,
119 )
120 .finish()
121 }
122}
123
/// TLS context accepted by [`SessionConfig::tls_context`].
///
/// Each variant wraps the context type of one TLS backend and is only
/// compiled in when the matching cargo feature is enabled. Use the `From`
/// conversions to build a `TlsContext` from a backend-specific value.
#[derive(Clone)] // Cheaply clonable - reference counted.
#[non_exhaustive]
pub enum TlsContext {
    /// An `openssl::ssl::SslContext`; requires the `openssl-010` feature.
    #[cfg(feature = "openssl-010")]
    OpenSsl010(openssl::ssl::SslContext),
    /// A shared `rustls::ClientConfig`; requires the `rustls-023` feature.
    #[cfg(feature = "rustls-023")]
    Rustls023(Arc<rustls::ClientConfig>),
}
132
/// Wraps an OpenSSL context in [`TlsContext::OpenSsl010`].
#[cfg(feature = "openssl-010")]
impl From<openssl::ssl::SslContext> for TlsContext {
    fn from(ssl_context: openssl::ssl::SslContext) -> Self {
        Self::OpenSsl010(ssl_context)
    }
}
139
/// Wraps a shared rustls client configuration in [`TlsContext::Rustls023`].
#[cfg(feature = "rustls-023")]
impl From<Arc<rustls::ClientConfig>> for TlsContext {
    fn from(client_config: Arc<rustls::ClientConfig>) -> Self {
        Self::Rustls023(client_config)
    }
}
146
147/// Configuration options for [`Session`].
148/// Can be created manually, but usually it's easier to use
149/// [SessionBuilder](super::session_builder::SessionBuilder)
150#[derive(Clone)]
151#[non_exhaustive]
152pub struct SessionConfig {
153 /// List of database servers known on Session startup.
154 /// Session will connect to these nodes to retrieve information about other nodes in the cluster.
155 /// Each node can be represented as a hostname or an IP address.
156 pub known_nodes: Vec<KnownNode>,
157
158 /// A local ip address to bind all driver's TCP sockets to.
159 ///
160 /// By default set to None, which is equivalent to:
161 /// - `INADDR_ANY` for IPv4 ([`Ipv4Addr::UNSPECIFIED`][std::net::Ipv4Addr::UNSPECIFIED])
162 /// - `in6addr_any` for IPv6 ([`Ipv6Addr::UNSPECIFIED`][std::net::Ipv6Addr::UNSPECIFIED])
163 pub local_ip_address: Option<IpAddr>,
164
165 /// Specifies the local port range used for shard-aware connections.
166 ///
167 /// By default set to [`ShardAwarePortRange::EPHEMERAL_PORT_RANGE`].
168 pub shard_aware_local_port_range: ShardAwarePortRange,
169
170 /// Preferred compression algorithm to use on connections.
171 /// If it's not supported by database server Session will fall back to no compression.
172 pub compression: Option<Compression>,
173 pub tcp_nodelay: bool,
174 pub tcp_keepalive_interval: Option<Duration>,
175
176 pub default_execution_profile_handle: ExecutionProfileHandle,
177
178 pub used_keyspace: Option<String>,
179 pub keyspace_case_sensitive: bool,
180
181 /// Provide our Session with TLS
182 pub tls_context: Option<TlsContext>,
183
184 pub authenticator: Option<Arc<dyn AuthenticatorProvider>>,
185
186 pub connect_timeout: Duration,
187
188 /// Size of the per-node connection pool, i.e. how many connections the driver should keep to each node.
189 /// The default is `PerShard(1)`, which is the recommended setting for Scylla clusters.
190 pub connection_pool_size: PoolSize,
191
192 /// If true, prevents the driver from connecting to the shard-aware port, even if the node supports it.
193 /// Generally, this options is best left as default (false).
194 pub disallow_shard_aware_port: bool,
195
196 /// Timestamp generator used for generating timestamps on the client-side
197 /// If None, server-side timestamps are used.
198 pub timestamp_generator: Option<Arc<dyn TimestampGenerator>>,
199
200 /// If empty, fetch all keyspaces
201 pub keyspaces_to_fetch: Vec<String>,
202
203 /// If true, full schema is fetched with every metadata refresh.
204 pub fetch_schema_metadata: bool,
205
206 /// Custom timeout for requests that query metadata.
207 pub metadata_request_serverside_timeout: Option<Duration>,
208
209 /// Interval of sending keepalive requests.
210 /// If `None`, keepalives are never sent, so `Self::keepalive_timeout` has no effect.
211 pub keepalive_interval: Option<Duration>,
212
213 /// Controls after what time of not receiving response to keepalives a connection is closed.
214 /// If `None`, connections are never closed due to lack of response to a keepalive message.
215 pub keepalive_timeout: Option<Duration>,
216
217 /// How often the driver should ask if schema is in agreement.
218 pub schema_agreement_interval: Duration,
219
220 /// Controls the timeout for waiting for schema agreement.
221 /// This works both for manual awaiting schema agreement and for
222 /// automatic waiting after a schema-altering statement is sent.
223 pub schema_agreement_timeout: Duration,
224
225 /// Controls whether schema agreement is automatically awaited
226 /// after sending a schema-altering statement.
227 pub schema_agreement_automatic_waiting: bool,
228
229 /// If true, full schema metadata is fetched after successfully reaching a schema agreement.
230 /// It is true by default but can be disabled if successive schema-altering statements should be performed.
231 pub refresh_metadata_on_auto_schema_agreement: bool,
232
233 /// The address translator is used to translate addresses received from ScyllaDB nodes
234 /// (either with cluster metadata or with an event) to addresses that can be used to
235 /// actually connect to those nodes. This may be needed e.g. when there is NAT
236 /// between the nodes and the driver.
237 pub address_translator: Option<Arc<dyn AddressTranslator>>,
238
239 /// The host filter decides whether any connections should be opened
240 /// to the node or not. The driver will also avoid filtered out nodes when
241 /// re-establishing the control connection.
242 pub host_filter: Option<Arc<dyn HostFilter>>,
243
244 /// If the driver is to connect to ScyllaCloud, there is a config for it.
245 #[cfg(feature = "unstable-cloud")]
246 pub cloud_config: Option<Arc<CloudConfig>>,
247
248 /// If true, the driver will inject a delay controlled by [`SessionConfig::write_coalescing_delay`]
249 /// before flushing data to the socket.
250 /// This gives the driver an opportunity to collect more write requests
251 /// and write them in a single syscall, increasing the efficiency.
252 ///
253 /// However, this optimization may worsen latency if the rate of requests
254 /// issued by the application is low, but otherwise the application is
255 /// heavily loaded with other tasks on the same tokio executor.
256 /// Please do performance measurements before committing to disabling
257 /// this option.
258 pub enable_write_coalescing: bool,
259
260 /// Controls the write coalescing delay (if enabled).
261 ///
262 /// This option has no effect if [`SessionConfig::enable_write_coalescing`] is false.
263 ///
264 /// This option is [`WriteCoalescingDelay::SmallNondeterministic`] by default.
265 pub write_coalescing_delay: WriteCoalescingDelay,
266
267 /// Number of attempts to fetch [`TracingInfo`]
268 /// in [`Session::get_tracing_info`]. Tracing info
269 /// might not be available immediately on queried node - that's why
270 /// the driver performs a few attempts with sleeps in between.
271 pub tracing_info_fetch_attempts: NonZeroU32,
272
273 /// Delay between attempts to fetch [`TracingInfo`]
274 /// in [`Session::get_tracing_info`]. Tracing info
275 /// might not be available immediately on queried node - that's why
276 /// the driver performs a few attempts with sleeps in between.
277 pub tracing_info_fetch_interval: Duration,
278
279 /// Consistency level of fetching [`TracingInfo`]
280 /// in [`Session::get_tracing_info`].
281 pub tracing_info_fetch_consistency: Consistency,
282
283 /// Interval between refreshing cluster metadata. This
284 /// can be configured according to the traffic pattern
285 /// for e.g: if they do not want unexpected traffic
286 /// or they expect the topology to change frequently.
287 pub cluster_metadata_refresh_interval: Duration,
288
289 /// Driver and application self-identifying information,
290 /// to be sent to server in STARTUP message.
291 pub identity: SelfIdentity<'static>,
292}
293
294impl SessionConfig {
295 /// Creates a [`SessionConfig`] with default configuration
296 /// # Default configuration
297 /// * Compression: None
298 /// * Load balancing policy: Token-aware Round-robin
299 ///
300 /// # Example
301 /// ```
302 /// # use scylla::client::session::SessionConfig;
303 /// let config = SessionConfig::new();
304 /// ```
305 pub fn new() -> Self {
306 SessionConfig {
307 known_nodes: Vec::new(),
308 local_ip_address: None,
309 shard_aware_local_port_range: ShardAwarePortRange::EPHEMERAL_PORT_RANGE,
310 compression: None,
311 tcp_nodelay: true,
312 tcp_keepalive_interval: None,
313 schema_agreement_interval: Duration::from_millis(200),
314 default_execution_profile_handle: ExecutionProfile::new_from_inner(Default::default())
315 .into_handle(),
316 used_keyspace: None,
317 keyspace_case_sensitive: false,
318 tls_context: None,
319 authenticator: None,
320 connect_timeout: Duration::from_secs(5),
321 connection_pool_size: Default::default(),
322 disallow_shard_aware_port: false,
323 timestamp_generator: None,
324 keyspaces_to_fetch: Vec::new(),
325 fetch_schema_metadata: true,
326 metadata_request_serverside_timeout: Some(Duration::from_secs(2)),
327 keepalive_interval: Some(Duration::from_secs(30)),
328 keepalive_timeout: Some(Duration::from_secs(30)),
329 schema_agreement_timeout: Duration::from_secs(60),
330 schema_agreement_automatic_waiting: true,
331 address_translator: None,
332 host_filter: None,
333 refresh_metadata_on_auto_schema_agreement: true,
334 #[cfg(feature = "unstable-cloud")]
335 cloud_config: None,
336 enable_write_coalescing: true,
337 write_coalescing_delay: WriteCoalescingDelay::SmallNondeterministic,
338 tracing_info_fetch_attempts: NonZeroU32::new(10).unwrap(),
339 tracing_info_fetch_interval: Duration::from_millis(3),
340 tracing_info_fetch_consistency: Consistency::One,
341 cluster_metadata_refresh_interval: Duration::from_secs(60),
342 identity: SelfIdentity::default(),
343 }
344 }
345
346 /// Adds a known database server with a hostname.
347 /// If the port is not explicitly specified, 9042 is used as default
348 /// # Example
349 /// ```
350 /// # use scylla::client::session::SessionConfig;
351 /// let mut config = SessionConfig::new();
352 /// config.add_known_node("127.0.0.1");
353 /// config.add_known_node("db1.example.com:9042");
354 /// ```
355 pub fn add_known_node(&mut self, hostname: impl AsRef<str>) {
356 self.known_nodes
357 .push(KnownNode::Hostname(hostname.as_ref().to_string()));
358 }
359
360 /// Adds a known database server with an IP address
361 /// # Example
362 /// ```
363 /// # use scylla::client::session::SessionConfig;
364 /// # use std::net::{SocketAddr, IpAddr, Ipv4Addr};
365 /// let mut config = SessionConfig::new();
366 /// config.add_known_node_addr(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9042));
367 /// ```
368 pub fn add_known_node_addr(&mut self, node_addr: SocketAddr) {
369 self.known_nodes.push(KnownNode::Address(node_addr));
370 }
371
    /// Adds a list of known database servers with hostnames.
373 /// If the port is not explicitly specified, 9042 is used as default
374 /// # Example
375 /// ```
376 /// # use scylla::client::session::SessionConfig;
377 /// # use std::net::{SocketAddr, IpAddr, Ipv4Addr};
378 /// let mut config = SessionConfig::new();
379 /// config.add_known_nodes(&["127.0.0.1:9042", "db1.example.com"]);
380 /// ```
381 pub fn add_known_nodes(&mut self, hostnames: impl IntoIterator<Item = impl AsRef<str>>) {
382 for hostname in hostnames {
383 self.add_known_node(hostname);
384 }
385 }
386
387 /// Adds a list of known database servers with IP addresses
388 /// # Example
389 /// ```
390 /// # use scylla::client::session::SessionConfig;
391 /// # use std::net::{SocketAddr, IpAddr, Ipv4Addr};
392 /// let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 3)), 9042);
393 /// let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 4)), 9042);
394 ///
395 /// let mut config = SessionConfig::new();
396 /// config.add_known_nodes_addr(&[addr1, addr2]);
397 /// ```
398 pub fn add_known_nodes_addr(
399 &mut self,
400 node_addrs: impl IntoIterator<Item = impl Borrow<SocketAddr>>,
401 ) {
402 for address in node_addrs {
403 self.add_known_node_addr(*address.borrow());
404 }
405 }
406}
407
408/// Creates default [`SessionConfig`], same as [`SessionConfig::new`]
409impl Default for SessionConfig {
410 fn default() -> Self {
411 Self::new()
412 }
413}
414
/// Outcome of running a request, generic over the result type `ResT`.
///
/// NOTE(review): the code producing and consuming this enum is outside this
/// part of the file; the variant descriptions are inferred from their names.
pub(crate) enum RunRequestResult<ResT> {
    /// The request hit a write error that was ignored
    /// (presumably on a retry policy's decision — confirm).
    IgnoredWriteError,
    /// The request completed and produced a result.
    Completed(ResT),
}
419
420impl Session {
421 /// Sends a request to the database and receives a response.\
422 /// Executes an unprepared CQL statement without paging, i.e. all results are received in a single response.
423 ///
424 /// This is the easiest way to execute a CQL statement, but performance is worse than that of prepared statements.
425 ///
426 /// It is discouraged to use this method with non-empty values argument ([`SerializeRow::is_empty()`]
427 /// trait method returns false). In such case, statement first needs to be prepared (on a single connection), so
428 /// driver will perform 2 round trips instead of 1. Please use [`Session::execute_unpaged()`] instead.
429 ///
430 /// As all results come in one response (no paging is done!), the memory footprint and latency may be huge
431 /// for statements returning rows (i.e. SELECTs)! Prefer this method for non-SELECTs, and for SELECTs
432 /// it is best to use paged requests:
433 /// - to receive multiple pages and transparently iterate through them, use [query_iter](Session::query_iter).
434 /// - to manually receive multiple pages and iterate through them, use [query_single_page](Session::query_single_page).
435 ///
436 /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/unprepared.html) for more information
437 /// # Arguments
438 /// * `statement` - statement to be executed, can be just a `&str` or the [`Statement`] struct.
439 /// * `values` - values bound to the statement, the easiest way is to use a tuple of bound values.
440 ///
441 /// # Examples
442 /// ```rust
443 /// # use scylla::client::session::Session;
444 /// # use std::error::Error;
445 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
446 /// // Insert an int and text into a table.
447 /// session
448 /// .query_unpaged(
449 /// "INSERT INTO ks.tab (a, b) VALUES(?, ?)",
450 /// (2_i32, "some text")
451 /// )
452 /// .await?;
453 /// # Ok(())
454 /// # }
455 /// ```
456 /// ```rust
457 /// # use scylla::client::session::Session;
458 /// # use std::error::Error;
459 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
460 ///
461 /// // Read rows containing an int and text.
462 /// // Keep in mind that all results come in one response (no paging is done!),
463 /// // so the memory footprint and latency may be huge!
464 /// // To prevent that, use `Session::query_iter` or `Session::query_single_page`.
465 /// let query_rows = session
466 /// .query_unpaged("SELECT a, b FROM ks.tab", &[])
467 /// .await?
468 /// .into_rows_result()?;
469 ///
470 /// for row in query_rows.rows()? {
471 /// // Parse row as int and text.
472 /// let (int_val, text_val): (i32, &str) = row?;
473 /// }
474 /// # Ok(())
475 /// # }
476 /// ```
477 pub async fn query_unpaged(
478 &self,
479 statement: impl Into<Statement>,
480 values: impl SerializeRow,
481 ) -> Result<QueryResult, ExecutionError> {
482 self.do_query_unpaged(&statement.into(), values).await
483 }
484
485 /// Queries a single page from the database, optionally continuing from a saved point.
486 ///
487 /// It is discouraged to use this method with non-empty values argument ([`SerializeRow::is_empty()`]
488 /// trait method returns false). In such case, CQL statement first needs to be prepared (on a single connection), so
489 /// driver will perform 2 round trips instead of 1. Please use [`Session::execute_single_page()`] instead.
490 ///
491 /// # Arguments
492 ///
493 /// * `statement` - statement to be executed
494 /// * `values` - values bound to the statement
495 /// * `paging_state` - previously received paging state or [PagingState::start()]
496 ///
497 /// # Example
498 ///
499 /// ```rust
500 /// # use scylla::client::session::Session;
501 /// # use std::error::Error;
502 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
503 /// use std::ops::ControlFlow;
504 /// use scylla::response::PagingState;
505 ///
506 /// // Manual paging in a loop, unprepared statement.
507 /// let mut paging_state = PagingState::start();
508 /// loop {
509 /// let (res, paging_state_response) = session
510 /// .query_single_page("SELECT a, b, c FROM ks.tbl", &[], paging_state)
511 /// .await?;
512 ///
513 /// // Do something with a single page of results.
514 /// for row in res
515 /// .into_rows_result()?
516 /// .rows::<(i32, &str)>()?
517 /// {
518 /// let (a, b) = row?;
519 /// }
520 ///
521 /// match paging_state_response.into_paging_control_flow() {
522 /// ControlFlow::Break(()) => {
523 /// // No more pages to be fetched.
524 /// break;
525 /// }
526 /// ControlFlow::Continue(new_paging_state) => {
527 /// // Update paging state from the response, so that query
528 /// // will be resumed from where it ended the last time.
529 /// paging_state = new_paging_state;
530 /// }
531 /// }
532 /// }
533 /// # Ok(())
534 /// # }
535 /// ```
536 pub async fn query_single_page(
537 &self,
538 statement: impl Into<Statement>,
539 values: impl SerializeRow,
540 paging_state: PagingState,
541 ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
542 self.do_query_single_page(&statement.into(), values, paging_state)
543 .await
544 }
545
546 /// Execute an unprepared CQL statement with paging\
547 /// This method will query all pages of the result\
548 ///
549 /// Returns an async iterator (stream) over all received rows\
550 /// Page size can be specified in the [`Statement`] passed to the function
551 ///
552 /// It is discouraged to use this method with non-empty values argument ([`SerializeRow::is_empty()`]
553 /// trait method returns false). In such case, statement first needs to be prepared (on a single connection), so
554 /// driver will initially perform 2 round trips instead of 1. Please use [`Session::execute_iter()`] instead.
555 ///
556 /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/paged.html) for more information.
557 ///
558 /// # Arguments
559 /// * `statement` - statement to be executed, can be just a `&str` or the [`Statement`] struct.
560 /// * `values` - values bound to the statement, the easiest way is to use a tuple of bound values.
561 ///
562 /// # Example
563 ///
564 /// ```rust
565 /// # use scylla::client::session::Session;
566 /// # use std::error::Error;
567 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
568 /// use futures::stream::StreamExt;
569 ///
570 /// let mut rows_stream = session
571 /// .query_iter("SELECT a, b FROM ks.t", &[])
572 /// .await?
573 /// .rows_stream::<(i32, i32)>()?;
574 ///
575 /// while let Some(next_row_res) = rows_stream.next().await {
576 /// let (a, b): (i32, i32) = next_row_res?;
577 /// println!("a, b: {}, {}", a, b);
578 /// }
579 /// # Ok(())
580 /// # }
581 /// ```
582 pub async fn query_iter(
583 &self,
584 statement: impl Into<Statement>,
585 values: impl SerializeRow,
586 ) -> Result<QueryPager, PagerExecutionError> {
587 self.do_query_iter(statement.into(), values).await
588 }
589
590 /// Execute a prepared statement. Requires a [PreparedStatement]
591 /// generated using [`Session::prepare`](Session::prepare).\
592 /// Performs an unpaged request, i.e. all results are received in a single response.
593 ///
594 /// As all results come in one response (no paging is done!), the memory footprint and latency may be huge
595 /// for statements returning rows (i.e. SELECTs)! Prefer this method for non-SELECTs, and for SELECTs
596 /// it is best to use paged requests:
597 /// - to receive multiple pages and transparently iterate through them, use [execute_iter](Session::execute_iter).
598 /// - to manually receive multiple pages and iterate through them, use [execute_single_page](Session::execute_single_page).
599 ///
600 /// Prepared statements are much faster than unprepared statements:
601 /// * Database doesn't need to parse the statement string upon each execution (only once)
602 /// * They are properly load balanced using token aware routing
603 ///
604 /// > ***Warning***\
605 /// > For token/shard aware load balancing to work properly, all partition key values
606 /// > must be sent as bound values
607 /// > (see [performance section](https://rust-driver.docs.scylladb.com/stable/statements/prepared.html#performance)).
608 ///
609 /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/prepared.html) for more information.
610 ///
611 /// # Arguments
612 /// * `prepared` - the prepared statement to execute, generated using [`Session::prepare`](Session::prepare)
613 /// * `values` - values bound to the statement, the easiest way is to use a tuple of bound values
614 ///
615 /// # Example
616 /// ```rust
617 /// # use scylla::client::session::Session;
618 /// # use std::error::Error;
619 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
620 /// use scylla::statement::prepared::PreparedStatement;
621 ///
622 /// // Prepare the statement for later execution
623 /// let prepared: PreparedStatement = session
624 /// .prepare("INSERT INTO ks.tab (a) VALUES(?)")
625 /// .await?;
626 ///
627 /// // Execute the prepared statement with some values, just like an unprepared statement.
628 /// let to_insert: i32 = 12345;
629 /// session.execute_unpaged(&prepared, (to_insert,)).await?;
630 /// # Ok(())
631 /// # }
632 /// ```
633 pub async fn execute_unpaged(
634 &self,
635 prepared: &PreparedStatement,
636 values: impl SerializeRow,
637 ) -> Result<QueryResult, ExecutionError> {
638 self.do_execute_unpaged(prepared, values).await
639 }
640
641 /// Executes a prepared statement, restricting results to single page.
642 /// Optionally continues fetching results from a saved point.
643 ///
644 /// # Arguments
645 ///
646 /// * `prepared` - a statement prepared with [prepare](crate::client::session::Session::prepare)
647 /// * `values` - values bound to the statement
    /// * `paging_state` - paging state received from a previous paged request, or [`PagingState::start()`] to fetch the first page
649 ///
650 /// # Example
651 ///
652 /// ```rust
653 /// # use scylla::client::session::Session;
654 /// # use std::error::Error;
655 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
656 /// use std::ops::ControlFlow;
657 /// use scylla::statement::unprepared::Statement;
658 /// use scylla::response::{PagingState, PagingStateResponse};
659 ///
660 /// let paged_prepared = session
661 /// .prepare(
662 /// Statement::new("SELECT a, b FROM ks.tbl")
663 /// .with_page_size(100.try_into().unwrap()),
664 /// )
665 /// .await?;
666 ///
667 /// // Manual paging in a loop, prepared statement.
668 /// let mut paging_state = PagingState::start();
669 /// loop {
670 /// let (res, paging_state_response) = session
671 /// .execute_single_page(&paged_prepared, &[], paging_state)
672 /// .await?;
673 ///
674 /// // Do something with a single page of results.
675 /// for row in res
676 /// .into_rows_result()?
677 /// .rows::<(i32, &str)>()?
678 /// {
679 /// let (a, b) = row?;
680 /// }
681 ///
682 /// match paging_state_response.into_paging_control_flow() {
683 /// ControlFlow::Break(()) => {
684 /// // No more pages to be fetched.
685 /// break;
686 /// }
687 /// ControlFlow::Continue(new_paging_state) => {
688 /// // Update paging continuation from the paging state, so that query
689 /// // will be resumed from where it ended the last time.
690 /// paging_state = new_paging_state;
691 /// }
692 /// }
693 /// }
694 /// # Ok(())
695 /// # }
696 /// ```
697 pub async fn execute_single_page(
698 &self,
699 prepared: &PreparedStatement,
700 values: impl SerializeRow,
701 paging_state: PagingState,
702 ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
703 self.do_execute_single_page(prepared, values, paging_state)
704 .await
705 }
706
707 /// Execute a prepared statement with paging.\
708 /// This method will query all pages of the result.\
709 ///
710 /// Returns an async iterator (stream) over all received rows.\
711 /// Page size can be specified in the [PreparedStatement] passed to the function.
712 ///
713 /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/paged.html) for more information.
714 ///
715 /// # Arguments
716 /// * `prepared` - the prepared statement to execute, generated using [`Session::prepare`](Session::prepare)
717 /// * `values` - values bound to the statement, the easiest way is to use a tuple of bound values
718 ///
719 /// # Example
720 ///
721 /// ```rust
722 /// # use scylla::client::session::Session;
723 /// # use futures::StreamExt as _;
724 /// # use std::error::Error;
725 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
726 /// use scylla::statement::prepared::PreparedStatement;
727 ///
728 /// // Prepare the statement for later execution
729 /// let prepared: PreparedStatement = session
730 /// .prepare("SELECT a, b FROM ks.t")
731 /// .await?;
732 ///
733 /// // Execute the statement and receive all pages
734 /// let mut rows_stream = session
735 /// .execute_iter(prepared, &[])
736 /// .await?
737 /// .rows_stream::<(i32, i32)>()?;
738 ///
739 /// while let Some(next_row_res) = rows_stream.next().await {
740 /// let (a, b): (i32, i32) = next_row_res?;
741 /// println!("a, b: {}, {}", a, b);
742 /// }
743 /// # Ok(())
744 /// # }
745 /// ```
746 pub async fn execute_iter(
747 &self,
748 prepared: impl Into<PreparedStatement>,
749 values: impl SerializeRow,
750 ) -> Result<QueryPager, PagerExecutionError> {
751 self.do_execute_iter(prepared.into(), values).await
752 }
753
754 /// Execute a batch statement\
755 /// Batch contains many `unprepared` or `prepared` statements which are executed at once\
756 /// Batch doesn't return any rows.
757 ///
758 /// Batch values must contain values for each of the statements.
759 ///
760 /// Avoid using non-empty values ([`SerializeRow::is_empty()`] return false) for unprepared statements
761 /// inside the batch. Such statements will first need to be prepared, so the driver will need to
    /// send (number_of_unprepared_statements_with_values + 1) requests instead of 1 request, severely
763 /// affecting performance.
764 ///
765 /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/batch.html) for more information
766 ///
767 /// # Arguments
768 /// * `batch` - [Batch] to be performed
769 /// * `values` - List of values for each statement, it's the easiest to use a tuple of tuples
770 ///
771 /// # Example
772 /// ```rust
773 /// # use scylla::client::session::Session;
774 /// # use std::error::Error;
775 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
776 /// use scylla::statement::batch::Batch;
777 ///
778 /// let mut batch: Batch = Default::default();
779 ///
780 /// // A statement with two bound values
781 /// batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(?, ?)");
782 ///
783 /// // A statement with one bound value
784 /// batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(3, ?)");
785 ///
786 /// // A statement with no bound values
787 /// batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(5, 6)");
788 ///
789 /// // Batch values is a tuple of 3 tuples containing values for each statement
790 /// let batch_values = ((1_i32, 2_i32), // Tuple with two values for the first statement
791 /// (4_i32,), // Tuple with one value for the second statement
792 /// ()); // Empty tuple/unit for the third statement
793 ///
794 /// // Run the batch
795 /// session.batch(&batch, batch_values).await?;
796 /// # Ok(())
797 /// # }
798 /// ```
799 pub async fn batch(
800 &self,
801 batch: &Batch,
802 values: impl BatchValues,
803 ) -> Result<QueryResult, ExecutionError> {
804 self.do_batch(batch, values).await
805 }
806}
807
808/// Represents a CQL session, which can be used to communicate
809/// with the database
810impl Session {
811 /// Estabilishes a CQL session with the database
812 ///
813 /// Usually it's easier to use [SessionBuilder](crate::client::session_builder::SessionBuilder)
814 /// instead of calling `Session::connect` directly, because it's more convenient.
815 /// # Arguments
816 /// * `config` - Connection configuration - known nodes, Compression, etc.
817 /// Must contain at least one known node.
818 ///
819 /// # Example
820 /// ```rust
821 /// # use std::error::Error;
822 /// # async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
823 /// use scylla::client::session::{Session, SessionConfig};
824 /// use scylla::cluster::KnownNode;
825 ///
826 /// let mut config = SessionConfig::new();
827 /// config.known_nodes.push(KnownNode::Hostname("127.0.0.1:9042".to_string()));
828 ///
829 /// let session: Session = Session::connect(config).await?;
830 /// # Ok(())
831 /// # }
832 /// ```
833 pub async fn connect(config: SessionConfig) -> Result<Self, NewSessionError> {
834 let known_nodes = config.known_nodes;
835
836 #[cfg(feature = "unstable-cloud")]
837 let cloud_known_nodes: Option<Vec<InternalKnownNode>> =
838 if let Some(ref cloud_config) = config.cloud_config {
839 let cloud_servers = cloud_config
840 .get_datacenters()
841 .iter()
842 .map(|(dc_name, dc_data)| {
843 InternalKnownNode::CloudEndpoint(CloudEndpoint {
844 hostname: dc_data.get_server().to_owned(),
845 datacenter: dc_name.clone(),
846 })
847 })
848 .collect();
849 Some(cloud_servers)
850 } else {
851 None
852 };
853
854 #[cfg(not(feature = "unstable-cloud"))]
855 let cloud_known_nodes: Option<Vec<InternalKnownNode>> = None;
856
857 #[allow(clippy::unnecessary_literal_unwrap)]
858 let known_nodes = cloud_known_nodes
859 .unwrap_or_else(|| known_nodes.into_iter().map(|node| node.into()).collect());
860
861 // Ensure there is at least one known node
862 if known_nodes.is_empty() {
863 return Err(NewSessionError::EmptyKnownNodesList);
864 }
865
866 let (tablet_sender, tablet_receiver) = tokio::sync::mpsc::channel(TABLET_CHANNEL_SIZE);
867
868 #[allow(unused_labels)] // Triggers when `cloud` feature is disabled.
869 let address_translator = 'translator: {
870 #[cfg(feature = "unstable-cloud")]
871 if let Some(translator) = config.cloud_config.clone() {
872 if config.address_translator.is_some() {
873 // This can only happen if the user builds SessionConfig by hand, as SessionBuilder in cloud mode prevents setting custom AddressTranslator.
874 warn!(
875 "Overriding user-provided AddressTranslator with Scylla Cloud AddressTranslator due \
876 to CloudConfig being provided. This is certainly an API misuse - Cloud \
877 may not be combined with user's own AddressTranslator."
878 )
879 }
880
881 break 'translator Some(translator as Arc<dyn AddressTranslator>);
882 }
883
884 config.address_translator
885 };
886
887 let tls_provider = 'provider: {
888 #[cfg(feature = "unstable-cloud")]
889 if let Some(cloud_config) = config.cloud_config {
890 if config.tls_context.is_some() {
891 // This can only happen if the user builds SessionConfig by hand, as SessionBuilder in cloud mode prevents setting custom TlsContext.
892 warn!(
893 "Overriding user-provided TlsContext with Scylla Cloud TlsContext due \
894 to CloudConfig being provided. This is certainly an API misuse - Cloud \
895 may not be combined with user's own TLS config."
896 )
897 }
898
899 let provider = TlsProvider::new_cloud(cloud_config);
900 break 'provider Some(provider);
901 }
902 if let Some(tls_context) = config.tls_context {
903 // To silence warnings when TlsContext is an empty enum (tls features are disabled).
904 // In such case, TlsProvider is uninhabited.
905 #[allow(unused_variables)]
906 let provider = TlsProvider::new_with_global_context(tls_context);
907 #[allow(unreachable_code)]
908 break 'provider Some(provider);
909 }
910 None
911 };
912
913 let connection_config = ConnectionConfig {
914 local_ip_address: config.local_ip_address,
915 shard_aware_local_port_range: config.shard_aware_local_port_range,
916 compression: config.compression,
917 tcp_nodelay: config.tcp_nodelay,
918 tcp_keepalive_interval: config.tcp_keepalive_interval,
919 timestamp_generator: config.timestamp_generator,
920 tls_provider,
921 authenticator: config.authenticator,
922 connect_timeout: config.connect_timeout,
923 event_sender: None,
924 default_consistency: Default::default(),
925 address_translator,
926 write_coalescing_delay: config
927 .enable_write_coalescing
928 .then_some(config.write_coalescing_delay),
929 keepalive_interval: config.keepalive_interval,
930 keepalive_timeout: config.keepalive_timeout,
931 tablet_sender: Some(tablet_sender),
932 identity: config.identity,
933 };
934
935 let pool_config = PoolConfig {
936 connection_config,
937 pool_size: config.connection_pool_size,
938 can_use_shard_aware_port: !config.disallow_shard_aware_port,
939 };
940
941 #[cfg(feature = "metrics")]
942 let metrics = Arc::new(Metrics::new());
943
944 let cluster = Cluster::new(
945 known_nodes,
946 pool_config,
947 config.keyspaces_to_fetch,
948 config.fetch_schema_metadata,
949 config.metadata_request_serverside_timeout,
950 config.host_filter,
951 config.cluster_metadata_refresh_interval,
952 tablet_receiver,
953 #[cfg(feature = "metrics")]
954 Arc::clone(&metrics),
955 )
956 .await?;
957
958 let default_execution_profile_handle = config.default_execution_profile_handle;
959
960 let session = Self {
961 cluster,
962 default_execution_profile_handle,
963 schema_agreement_interval: config.schema_agreement_interval,
964 #[cfg(feature = "metrics")]
965 metrics,
966 schema_agreement_timeout: config.schema_agreement_timeout,
967 schema_agreement_automatic_waiting: config.schema_agreement_automatic_waiting,
968 refresh_metadata_on_auto_schema_agreement: config
969 .refresh_metadata_on_auto_schema_agreement,
970 keyspace_name: Arc::new(ArcSwapOption::default()), // will be set by use_keyspace
971 tracing_info_fetch_attempts: config.tracing_info_fetch_attempts,
972 tracing_info_fetch_interval: config.tracing_info_fetch_interval,
973 tracing_info_fetch_consistency: config.tracing_info_fetch_consistency,
974 };
975
976 if let Some(keyspace_name) = config.used_keyspace {
977 session
978 .use_keyspace(keyspace_name, config.keyspace_case_sensitive)
979 .await?;
980 }
981
982 Ok(session)
983 }
984
985 async fn do_query_unpaged(
986 &self,
987 statement: &Statement,
988 values: impl SerializeRow,
989 ) -> Result<QueryResult, ExecutionError> {
990 let (result, paging_state_response) = self
991 .query(statement, values, None, PagingState::start())
992 .await?;
993 if !paging_state_response.finished() {
994 error!("Unpaged unprepared query returned a non-empty paging state! This is a driver-side or server-side bug.");
995 return Err(ExecutionError::LastAttemptError(
996 RequestAttemptError::NonfinishedPagingState,
997 ));
998 }
999 Ok(result)
1000 }
1001
1002 async fn do_query_single_page(
1003 &self,
1004 statement: &Statement,
1005 values: impl SerializeRow,
1006 paging_state: PagingState,
1007 ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1008 self.query(
1009 statement,
1010 values,
1011 Some(statement.get_validated_page_size()),
1012 paging_state,
1013 )
1014 .await
1015 }
1016
1017 /// Sends a request to the database.
1018 /// Optionally continues fetching results from a saved point.
1019 ///
1020 /// This is now an internal method only.
1021 ///
1022 /// Tl;dr: use [Session::query_unpaged], [Session::query_single_page] or [Session::query_iter] instead.
1023 ///
1024 /// The rationale is that we believe that paging is so important concept (and it has shown to be error-prone as well)
1025 /// that we need to require users to make a conscious decision to use paging or not. For that, we expose
1026 /// the aforementioned 3 methods clearly differing in naming and API, so that no unconscious choices about paging
1027 /// should be made.
1028 async fn query(
1029 &self,
1030 statement: &Statement,
1031 values: impl SerializeRow,
1032 page_size: Option<PageSize>,
1033 paging_state: PagingState,
1034 ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1035 let execution_profile = statement
1036 .get_execution_profile_handle()
1037 .unwrap_or_else(|| self.get_default_execution_profile_handle())
1038 .access();
1039
1040 let statement_info = RoutingInfo {
1041 consistency: statement
1042 .config
1043 .consistency
1044 .unwrap_or(execution_profile.consistency),
1045 serial_consistency: statement
1046 .config
1047 .serial_consistency
1048 .unwrap_or(execution_profile.serial_consistency),
1049 ..Default::default()
1050 };
1051
1052 let span = RequestSpan::new_query(&statement.contents);
1053 let span_ref = &span;
1054 let run_request_result = self
1055 .run_request(
1056 statement_info,
1057 &statement.config,
1058 execution_profile,
1059 |connection: Arc<Connection>,
1060 consistency: Consistency,
1061 execution_profile: &ExecutionProfileInner| {
1062 let serial_consistency = statement
1063 .config
1064 .serial_consistency
1065 .unwrap_or(execution_profile.serial_consistency);
1066 // Needed to avoid moving query and values into async move block
1067 let statement_ref = &statement;
1068 let values_ref = &values;
1069 let paging_state_ref = &paging_state;
1070 async move {
1071 if values_ref.is_empty() {
1072 span_ref.record_request_size(0);
1073 connection
1074 .query_raw_with_consistency(
1075 statement_ref,
1076 consistency,
1077 serial_consistency,
1078 page_size,
1079 paging_state_ref.clone(),
1080 )
1081 .await
1082 .and_then(QueryResponse::into_non_error_query_response)
1083 } else {
1084 let prepared = connection.prepare(statement_ref).await?;
1085 let serialized = prepared.serialize_values(values_ref)?;
1086 span_ref.record_request_size(serialized.buffer_size());
1087 connection
1088 .execute_raw_with_consistency(
1089 &prepared,
1090 &serialized,
1091 consistency,
1092 serial_consistency,
1093 page_size,
1094 paging_state_ref.clone(),
1095 )
1096 .await
1097 .and_then(QueryResponse::into_non_error_query_response)
1098 }
1099 }
1100 },
1101 &span,
1102 )
1103 .instrument(span.span().clone())
1104 .await?;
1105
1106 let response = match run_request_result {
1107 RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
1108 response: NonErrorResponse::Result(result::Result::Void),
1109 tracing_id: None,
1110 warnings: Vec::new(),
1111 },
1112 RunRequestResult::Completed(response) => response,
1113 };
1114
1115 self.handle_set_keyspace_response(&response).await?;
1116 self.handle_auto_await_schema_agreement(&response).await?;
1117
1118 let (result, paging_state_response) = response.into_query_result_and_paging_state()?;
1119 span.record_result_fields(&result);
1120
1121 Ok((result, paging_state_response))
1122 }
1123
1124 async fn handle_set_keyspace_response(
1125 &self,
1126 response: &NonErrorQueryResponse,
1127 ) -> Result<(), UseKeyspaceError> {
1128 if let Some(set_keyspace) = response.as_set_keyspace() {
1129 debug!(
1130 "Detected USE KEYSPACE query, setting session's keyspace to {}",
1131 set_keyspace.keyspace_name
1132 );
1133 self.use_keyspace(set_keyspace.keyspace_name.clone(), true)
1134 .await?;
1135 }
1136
1137 Ok(())
1138 }
1139
1140 async fn handle_auto_await_schema_agreement(
1141 &self,
1142 response: &NonErrorQueryResponse,
1143 ) -> Result<(), ExecutionError> {
1144 if self.schema_agreement_automatic_waiting {
1145 if response.as_schema_change().is_some() {
1146 self.await_schema_agreement().await?;
1147 }
1148
1149 if self.refresh_metadata_on_auto_schema_agreement
1150 && response.as_schema_change().is_some()
1151 {
1152 self.refresh_metadata().await?;
1153 }
1154 }
1155
1156 Ok(())
1157 }
1158
1159 async fn do_query_iter(
1160 &self,
1161 statement: Statement,
1162 values: impl SerializeRow,
1163 ) -> Result<QueryPager, PagerExecutionError> {
1164 let execution_profile = statement
1165 .get_execution_profile_handle()
1166 .unwrap_or_else(|| self.get_default_execution_profile_handle())
1167 .access();
1168
1169 if values.is_empty() {
1170 QueryPager::new_for_query(
1171 statement,
1172 execution_profile,
1173 self.cluster.get_state(),
1174 #[cfg(feature = "metrics")]
1175 Arc::clone(&self.metrics),
1176 )
1177 .await
1178 .map_err(PagerExecutionError::NextPageError)
1179 } else {
1180 // Making QueryPager::new_for_query work with values is too hard (if even possible)
1181 // so instead of sending one prepare to a specific connection on each iterator query,
1182 // we fully prepare a statement beforehand.
1183 let prepared = self.prepare(statement).await?;
1184 let values = prepared.serialize_values(&values)?;
1185 QueryPager::new_for_prepared_statement(PreparedIteratorConfig {
1186 prepared,
1187 values,
1188 execution_profile,
1189 cluster_state: self.cluster.get_state(),
1190 #[cfg(feature = "metrics")]
1191 metrics: Arc::clone(&self.metrics),
1192 })
1193 .await
1194 .map_err(PagerExecutionError::NextPageError)
1195 }
1196 }
1197
1198 /// Prepares a statement on the server side and returns a prepared statement,
1199 /// which can later be used to perform more efficient requests.
1200 ///
1201 /// Prepared statements are much faster than unprepared statements:
1202 /// * Database doesn't need to parse the statement string upon each execution (only once)
1203 /// * They are properly load balanced using token aware routing
1204 ///
1205 /// > ***Warning***\
1206 /// > For token/shard aware load balancing to work properly, all partition key values
1207 /// > must be sent as bound values
1208 /// > (see [performance section](https://rust-driver.docs.scylladb.com/stable/statements/prepared.html#performance))
1209 ///
1210 /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/prepared.html) for more information.
1211 /// See the documentation of [`PreparedStatement`].
1212 ///
1213 /// # Arguments
1214 /// * `statement` - statement to prepare, can be just a `&str` or the [`Statement`] struct.
1215 ///
1216 /// # Example
1217 /// ```rust
1218 /// # use scylla::client::session::Session;
1219 /// # use std::error::Error;
1220 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
1221 /// use scylla::statement::prepared::PreparedStatement;
1222 ///
1223 /// // Prepare the statement for later execution
1224 /// let prepared: PreparedStatement = session
1225 /// .prepare("INSERT INTO ks.tab (a) VALUES(?)")
1226 /// .await?;
1227 ///
1228 /// // Execute the prepared statement with some values, just like an unprepared statement.
1229 /// let to_insert: i32 = 12345;
1230 /// session.execute_unpaged(&prepared, (to_insert,)).await?;
1231 /// # Ok(())
1232 /// # }
1233 /// ```
1234 pub async fn prepare(
1235 &self,
1236 statement: impl Into<Statement>,
1237 ) -> Result<PreparedStatement, PrepareError> {
1238 let statement = statement.into();
1239 let statement_ref = &statement;
1240
1241 let cluster_state = self.get_cluster_state();
1242 let connections_iter = cluster_state.iter_working_connections()?;
1243
1244 // Prepare statements on all connections concurrently
1245 let handles = connections_iter.map(|c| async move { c.prepare(statement_ref).await });
1246 let mut results = join_all(handles).await.into_iter();
1247
1248 // If at least one prepare was successful, `prepare()` returns Ok.
1249 // Find the first result that is Ok, or Err if all failed.
1250
1251 // Safety: there is at least one node in the cluster, and `Cluster::iter_working_connections()`
1252 // returns either an error or an iterator with at least one connection, so there will be at least one result.
1253 let first_ok: Result<PreparedStatement, RequestAttemptError> =
1254 results.by_ref().find_or_first(Result::is_ok).unwrap();
1255 let mut prepared: PreparedStatement =
1256 first_ok.map_err(|first_attempt| PrepareError::AllAttemptsFailed { first_attempt })?;
1257
1258 // Validate prepared ids equality
1259 for statement in results.flatten() {
1260 if prepared.get_id() != statement.get_id() {
1261 return Err(PrepareError::PreparedStatementIdsMismatch);
1262 }
1263
1264 // Collect all tracing ids from prepare() queries in the final result
1265 prepared
1266 .prepare_tracing_ids
1267 .extend(statement.prepare_tracing_ids);
1268 }
1269
1270 prepared.set_partitioner_name(
1271 self.extract_partitioner_name(&prepared, &self.cluster.get_state())
1272 .and_then(PartitionerName::from_str)
1273 .unwrap_or_default(),
1274 );
1275
1276 Ok(prepared)
1277 }
1278
1279 fn extract_partitioner_name<'a>(
1280 &self,
1281 prepared: &PreparedStatement,
1282 cluster_state: &'a ClusterState,
1283 ) -> Option<&'a str> {
1284 let table_spec = prepared.get_table_spec()?;
1285 cluster_state
1286 .keyspaces
1287 .get(table_spec.ks_name())?
1288 .tables
1289 .get(table_spec.table_name())?
1290 .partitioner
1291 .as_deref()
1292 }
1293
1294 async fn do_execute_unpaged(
1295 &self,
1296 prepared: &PreparedStatement,
1297 values: impl SerializeRow,
1298 ) -> Result<QueryResult, ExecutionError> {
1299 let serialized_values = prepared.serialize_values(&values)?;
1300 let (result, paging_state) = self
1301 .execute(prepared, &serialized_values, None, PagingState::start())
1302 .await?;
1303 if !paging_state.finished() {
1304 error!("Unpaged prepared query returned a non-empty paging state! This is a driver-side or server-side bug.");
1305 return Err(ExecutionError::LastAttemptError(
1306 RequestAttemptError::NonfinishedPagingState,
1307 ));
1308 }
1309 Ok(result)
1310 }
1311
1312 async fn do_execute_single_page(
1313 &self,
1314 prepared: &PreparedStatement,
1315 values: impl SerializeRow,
1316 paging_state: PagingState,
1317 ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1318 let serialized_values = prepared.serialize_values(&values)?;
1319 let page_size = prepared.get_validated_page_size();
1320 self.execute(prepared, &serialized_values, Some(page_size), paging_state)
1321 .await
1322 }
1323
1324 /// Sends a prepared request to the database, optionally continuing from a saved point.
1325 ///
1326 /// This is now an internal method only.
1327 ///
1328 /// Tl;dr: use [Session::execute_unpaged], [Session::execute_single_page] or [Session::execute_iter] instead.
1329 ///
1330 /// The rationale is that we believe that paging is so important concept (and it has shown to be error-prone as well)
1331 /// that we need to require users to make a conscious decision to use paging or not. For that, we expose
1332 /// the aforementioned 3 methods clearly differing in naming and API, so that no unconscious choices about paging
1333 /// should be made.
1334 async fn execute(
1335 &self,
1336 prepared: &PreparedStatement,
1337 serialized_values: &SerializedValues,
1338 page_size: Option<PageSize>,
1339 paging_state: PagingState,
1340 ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1341 let values_ref = &serialized_values;
1342 let paging_state_ref = &paging_state;
1343
1344 let (partition_key, token) = prepared
1345 .extract_partition_key_and_calculate_token(prepared.get_partitioner_name(), values_ref)
1346 .map_err(PartitionKeyError::into_execution_error)?
1347 .unzip();
1348
1349 let execution_profile = prepared
1350 .get_execution_profile_handle()
1351 .unwrap_or_else(|| self.get_default_execution_profile_handle())
1352 .access();
1353
1354 let table_spec = prepared.get_table_spec();
1355
1356 let statement_info = RoutingInfo {
1357 consistency: prepared
1358 .config
1359 .consistency
1360 .unwrap_or(execution_profile.consistency),
1361 serial_consistency: prepared
1362 .config
1363 .serial_consistency
1364 .unwrap_or(execution_profile.serial_consistency),
1365 token,
1366 table: table_spec,
1367 is_confirmed_lwt: prepared.is_confirmed_lwt(),
1368 };
1369
1370 let span = RequestSpan::new_prepared(
1371 partition_key.as_ref().map(|pk| pk.iter()),
1372 token,
1373 serialized_values.buffer_size(),
1374 );
1375
1376 if !span.span().is_disabled() {
1377 if let (Some(table_spec), Some(token)) = (statement_info.table, token) {
1378 let cluster_state = self.get_cluster_state();
1379 let replicas = cluster_state.get_token_endpoints_iter(table_spec, token);
1380 span.record_replicas(replicas)
1381 }
1382 }
1383
1384 let run_request_result: RunRequestResult<NonErrorQueryResponse> = self
1385 .run_request(
1386 statement_info,
1387 &prepared.config,
1388 execution_profile,
1389 |connection: Arc<Connection>,
1390 consistency: Consistency,
1391 execution_profile: &ExecutionProfileInner| {
1392 let serial_consistency = prepared
1393 .config
1394 .serial_consistency
1395 .unwrap_or(execution_profile.serial_consistency);
1396 async move {
1397 connection
1398 .execute_raw_with_consistency(
1399 prepared,
1400 values_ref,
1401 consistency,
1402 serial_consistency,
1403 page_size,
1404 paging_state_ref.clone(),
1405 )
1406 .await
1407 .and_then(QueryResponse::into_non_error_query_response)
1408 }
1409 },
1410 &span,
1411 )
1412 .instrument(span.span().clone())
1413 .await?;
1414
1415 let response = match run_request_result {
1416 RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
1417 response: NonErrorResponse::Result(result::Result::Void),
1418 tracing_id: None,
1419 warnings: Vec::new(),
1420 },
1421 RunRequestResult::Completed(response) => response,
1422 };
1423
1424 self.handle_set_keyspace_response(&response).await?;
1425 self.handle_auto_await_schema_agreement(&response).await?;
1426
1427 let (result, paging_state_response) = response.into_query_result_and_paging_state()?;
1428 span.record_result_fields(&result);
1429
1430 Ok((result, paging_state_response))
1431 }
1432
1433 async fn do_execute_iter(
1434 &self,
1435 prepared: PreparedStatement,
1436 values: impl SerializeRow,
1437 ) -> Result<QueryPager, PagerExecutionError> {
1438 let serialized_values = prepared.serialize_values(&values)?;
1439
1440 let execution_profile = prepared
1441 .get_execution_profile_handle()
1442 .unwrap_or_else(|| self.get_default_execution_profile_handle())
1443 .access();
1444
1445 QueryPager::new_for_prepared_statement(PreparedIteratorConfig {
1446 prepared,
1447 values: serialized_values,
1448 execution_profile,
1449 cluster_state: self.cluster.get_state(),
1450 #[cfg(feature = "metrics")]
1451 metrics: Arc::clone(&self.metrics),
1452 })
1453 .await
1454 .map_err(PagerExecutionError::NextPageError)
1455 }
1456
1457 async fn do_batch(
1458 &self,
1459 batch: &Batch,
1460 values: impl BatchValues,
1461 ) -> Result<QueryResult, ExecutionError> {
1462 // Shard-awareness behavior for batch will be to pick shard based on first batch statement's shard
1463 // If users batch statements by shard, they will be rewarded with full shard awareness
1464
1465 // check to ensure that we don't send a batch statement with more than u16::MAX queries
1466 let batch_statements_length = batch.statements.len();
1467 if batch_statements_length > u16::MAX as usize {
1468 return Err(ExecutionError::BadQuery(
1469 BadQuery::TooManyQueriesInBatchStatement(batch_statements_length),
1470 ));
1471 }
1472
1473 let execution_profile = batch
1474 .get_execution_profile_handle()
1475 .unwrap_or_else(|| self.get_default_execution_profile_handle())
1476 .access();
1477
1478 let consistency = batch
1479 .config
1480 .consistency
1481 .unwrap_or(execution_profile.consistency);
1482
1483 let serial_consistency = batch
1484 .config
1485 .serial_consistency
1486 .unwrap_or(execution_profile.serial_consistency);
1487
1488 let (first_value_token, values) =
1489 batch_values::peek_first_token(values, batch.statements.first())?;
1490 let values_ref = &values;
1491
1492 let table_spec =
1493 if let Some(BatchStatement::PreparedStatement(ps)) = batch.statements.first() {
1494 ps.get_table_spec()
1495 } else {
1496 None
1497 };
1498
1499 let statement_info = RoutingInfo {
1500 consistency,
1501 serial_consistency,
1502 token: first_value_token,
1503 table: table_spec,
1504 is_confirmed_lwt: false,
1505 };
1506
1507 let span = RequestSpan::new_batch();
1508
1509 let run_request_result: RunRequestResult<NonErrorQueryResponse> = self
1510 .run_request(
1511 statement_info,
1512 &batch.config,
1513 execution_profile,
1514 |connection: Arc<Connection>,
1515 consistency: Consistency,
1516 execution_profile: &ExecutionProfileInner| {
1517 let serial_consistency = batch
1518 .config
1519 .serial_consistency
1520 .unwrap_or(execution_profile.serial_consistency);
1521 async move {
1522 connection
1523 .batch_with_consistency(
1524 batch,
1525 values_ref,
1526 consistency,
1527 serial_consistency,
1528 )
1529 .await
1530 .and_then(QueryResponse::into_non_error_query_response)
1531 }
1532 },
1533 &span,
1534 )
1535 .instrument(span.span().clone())
1536 .await?;
1537
1538 let result = match run_request_result {
1539 RunRequestResult::IgnoredWriteError => QueryResult::mock_empty(),
1540 RunRequestResult::Completed(non_error_query_response) => {
1541 let result = non_error_query_response.into_query_result()?;
1542 span.record_result_fields(&result);
1543 result
1544 }
1545 };
1546
1547 Ok(result)
1548 }
1549
1550 /// Prepares all statements within the batch and returns a new batch where every
1551 /// statement is prepared.
1552 /// /// # Example
1553 /// ```rust
1554 /// # extern crate scylla;
1555 /// # use scylla::client::session::Session;
1556 /// # use std::error::Error;
1557 /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
1558 /// use scylla::statement::batch::Batch;
1559 ///
1560 /// // Create a batch statement with unprepared statements
1561 /// let mut batch: Batch = Default::default();
1562 /// batch.append_statement("INSERT INTO ks.simple_unprepared1 VALUES(?, ?)");
1563 /// batch.append_statement("INSERT INTO ks.simple_unprepared2 VALUES(?, ?)");
1564 ///
1565 /// // Prepare all statements in the batch at once
1566 /// let prepared_batch: Batch = session.prepare_batch(&batch).await?;
1567 ///
1568 /// // Specify bound values to use with each statement
1569 /// let batch_values = ((1_i32, 2_i32),
1570 /// (3_i32, 4_i32));
1571 ///
1572 /// // Run the prepared batch
1573 /// session.batch(&prepared_batch, batch_values).await?;
1574 /// # Ok(())
1575 /// # }
1576 /// ```
1577 pub async fn prepare_batch(&self, batch: &Batch) -> Result<Batch, PrepareError> {
1578 let mut prepared_batch = batch.clone();
1579
1580 try_join_all(
1581 prepared_batch
1582 .statements
1583 .iter_mut()
1584 .map(|statement| async move {
1585 if let BatchStatement::Query(query) = statement {
1586 let prepared = self.prepare(query.clone()).await?;
1587 *statement = BatchStatement::PreparedStatement(prepared);
1588 }
1589 Ok::<(), PrepareError>(())
1590 }),
1591 )
1592 .await?;
1593
1594 Ok(prepared_batch)
1595 }
1596
1597 /// Sends `USE <keyspace_name>` request on all connections\
1598 /// This allows to write `SELECT * FROM table` instead of `SELECT * FROM keyspace.table`\
1599 ///
1600 /// Note that even failed `use_keyspace` can change currently used keyspace - the request is sent on all connections and
1601 /// can overwrite previously used keyspace.
1602 ///
1603 /// Call only one `use_keyspace` at a time.\
1604 /// Trying to do two `use_keyspace` requests simultaneously with different names
1605 /// can end with some connections using one keyspace and the rest using the other.
1606 ///
1607 /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/usekeyspace.html) for more information
1608 ///
1609 /// # Arguments
1610 ///
1611 /// * `keyspace_name` - keyspace name to use,
1612 /// keyspace names can have up to 48 alphanumeric characters and contain underscores
1613 /// * `case_sensitive` - if set to true the generated statement will put keyspace name in quotes
1614 /// # Example
1615 /// ```rust
1616 /// # use scylla::client::session::Session;
1617 /// # use scylla::client::session_builder::SessionBuilder;
1618 /// # use scylla::client::Compression;
1619 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1620 /// # let session = SessionBuilder::new().known_node("127.0.0.1:9042").build().await?;
1621 /// session
1622 /// .query_unpaged("INSERT INTO my_keyspace.tab (a) VALUES ('test1')", &[])
1623 /// .await?;
1624 ///
1625 /// session.use_keyspace("my_keyspace", false).await?;
1626 ///
1627 /// // Now we can omit keyspace name in the statement
1628 /// session
1629 /// .query_unpaged("INSERT INTO tab (a) VALUES ('test2')", &[])
1630 /// .await?;
1631 /// # Ok(())
1632 /// # }
1633 /// ```
1634 pub async fn use_keyspace(
1635 &self,
1636 keyspace_name: impl Into<String>,
1637 case_sensitive: bool,
1638 ) -> Result<(), UseKeyspaceError> {
1639 let keyspace_name = keyspace_name.into();
1640 self.keyspace_name
1641 .store(Some(Arc::new(keyspace_name.clone())));
1642
1643 // Trying to pass keyspace as bound value in "USE ?" doesn't work
1644 // So we have to create a string for query: "USE " + new_keyspace
1645 // To avoid any possible CQL injections it's good to verify that the name is valid
1646 let verified_ks_name = VerifiedKeyspaceName::new(keyspace_name, case_sensitive)?;
1647
1648 self.cluster.use_keyspace(verified_ks_name).await?;
1649
1650 Ok(())
1651 }
1652
1653 /// Manually trigger a metadata refresh\
1654 /// The driver will fetch current nodes in the cluster and update its metadata
1655 ///
1656 /// Normally this is not needed,
1657 /// the driver should automatically detect all metadata changes in the cluster
1658 pub async fn refresh_metadata(&self) -> Result<(), MetadataError> {
1659 self.cluster.refresh_metadata().await
1660 }
1661
/// Gives access to the metrics collected by the driver.\
/// The driver collects various metrics like number of queries or query latencies.
/// They can be read through the handle returned by this method.
#[cfg(feature = "metrics")]
pub fn get_metrics(&self) -> Arc<Metrics> {
    let metrics = &self.metrics;
    Arc::clone(metrics)
}
1669
1670 /// Access cluster state visible by the driver.
1671 ///
1672 /// Driver collects various information about network topology or schema.
1673 /// It can be read using this method.
1674 pub fn get_cluster_state(&self) -> Arc<ClusterState> {
1675 self.cluster.get_state()
1676 }
1677
1678 /// Get [`TracingInfo`] of a traced query performed earlier
1679 ///
1680 /// See [the book](https://rust-driver.docs.scylladb.com/stable/tracing/tracing.html)
1681 /// for more information about query tracing
1682 pub async fn get_tracing_info(&self, tracing_id: &Uuid) -> Result<TracingInfo, TracingError> {
1683 // tracing_info_fetch_attempts is NonZeroU32 so at least one attempt will be made
1684 for _ in 0..self.tracing_info_fetch_attempts.get() {
1685 let current_try: Option<TracingInfo> = self
1686 .try_getting_tracing_info(tracing_id, Some(self.tracing_info_fetch_consistency))
1687 .await?;
1688
1689 match current_try {
1690 Some(tracing_info) => return Ok(tracing_info),
1691 None => tokio::time::sleep(self.tracing_info_fetch_interval).await,
1692 };
1693 }
1694
1695 Err(TracingError::EmptyResults)
1696 }
1697
1698 /// Gets the name of the keyspace that is currently set, or `None` if no
1699 /// keyspace was set.
1700 ///
1701 /// It will initially return the name of the keyspace that was set
1702 /// in the session configuration, but calling `use_keyspace` will update
1703 /// it.
1704 ///
1705 /// Note: the return value might be wrong if `use_keyspace` was called
1706 /// concurrently or it previously failed. It is also unspecified
1707 /// if `get_keyspace` is called concurrently with `use_keyspace`.
1708 #[inline]
1709 pub fn get_keyspace(&self) -> Option<Arc<String>> {
1710 self.keyspace_name.load_full()
1711 }
1712
1713 // Tries getting the tracing info
1714 // If the queries return 0 rows then returns None - the information didn't reach this node yet
1715 // If there is some other error returns this error
1716 async fn try_getting_tracing_info(
1717 &self,
1718 tracing_id: &Uuid,
1719 consistency: Option<Consistency>,
1720 ) -> Result<Option<TracingInfo>, TracingError> {
1721 // Query system_traces.sessions for TracingInfo
1722 let mut traces_session_query =
1723 Statement::new(crate::observability::tracing::TRACES_SESSION_QUERY_STR);
1724 traces_session_query.config.consistency = consistency;
1725 traces_session_query.set_page_size(TRACING_QUERY_PAGE_SIZE);
1726
1727 // Query system_traces.events for TracingEvents
1728 let mut traces_events_query =
1729 Statement::new(crate::observability::tracing::TRACES_EVENTS_QUERY_STR);
1730 traces_events_query.config.consistency = consistency;
1731 traces_events_query.set_page_size(TRACING_QUERY_PAGE_SIZE);
1732
1733 let (traces_session_res, traces_events_res) = tokio::try_join!(
1734 self.do_query_unpaged(&traces_session_query, (tracing_id,)),
1735 self.do_query_unpaged(&traces_events_query, (tracing_id,))
1736 )?;
1737
1738 // Get tracing info
1739 let maybe_tracing_info: Option<TracingInfo> = traces_session_res
1740 .into_rows_result()
1741 .map_err(TracingError::TracesSessionIntoRowsResultError)?
1742 .maybe_first_row()
1743 .map_err(|err| match err {
1744 MaybeFirstRowError::TypeCheckFailed(e) => {
1745 TracingError::TracesSessionInvalidColumnType(e)
1746 }
1747 MaybeFirstRowError::DeserializationFailed(e) => {
1748 TracingError::TracesSessionDeserializationFailed(e)
1749 }
1750 })?;
1751
1752 let mut tracing_info = match maybe_tracing_info {
1753 None => return Ok(None),
1754 Some(tracing_info) => tracing_info,
1755 };
1756
1757 // Get tracing events
1758 let tracing_event_rows_result = traces_events_res
1759 .into_rows_result()
1760 .map_err(TracingError::TracesEventsIntoRowsResultError)?;
1761 let tracing_event_rows = tracing_event_rows_result.rows().map_err(|err| match err {
1762 RowsError::TypeCheckFailed(err) => TracingError::TracesEventsInvalidColumnType(err),
1763 })?;
1764
1765 tracing_info.events = tracing_event_rows
1766 .collect::<Result<_, _>>()
1767 .map_err(TracingError::TracesEventsDeserializationFailed)?;
1768
1769 if tracing_info.events.is_empty() {
1770 return Ok(None);
1771 }
1772
1773 Ok(Some(tracing_info))
1774 }
1775
1776 /// This method allows to easily run a request using load balancing, retry policy etc.
1777 /// Requires some information about the request and a closure.
1778 /// The closure is used to execute the request once on a chosen connection.
1779 /// - query will use connection.query()
1780 /// - execute will use connection.execute()
1781 ///
1782 /// If this closure fails with some errors, retry policy is used to perform retries.
1783 /// On success, this request's result is returned.
1784 // I tried to make this closures take a reference instead of an Arc but failed
1785 // maybe once async closures get stabilized this can be fixed
1786 async fn run_request<'a, QueryFut, ResT>(
1787 &'a self,
1788 statement_info: RoutingInfo<'a>,
1789 statement_config: &'a StatementConfig,
1790 execution_profile: Arc<ExecutionProfileInner>,
1791 run_request_once: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
1792 request_span: &'a RequestSpan,
1793 ) -> Result<RunRequestResult<ResT>, ExecutionError>
1794 where
1795 QueryFut: Future<Output = Result<ResT, RequestAttemptError>>,
1796 ResT: AllowedRunRequestResTType,
1797 {
1798 let history_listener_and_id: Option<(&'a dyn HistoryListener, history::RequestId)> =
1799 statement_config
1800 .history_listener
1801 .as_ref()
1802 .map(|hl| (&**hl, hl.log_request_start()));
1803
1804 let load_balancer = &execution_profile.load_balancing_policy;
1805
1806 let runner = async {
1807 let cluster_state = self.cluster.get_state();
1808 let request_plan =
1809 load_balancing::Plan::new(load_balancer.as_ref(), &statement_info, &cluster_state);
1810
1811 // If a speculative execution policy is used to run request, request_plan has to be shared
1812 // between different async functions. This struct helps to wrap request_plan in mutex so it
1813 // can be shared safely.
1814 struct SharedPlan<'a, I>
1815 where
1816 I: Iterator<Item = (NodeRef<'a>, Shard)>,
1817 {
1818 iter: std::sync::Mutex<I>,
1819 }
1820
1821 impl<'a, I> Iterator for &SharedPlan<'a, I>
1822 where
1823 I: Iterator<Item = (NodeRef<'a>, Shard)>,
1824 {
1825 type Item = (NodeRef<'a>, Shard);
1826
1827 fn next(&mut self) -> Option<Self::Item> {
1828 self.iter.lock().unwrap().next()
1829 }
1830 }
1831
1832 let retry_policy = statement_config
1833 .retry_policy
1834 .as_deref()
1835 .unwrap_or(&*execution_profile.retry_policy);
1836
1837 let speculative_policy = execution_profile.speculative_execution_policy.as_ref();
1838
1839 match speculative_policy {
1840 Some(speculative) if statement_config.is_idempotent => {
1841 let shared_request_plan = SharedPlan {
1842 iter: std::sync::Mutex::new(request_plan),
1843 };
1844
1845 let request_runner_generator = |is_speculative: bool| {
1846 let history_data: Option<HistoryData> = history_listener_and_id
1847 .as_ref()
1848 .map(|(history_listener, request_id)| {
1849 let speculative_id: Option<history::SpeculativeId> =
1850 if is_speculative {
1851 Some(
1852 history_listener.log_new_speculative_fiber(*request_id),
1853 )
1854 } else {
1855 None
1856 };
1857 HistoryData {
1858 listener: *history_listener,
1859 request_id: *request_id,
1860 speculative_id,
1861 }
1862 });
1863
1864 if is_speculative {
1865 request_span.inc_speculative_executions();
1866 }
1867
1868 self.run_request_speculative_fiber(
1869 &shared_request_plan,
1870 &run_request_once,
1871 &execution_profile,
1872 ExecuteRequestContext {
1873 is_idempotent: statement_config.is_idempotent,
1874 consistency_set_on_statement: statement_config.consistency,
1875 retry_session: retry_policy.new_session(),
1876 history_data,
1877 query_info: &statement_info,
1878 request_span,
1879 },
1880 )
1881 };
1882
1883 let context = speculative_execution::Context {
1884 #[cfg(feature = "metrics")]
1885 metrics: Arc::clone(&self.metrics),
1886 };
1887
1888 speculative_execution::execute(
1889 speculative.as_ref(),
1890 &context,
1891 request_runner_generator,
1892 )
1893 .await
1894 }
1895 _ => {
1896 let history_data: Option<HistoryData> =
1897 history_listener_and_id
1898 .as_ref()
1899 .map(|(history_listener, request_id)| HistoryData {
1900 listener: *history_listener,
1901 request_id: *request_id,
1902 speculative_id: None,
1903 });
1904 self.run_request_speculative_fiber(
1905 request_plan,
1906 &run_request_once,
1907 &execution_profile,
1908 ExecuteRequestContext {
1909 is_idempotent: statement_config.is_idempotent,
1910 consistency_set_on_statement: statement_config.consistency,
1911 retry_session: retry_policy.new_session(),
1912 history_data,
1913 query_info: &statement_info,
1914 request_span,
1915 },
1916 )
1917 .await
1918 .unwrap_or(Err(RequestError::EmptyPlan))
1919 }
1920 }
1921 };
1922
1923 let effective_timeout = statement_config
1924 .request_timeout
1925 .or(execution_profile.request_timeout);
1926 let result = match effective_timeout {
1927 Some(timeout) => tokio::time::timeout(timeout, runner).await.unwrap_or_else(
1928 |_: tokio::time::error::Elapsed| {
1929 #[cfg(feature = "metrics")]
1930 self.metrics.inc_request_timeouts();
1931 Err(RequestError::RequestTimeout(timeout))
1932 },
1933 ),
1934 None => runner.await,
1935 };
1936
1937 if let Some((history_listener, request_id)) = history_listener_and_id {
1938 match &result {
1939 Ok(_) => history_listener.log_request_success(request_id),
1940 Err(e) => history_listener.log_request_error(request_id, e),
1941 }
1942 }
1943
1944 result.map_err(RequestError::into_execution_error)
1945 }
1946
1947 /// Executes the closure `run_request_once`, provided the load balancing plan and some information
1948 /// about the request, including retry session.
1949 /// If request fails, retry session is used to perform retries.
1950 ///
1951 /// Returns None, if provided plan is empty.
1952 async fn run_request_speculative_fiber<'a, QueryFut, ResT>(
1953 &'a self,
1954 request_plan: impl Iterator<Item = (NodeRef<'a>, Shard)>,
1955 run_request_once: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
1956 execution_profile: &ExecutionProfileInner,
1957 mut context: ExecuteRequestContext<'a>,
1958 ) -> Option<Result<RunRequestResult<ResT>, RequestError>>
1959 where
1960 QueryFut: Future<Output = Result<ResT, RequestAttemptError>>,
1961 ResT: AllowedRunRequestResTType,
1962 {
1963 let mut last_error: Option<RequestError> = None;
1964 let mut current_consistency: Consistency = context
1965 .consistency_set_on_statement
1966 .unwrap_or(execution_profile.consistency);
1967
1968 'nodes_in_plan: for (node, shard) in request_plan {
1969 let span = trace_span!("Executing request", node = %node.address);
1970 'same_node_retries: loop {
1971 trace!(parent: &span, "Execution started");
1972 let connection = match node.connection_for_shard(shard).await {
1973 Ok(connection) => connection,
1974 Err(e) => {
1975 trace!(
1976 parent: &span,
1977 error = %e,
1978 "Choosing connection failed"
1979 );
1980 last_error = Some(e.into());
1981 // Broken connection doesn't count as a failed request, don't log in metrics
1982 continue 'nodes_in_plan;
1983 }
1984 };
1985 context.request_span.record_shard_id(&connection);
1986
1987 #[cfg(feature = "metrics")]
1988 self.metrics.inc_total_nonpaged_queries();
1989 let request_start = std::time::Instant::now();
1990
1991 trace!(
1992 parent: &span,
1993 connection = %connection.get_connect_address(),
1994 "Sending"
1995 );
1996 let attempt_id: Option<history::AttemptId> =
1997 context.log_attempt_start(connection.get_connect_address());
1998 let request_result: Result<ResT, RequestAttemptError> =
1999 run_request_once(connection, current_consistency, execution_profile)
2000 .instrument(span.clone())
2001 .await;
2002
2003 let elapsed = request_start.elapsed();
2004 let request_error: RequestAttemptError = match request_result {
2005 Ok(response) => {
2006 trace!(parent: &span, "Request succeeded");
2007 #[cfg(feature = "metrics")]
2008 let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
2009 context.log_attempt_success(&attempt_id);
2010 execution_profile.load_balancing_policy.on_request_success(
2011 context.query_info,
2012 elapsed,
2013 node,
2014 );
2015 return Some(Ok(RunRequestResult::Completed(response)));
2016 }
2017 Err(e) => {
2018 trace!(
2019 parent: &span,
2020 last_error = %e,
2021 "Request failed"
2022 );
2023 #[cfg(feature = "metrics")]
2024 self.metrics.inc_failed_nonpaged_queries();
2025 execution_profile.load_balancing_policy.on_request_failure(
2026 context.query_info,
2027 elapsed,
2028 node,
2029 &e,
2030 );
2031 e
2032 }
2033 };
2034
2035 // Use retry policy to decide what to do next
2036 let query_info = RequestInfo {
2037 error: &request_error,
2038 is_idempotent: context.is_idempotent,
2039 consistency: context
2040 .consistency_set_on_statement
2041 .unwrap_or(execution_profile.consistency),
2042 };
2043
2044 let retry_decision = context.retry_session.decide_should_retry(query_info);
2045 trace!(
2046 parent: &span,
2047 retry_decision = ?retry_decision
2048 );
2049
2050 context.log_attempt_error(&attempt_id, &request_error, &retry_decision);
2051
2052 last_error = Some(request_error.into());
2053
2054 match retry_decision {
2055 RetryDecision::RetrySameTarget(new_cl) => {
2056 #[cfg(feature = "metrics")]
2057 self.metrics.inc_retries_num();
2058 current_consistency = new_cl.unwrap_or(current_consistency);
2059 continue 'same_node_retries;
2060 }
2061 RetryDecision::RetryNextTarget(new_cl) => {
2062 #[cfg(feature = "metrics")]
2063 self.metrics.inc_retries_num();
2064 current_consistency = new_cl.unwrap_or(current_consistency);
2065 continue 'nodes_in_plan;
2066 }
2067 RetryDecision::DontRetry => break 'nodes_in_plan,
2068
2069 RetryDecision::IgnoreWriteError => {
2070 return Some(Ok(RunRequestResult::IgnoredWriteError))
2071 }
2072 };
2073 }
2074 }
2075
2076 last_error.map(Result::Err)
2077 }
2078
2079 async fn await_schema_agreement_indefinitely(&self) -> Result<Uuid, SchemaAgreementError> {
2080 loop {
2081 tokio::time::sleep(self.schema_agreement_interval).await;
2082 if let Some(agreed_version) = self.check_schema_agreement().await? {
2083 return Ok(agreed_version);
2084 }
2085 }
2086 }
2087
2088 pub async fn await_schema_agreement(&self) -> Result<Uuid, SchemaAgreementError> {
2089 timeout(
2090 self.schema_agreement_timeout,
2091 self.await_schema_agreement_indefinitely(),
2092 )
2093 .await
2094 .unwrap_or(Err(SchemaAgreementError::Timeout(
2095 self.schema_agreement_timeout,
2096 )))
2097 }
2098
2099 pub async fn check_schema_agreement(&self) -> Result<Option<Uuid>, SchemaAgreementError> {
2100 let cluster_state = self.get_cluster_state();
2101 let connections_iter = cluster_state.iter_working_connections()?;
2102
2103 let handles = connections_iter.map(|c| async move { c.fetch_schema_version().await });
2104 let versions = try_join_all(handles).await?;
2105
2106 let local_version: Uuid = versions[0];
2107 let in_agreement = versions.into_iter().all(|v| v == local_version);
2108 Ok(in_agreement.then_some(local_version))
2109 }
2110
2111 /// Retrieves the handle to execution profile that is used by this session
2112 /// by default, i.e. when an executed statement does not define its own handle.
2113 pub fn get_default_execution_profile_handle(&self) -> &ExecutionProfileHandle {
2114 &self.default_execution_profile_handle
2115 }
2116}
2117
2118// run_request, run_request_speculative_fiber, etc have a template type called ResT.
2119// There was a bug where ResT was set to QueryResponse, which could
2120// be an error response. This was not caught by retry policy which
2121// assumed all errors would come from analyzing Result<ResT, ExecutionError>.
2122// This trait is a guard to make sure that this mistake doesn't
2123// happen again.
2124// When using run_request make sure that the ResT type is NOT able
2125// to contain any errors.
2126// See https://github.com/scylladb/scylla-rust-driver/issues/501
2127pub(crate) trait AllowedRunRequestResTType {}
2128
2129impl AllowedRunRequestResTType for Uuid {}
2130impl AllowedRunRequestResTType for QueryResult {}
2131impl AllowedRunRequestResTType for NonErrorQueryResponse {}
2132
2133struct ExecuteRequestContext<'a> {
2134 is_idempotent: bool,
2135 consistency_set_on_statement: Option<Consistency>,
2136 retry_session: Box<dyn RetrySession>,
2137 history_data: Option<HistoryData<'a>>,
2138 query_info: &'a load_balancing::RoutingInfo<'a>,
2139 request_span: &'a RequestSpan,
2140}
2141
2142struct HistoryData<'a> {
2143 listener: &'a dyn HistoryListener,
2144 request_id: history::RequestId,
2145 speculative_id: Option<history::SpeculativeId>,
2146}
2147
2148impl ExecuteRequestContext<'_> {
2149 fn log_attempt_start(&self, node_addr: SocketAddr) -> Option<history::AttemptId> {
2150 self.history_data.as_ref().map(|hd| {
2151 hd.listener
2152 .log_attempt_start(hd.request_id, hd.speculative_id, node_addr)
2153 })
2154 }
2155
2156 fn log_attempt_success(&self, attempt_id_opt: &Option<history::AttemptId>) {
2157 let attempt_id: &history::AttemptId = match attempt_id_opt {
2158 Some(id) => id,
2159 None => return,
2160 };
2161
2162 let history_data: &HistoryData = match &self.history_data {
2163 Some(data) => data,
2164 None => return,
2165 };
2166
2167 history_data.listener.log_attempt_success(*attempt_id);
2168 }
2169
2170 fn log_attempt_error(
2171 &self,
2172 attempt_id_opt: &Option<history::AttemptId>,
2173 error: &RequestAttemptError,
2174 retry_decision: &RetryDecision,
2175 ) {
2176 let attempt_id: &history::AttemptId = match attempt_id_opt {
2177 Some(id) => id,
2178 None => return,
2179 };
2180
2181 let history_data: &HistoryData = match &self.history_data {
2182 Some(data) => data,
2183 None => return,
2184 };
2185
2186 history_data
2187 .listener
2188 .log_attempt_error(*attempt_id, error, retry_decision);
2189 }
2190}