scylla/client/session.rs

1//! `Session` is the main object used in the driver.\
2//! It manages all connections to the cluster and allows executing CQL requests.
3
4use super::execution_profile::{ExecutionProfile, ExecutionProfileHandle, ExecutionProfileInner};
5use super::pager::{PreparedIteratorConfig, QueryPager};
6use super::{Compression, PoolSize, SelfIdentity, WriteCoalescingDelay};
7use crate::authentication::AuthenticatorProvider;
8#[cfg(feature = "unstable-cloud")]
9use crate::cloud::CloudConfig;
10#[cfg(feature = "unstable-cloud")]
11use crate::cluster::node::CloudEndpoint;
12use crate::cluster::node::{InternalKnownNode, KnownNode, NodeRef};
13use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState};
14use crate::errors::{
15    BadQuery, ExecutionError, MetadataError, NewSessionError, PagerExecutionError, PrepareError,
16    RequestAttemptError, RequestError, SchemaAgreementError, TracingError, UseKeyspaceError,
17};
18use crate::frame::response::result;
19use crate::network::tls::TlsProvider;
20use crate::network::{Connection, ConnectionConfig, PoolConfig, VerifiedKeyspaceName};
21use crate::observability::driver_tracing::RequestSpan;
22use crate::observability::history::{self, HistoryListener};
23#[cfg(feature = "metrics")]
24use crate::observability::metrics::Metrics;
25use crate::observability::tracing::TracingInfo;
26use crate::policies::address_translator::AddressTranslator;
27use crate::policies::host_filter::HostFilter;
28use crate::policies::load_balancing::{self, RoutingInfo};
29use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
30use crate::policies::speculative_execution;
31use crate::policies::timestamp_generator::TimestampGenerator;
32use crate::response::query_result::{MaybeFirstRowError, QueryResult, RowsError};
33use crate::response::{NonErrorQueryResponse, PagingState, PagingStateResponse, QueryResponse};
34use crate::routing::partitioner::PartitionerName;
35use crate::routing::{Shard, ShardAwarePortRange};
36use crate::statement::batch::batch_values;
37use crate::statement::batch::{Batch, BatchStatement};
38use crate::statement::prepared::{PartitionKeyError, PreparedStatement};
39use crate::statement::unprepared::Statement;
40use crate::statement::{Consistency, PageSize, StatementConfig};
41use arc_swap::ArcSwapOption;
42use futures::future::join_all;
43use futures::future::try_join_all;
44use itertools::Itertools;
45use scylla_cql::frame::response::NonErrorResponse;
46use scylla_cql::serialize::batch::BatchValues;
47use scylla_cql::serialize::row::{SerializeRow, SerializedValues};
48use std::borrow::Borrow;
49use std::future::Future;
50use std::net::{IpAddr, SocketAddr};
51use std::num::NonZeroU32;
52use std::sync::Arc;
53use std::time::Duration;
54use tokio::time::timeout;
55#[cfg(feature = "unstable-cloud")]
56use tracing::warn;
57use tracing::{debug, error, trace, trace_span, Instrument};
58use uuid::Uuid;
59
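// Capacity of the channel used by connections to pass tablet info to the `Cluster` (see `Session::connect`).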
60pub(crate) const TABLET_CHANNEL_SIZE: usize = 8192;
61
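// Page size used by the driver's internal queries that fetch tracing info.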
62const TRACING_QUERY_PAGE_SIZE: i32 = 1024;
63
64/// `Session` manages connections to the cluster and allows executing CQL requests.
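///
/// # Example
///
/// A sketch of the typical way to obtain a `Session`, via
/// [SessionBuilder](crate::client::session_builder::SessionBuilder);
/// the node address is illustrative only.
/// ```rust
/// # use std::error::Error;
/// # async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
/// use scylla::client::session::Session;
/// use scylla::client::session_builder::SessionBuilder;
///
/// let session: Session = SessionBuilder::new()
///     .known_node("127.0.0.1:9042")
///     .build()
///     .await?;
/// # Ok(())
/// # }
/// ```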
65pub struct Session {
66    cluster: Cluster,
67    default_execution_profile_handle: ExecutionProfileHandle,
68    schema_agreement_interval: Duration,
69    #[cfg(feature = "metrics")]
70    metrics: Arc<Metrics>,
71    schema_agreement_timeout: Duration,
72    schema_agreement_automatic_waiting: bool,
73    refresh_metadata_on_auto_schema_agreement: bool,
74    keyspace_name: Arc<ArcSwapOption<String>>,
75    tracing_info_fetch_attempts: NonZeroU32,
76    tracing_info_fetch_interval: Duration,
77    tracing_info_fetch_consistency: Consistency,
78}
79
80/// This implementation deliberately omits some details from Cluster in order
81/// to avoid cluttering the output with a lot of information of little use.
82impl std::fmt::Debug for Session {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        let mut d = f.debug_struct("Session");
85        d.field("cluster", &ClusterNeatDebug(&self.cluster))
86            .field(
87                "default_execution_profile_handle",
88                &self.default_execution_profile_handle,
89            )
90            .field("schema_agreement_interval", &self.schema_agreement_interval);
91
92        #[cfg(feature = "metrics")]
93        d.field("metrics", &self.metrics);
94
95        d.field(
96            "auto_await_schema_agreement_timeout",
97            &self.schema_agreement_timeout,
98        )
99        .field(
100            "schema_agreement_automatic_waiting",
101            &self.schema_agreement_automatic_waiting,
102        )
103        .field(
104            "refresh_metadata_on_auto_schema_agreement",
105            &self.refresh_metadata_on_auto_schema_agreement,
106        )
107        .field("keyspace_name", &self.keyspace_name)
108        .field(
109            "tracing_info_fetch_attempts",
110            &self.tracing_info_fetch_attempts,
111        )
112        .field(
113            "tracing_info_fetch_interval",
114            &self.tracing_info_fetch_interval,
115        )
116        .field(
117            "tracing_info_fetch_consistency",
118            &self.tracing_info_fetch_consistency,
119        )
120        .finish()
121    }
122}
123
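/// TLS context used to secure the driver's connections.
///
/// The available variants depend on which TLS backend features
/// (`openssl-010`, `rustls-023`) are enabled.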
124#[derive(Clone)] // Cheaply clonable - reference counted.
125#[non_exhaustive]
126pub enum TlsContext {
127    #[cfg(feature = "openssl-010")]
128    OpenSsl010(openssl::ssl::SslContext),
129    #[cfg(feature = "rustls-023")]
130    Rustls023(Arc<rustls::ClientConfig>),
131}
132
133#[cfg(feature = "openssl-010")]
134impl From<openssl::ssl::SslContext> for TlsContext {
135    fn from(value: openssl::ssl::SslContext) -> Self {
136        TlsContext::OpenSsl010(value)
137    }
138}
139
140#[cfg(feature = "rustls-023")]
141impl From<Arc<rustls::ClientConfig>> for TlsContext {
142    fn from(value: Arc<rustls::ClientConfig>) -> Self {
143        TlsContext::Rustls023(value)
144    }
145}
146
147/// Configuration options for [`Session`].
148/// Can be created manually, but usually it's easier to use
149/// [SessionBuilder](super::session_builder::SessionBuilder).
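///
/// # Example
///
/// A minimal sketch of configuring a few fields by hand; the values below
/// are illustrative only.
/// ```
/// # use scylla::client::session::SessionConfig;
/// use std::time::Duration;
///
/// let mut config = SessionConfig::new();
/// // Contact point used to discover the rest of the cluster.
/// config.add_known_node("127.0.0.1:9042");
/// // Public fields can be tweaked directly.
/// config.connect_timeout = Duration::from_secs(10);
/// config.fetch_schema_metadata = false;
/// ```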
150#[derive(Clone)]
151#[non_exhaustive]
152pub struct SessionConfig {
153    /// List of database servers known on Session startup.
154    /// Session will connect to these nodes to retrieve information about other nodes in the cluster.
155    /// Each node can be represented as a hostname or an IP address.
156    pub known_nodes: Vec<KnownNode>,
157
158    /// A local ip address to bind all driver's TCP sockets to.
159    ///
160    /// By default set to None, which is equivalent to:
161    /// - `INADDR_ANY` for IPv4 ([`Ipv4Addr::UNSPECIFIED`][std::net::Ipv4Addr::UNSPECIFIED])
162    /// - `in6addr_any` for IPv6 ([`Ipv6Addr::UNSPECIFIED`][std::net::Ipv6Addr::UNSPECIFIED])
163    pub local_ip_address: Option<IpAddr>,
164
165    /// Specifies the local port range used for shard-aware connections.
166    ///
167    /// By default set to [`ShardAwarePortRange::EPHEMERAL_PORT_RANGE`].
168    pub shard_aware_local_port_range: ShardAwarePortRange,
169
170    /// Preferred compression algorithm to use on connections.
171    /// If it's not supported by the database server, the Session will fall back to no compression.
172    pub compression: Option<Compression>,
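    /// If true, sets the TCP_NODELAY option on all connections (disables Nagle's algorithm).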
173    pub tcp_nodelay: bool,
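    /// Interval of TCP keepalive probes set on the sockets.
    /// If `None`, TCP keepalive is not configured by the driver.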
174    pub tcp_keepalive_interval: Option<Duration>,
175
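    /// Handle to the default execution profile, used by statements that don't specify their own profile.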
176    pub default_execution_profile_handle: ExecutionProfileHandle,
177
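    /// Keyspace to switch to on session startup; if set, `use_keyspace` is called right after connecting.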
178    pub used_keyspace: Option<String>,
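    /// Whether `used_keyspace` should be treated as case-sensitive.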
179    pub keyspace_case_sensitive: bool,
180
181    /// TLS context used to provide the Session with TLS.
182    pub tls_context: Option<TlsContext>,
183
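    /// Authenticator provider used during the connection handshake, if the cluster requires authentication.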
184    pub authenticator: Option<Arc<dyn AuthenticatorProvider>>,
185
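    /// Timeout for establishing a single connection to a node.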
186    pub connect_timeout: Duration,
187
188    /// Size of the per-node connection pool, i.e. how many connections the driver should keep to each node.
189    /// The default is `PerShard(1)`, which is the recommended setting for Scylla clusters.
190    pub connection_pool_size: PoolSize,
191
192    /// If true, prevents the driver from connecting to the shard-aware port, even if the node supports it.
193    /// Generally, this option is best left at its default (false).
194    pub disallow_shard_aware_port: bool,
195
196    /// Timestamp generator used for generating timestamps on the client side.
197    /// If `None`, server-side timestamps are used.
198    pub timestamp_generator: Option<Arc<dyn TimestampGenerator>>,
199
200    /// Keyspaces to fetch metadata for. If empty, metadata of all keyspaces is fetched.
201    pub keyspaces_to_fetch: Vec<String>,
202
203    /// If true, full schema is fetched with every metadata refresh.
204    pub fetch_schema_metadata: bool,
205
206    /// Custom timeout for requests that query metadata.
207    pub metadata_request_serverside_timeout: Option<Duration>,
208
209    /// Interval of sending keepalive requests.
210    /// If `None`, keepalives are never sent, so `Self::keepalive_timeout` has no effect.
211    pub keepalive_interval: Option<Duration>,
212
213    /// Controls how long a connection may go without a response to keepalives before it is closed.
214    /// If `None`, connections are never closed due to lack of response to a keepalive message.
215    pub keepalive_timeout: Option<Duration>,
216
217    /// How often the driver should check whether the schema is in agreement.
218    pub schema_agreement_interval: Duration,
219
220    /// Controls the timeout for waiting for schema agreement.
221    /// This applies both to manually awaiting schema agreement and to
222    /// automatic waiting after a schema-altering statement is sent.
223    pub schema_agreement_timeout: Duration,
224
225    /// Controls whether schema agreement is automatically awaited
226    /// after sending a schema-altering statement.
227    pub schema_agreement_automatic_waiting: bool,
228
229    /// If true, full schema metadata is fetched after successfully reaching a schema agreement.
230    /// It is true by default, but can be disabled when a series of schema-altering statements is to be performed.
231    pub refresh_metadata_on_auto_schema_agreement: bool,
232
233    /// The address translator is used to translate addresses received from ScyllaDB nodes
234    /// (either with cluster metadata or with an event) to addresses that can be used to
235    /// actually connect to those nodes. This may be needed e.g. when there is NAT
236    /// between the nodes and the driver.
237    pub address_translator: Option<Arc<dyn AddressTranslator>>,
238
239    /// The host filter decides whether any connections should be opened
240    /// to a given node or not. The driver will also avoid filtered-out nodes when
241    /// re-establishing the control connection.
242    pub host_filter: Option<Arc<dyn HostFilter>>,
243
244    /// Configuration for connecting to Scylla Cloud, if the driver is to connect to it.
245    #[cfg(feature = "unstable-cloud")]
246    pub cloud_config: Option<Arc<CloudConfig>>,
247
248    /// If true, the driver will inject a delay controlled by [`SessionConfig::write_coalescing_delay`]
249    /// before flushing data to the socket.
250    /// This gives the driver an opportunity to collect more write requests
251    /// and write them in a single syscall, increasing efficiency.
252    ///
253    /// However, this optimization may worsen latency if the rate of requests
254    /// issued by the application is low, but otherwise the application is
255    /// heavily loaded with other tasks on the same tokio executor.
256    /// Please do performance measurements before committing to disabling
257    /// this option.
258    pub enable_write_coalescing: bool,
259
260    /// Controls the write coalescing delay (if enabled).
261    ///
262    /// This option has no effect if [`SessionConfig::enable_write_coalescing`] is false.
263    ///
264    /// This option is [`WriteCoalescingDelay::SmallNondeterministic`] by default.
265    pub write_coalescing_delay: WriteCoalescingDelay,
266
267    /// Number of attempts to fetch [`TracingInfo`]
268    /// in [`Session::get_tracing_info`]. Tracing info
269    /// might not be available immediately on the queried node - that's why
270    /// the driver performs a few attempts with sleeps in between.
271    pub tracing_info_fetch_attempts: NonZeroU32,
272
273    /// Delay between attempts to fetch [`TracingInfo`]
274    /// in [`Session::get_tracing_info`]. Tracing info
275    /// might not be available immediately on the queried node - that's why
276    /// the driver performs a few attempts with sleeps in between.
277    pub tracing_info_fetch_interval: Duration,
278
279    /// Consistency level of fetching [`TracingInfo`]
280    /// in [`Session::get_tracing_info`].
281    pub tracing_info_fetch_consistency: Consistency,
282
283    /// Interval between cluster metadata refreshes. This
284    /// can be configured according to the traffic pattern,
285    /// e.g. when unexpected background traffic is undesirable
286    /// or when the topology is expected to change frequently.
287    pub cluster_metadata_refresh_interval: Duration,
288
289    /// Driver and application self-identifying information,
290    /// to be sent to the server in the STARTUP message.
291    pub identity: SelfIdentity<'static>,
292}
293
294impl SessionConfig {
295    /// Creates a [`SessionConfig`] with default configuration
296    /// # Default configuration
297    /// * Compression: None
298    /// * Load balancing policy: Token-aware Round-robin
299    ///
300    /// # Example
301    /// ```
302    /// # use scylla::client::session::SessionConfig;
303    /// let config = SessionConfig::new();
304    /// ```
305    pub fn new() -> Self {
306        SessionConfig {
307            known_nodes: Vec::new(),
308            local_ip_address: None,
309            shard_aware_local_port_range: ShardAwarePortRange::EPHEMERAL_PORT_RANGE,
310            compression: None,
311            tcp_nodelay: true,
312            tcp_keepalive_interval: None,
313            schema_agreement_interval: Duration::from_millis(200),
314            default_execution_profile_handle: ExecutionProfile::new_from_inner(Default::default())
315                .into_handle(),
316            used_keyspace: None,
317            keyspace_case_sensitive: false,
318            tls_context: None,
319            authenticator: None,
320            connect_timeout: Duration::from_secs(5),
321            connection_pool_size: Default::default(),
322            disallow_shard_aware_port: false,
323            timestamp_generator: None,
324            keyspaces_to_fetch: Vec::new(),
325            fetch_schema_metadata: true,
326            metadata_request_serverside_timeout: Some(Duration::from_secs(2)),
327            keepalive_interval: Some(Duration::from_secs(30)),
328            keepalive_timeout: Some(Duration::from_secs(30)),
329            schema_agreement_timeout: Duration::from_secs(60),
330            schema_agreement_automatic_waiting: true,
331            address_translator: None,
332            host_filter: None,
333            refresh_metadata_on_auto_schema_agreement: true,
334            #[cfg(feature = "unstable-cloud")]
335            cloud_config: None,
336            enable_write_coalescing: true,
337            write_coalescing_delay: WriteCoalescingDelay::SmallNondeterministic,
338            tracing_info_fetch_attempts: NonZeroU32::new(10).unwrap(),
339            tracing_info_fetch_interval: Duration::from_millis(3),
340            tracing_info_fetch_consistency: Consistency::One,
341            cluster_metadata_refresh_interval: Duration::from_secs(60),
342            identity: SelfIdentity::default(),
343        }
344    }
345
346    /// Adds a known database server with a hostname.
347    /// If the port is not explicitly specified, 9042 is used as the default.
348    /// # Example
349    /// ```
350    /// # use scylla::client::session::SessionConfig;
351    /// let mut config = SessionConfig::new();
352    /// config.add_known_node("127.0.0.1");
353    /// config.add_known_node("db1.example.com:9042");
354    /// ```
355    pub fn add_known_node(&mut self, hostname: impl AsRef<str>) {
356        self.known_nodes
357            .push(KnownNode::Hostname(hostname.as_ref().to_string()));
358    }
359
360    /// Adds a known database server with an IP address
361    /// # Example
362    /// ```
363    /// # use scylla::client::session::SessionConfig;
364    /// # use std::net::{SocketAddr, IpAddr, Ipv4Addr};
365    /// let mut config = SessionConfig::new();
366    /// config.add_known_node_addr(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9042));
367    /// ```
368    pub fn add_known_node_addr(&mut self, node_addr: SocketAddr) {
369        self.known_nodes.push(KnownNode::Address(node_addr));
370    }
371
372    /// Adds a list of known database servers with hostnames.
373    /// If the port is not explicitly specified, 9042 is used as the default.
374    /// # Example
375    /// ```
376    /// # use scylla::client::session::SessionConfig;
377    /// # use std::net::{SocketAddr, IpAddr, Ipv4Addr};
378    /// let mut config = SessionConfig::new();
379    /// config.add_known_nodes(&["127.0.0.1:9042", "db1.example.com"]);
380    /// ```
381    pub fn add_known_nodes(&mut self, hostnames: impl IntoIterator<Item = impl AsRef<str>>) {
382        for hostname in hostnames {
383            self.add_known_node(hostname);
384        }
385    }
386
387    /// Adds a list of known database servers with IP addresses
388    /// # Example
389    /// ```
390    /// # use scylla::client::session::SessionConfig;
391    /// # use std::net::{SocketAddr, IpAddr, Ipv4Addr};
392    /// let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 3)), 9042);
393    /// let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 17, 0, 4)), 9042);
394    ///
395    /// let mut config = SessionConfig::new();
396    /// config.add_known_nodes_addr(&[addr1, addr2]);
397    /// ```
398    pub fn add_known_nodes_addr(
399        &mut self,
400        node_addrs: impl IntoIterator<Item = impl Borrow<SocketAddr>>,
401    ) {
402        for address in node_addrs {
403            self.add_known_node_addr(*address.borrow());
404        }
405    }
406}
407
408/// Creates a default [`SessionConfig`], same as [`SessionConfig::new`].
409impl Default for SessionConfig {
410    fn default() -> Self {
411        Self::new()
412    }
413}
414
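/// Outcome of running a request: either the request completed with a response,
/// or a write error occurred but was ignored (per the retry decision), in which
/// case the caller substitutes an empty (void) result.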
415pub(crate) enum RunRequestResult<ResT> {
416    IgnoredWriteError,
417    Completed(ResT),
418}
419
420impl Session {
421    /// Sends a request to the database and receives a response.\
422    /// Executes an unprepared CQL statement without paging, i.e. all results are received in a single response.
423    ///
424    /// This is the easiest way to execute a CQL statement, but performance is worse than that of prepared statements.
425    ///
426    /// It is discouraged to use this method with a non-empty values argument (i.e. when the [`SerializeRow::is_empty()`]
427    /// trait method returns false). In such a case, the statement first needs to be prepared (on a single connection), so
428    /// the driver will perform 2 round trips instead of 1. Please use [`Session::execute_unpaged()`] instead.
429    ///
430    /// As all results come in one response (no paging is done!), the memory footprint and latency may be huge
431    /// for statements returning rows (i.e. SELECTs)! Prefer this method for non-SELECTs, and for SELECTs
432    /// it is best to use paged requests:
433    /// - to receive multiple pages and transparently iterate through them, use [query_iter](Session::query_iter).
434    /// - to manually receive multiple pages and iterate through them, use [query_single_page](Session::query_single_page).
435    ///
436    /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/unprepared.html) for more information
437    /// # Arguments
438    /// * `statement` - statement to be executed, can be just a `&str` or the [`Statement`] struct.
439    /// * `values` - values bound to the statement, the easiest way is to use a tuple of bound values.
440    ///
441    /// # Examples
442    /// ```rust
443    /// # use scylla::client::session::Session;
444    /// # use std::error::Error;
445    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
446    /// // Insert an int and text into a table.
447    /// session
448    ///     .query_unpaged(
449    ///         "INSERT INTO ks.tab (a, b) VALUES(?, ?)",
450    ///         (2_i32, "some text")
451    ///     )
452    ///     .await?;
453    /// # Ok(())
454    /// # }
455    /// ```
456    /// ```rust
457    /// # use scylla::client::session::Session;
458    /// # use std::error::Error;
459    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
460    ///
461    /// // Read rows containing an int and text.
462    /// // Keep in mind that all results come in one response (no paging is done!),
463    /// // so the memory footprint and latency may be huge!
464    /// // To prevent that, use `Session::query_iter` or `Session::query_single_page`.
465    /// let query_rows = session
466    ///     .query_unpaged("SELECT a, b FROM ks.tab", &[])
467    ///     .await?
468    ///     .into_rows_result()?;
469    ///
470    /// for row in query_rows.rows()? {
471    ///     // Parse row as int and text.
472    ///     let (int_val, text_val): (i32, &str) = row?;
473    /// }
474    /// # Ok(())
475    /// # }
476    /// ```
477    pub async fn query_unpaged(
478        &self,
479        statement: impl Into<Statement>,
480        values: impl SerializeRow,
481    ) -> Result<QueryResult, ExecutionError> {
482        self.do_query_unpaged(&statement.into(), values).await
483    }
484
485    /// Queries a single page from the database, optionally continuing from a saved point.
486    ///
487    /// It is discouraged to use this method with a non-empty values argument (i.e. when the [`SerializeRow::is_empty()`]
488    /// trait method returns false). In such a case, the CQL statement first needs to be prepared (on a single connection), so
489    /// the driver will perform 2 round trips instead of 1. Please use [`Session::execute_single_page()`] instead.
490    ///
491    /// # Arguments
492    ///
493    /// * `statement` - statement to be executed
494    /// * `values` - values bound to the statement
495    /// * `paging_state` - previously received paging state or [PagingState::start()]
496    ///
497    /// # Example
498    ///
499    /// ```rust
500    /// # use scylla::client::session::Session;
501    /// # use std::error::Error;
502    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
503    /// use std::ops::ControlFlow;
504    /// use scylla::response::PagingState;
505    ///
506    /// // Manual paging in a loop, unprepared statement.
507    /// let mut paging_state = PagingState::start();
508    /// loop {
509    ///    let (res, paging_state_response) = session
510    ///        .query_single_page("SELECT a, b, c FROM ks.tbl", &[], paging_state)
511    ///        .await?;
512    ///
513    ///    // Do something with a single page of results.
514    ///    for row in res
515    ///        .into_rows_result()?
516    ///        .rows::<(i32, &str)>()?
517    ///    {
518    ///        let (a, b) = row?;
519    ///    }
520    ///
521    ///    match paging_state_response.into_paging_control_flow() {
522    ///        ControlFlow::Break(()) => {
523    ///            // No more pages to be fetched.
524    ///            break;
525    ///        }
526    ///        ControlFlow::Continue(new_paging_state) => {
527    ///            // Update paging state from the response, so that query
528    ///            // will be resumed from where it ended the last time.
529    ///            paging_state = new_paging_state;
530    ///        }
531    ///    }
532    /// }
533    /// # Ok(())
534    /// # }
535    /// ```
536    pub async fn query_single_page(
537        &self,
538        statement: impl Into<Statement>,
539        values: impl SerializeRow,
540        paging_state: PagingState,
541    ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
542        self.do_query_single_page(&statement.into(), values, paging_state)
543            .await
544    }
545
546    /// Execute an unprepared CQL statement with paging.\
547    /// This method will query all pages of the result.
548    ///
549    /// Returns an async iterator (stream) over all received rows.\
550    /// Page size can be specified in the [`Statement`] passed to the function.
551    ///
552    /// It is discouraged to use this method with a non-empty values argument (i.e. when the [`SerializeRow::is_empty()`]
553    /// trait method returns false). In such a case, the statement first needs to be prepared (on a single connection), so
554    /// the driver will initially perform 2 round trips instead of 1. Please use [`Session::execute_iter()`] instead.
555    ///
556    /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/paged.html) for more information.
557    ///
558    /// # Arguments
559    /// * `statement` - statement to be executed, can be just a `&str` or the [`Statement`] struct.
560    /// * `values` - values bound to the statement, the easiest way is to use a tuple of bound values.
561    ///
562    /// # Example
563    ///
564    /// ```rust
565    /// # use scylla::client::session::Session;
566    /// # use std::error::Error;
567    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
568    /// use futures::stream::StreamExt;
569    ///
570    /// let mut rows_stream = session
571    ///    .query_iter("SELECT a, b FROM ks.t", &[])
572    ///    .await?
573    ///    .rows_stream::<(i32, i32)>()?;
574    ///
575    /// while let Some(next_row_res) = rows_stream.next().await {
576    ///     let (a, b): (i32, i32) = next_row_res?;
577    ///     println!("a, b: {}, {}", a, b);
578    /// }
579    /// # Ok(())
580    /// # }
581    /// ```
582    pub async fn query_iter(
583        &self,
584        statement: impl Into<Statement>,
585        values: impl SerializeRow,
586    ) -> Result<QueryPager, PagerExecutionError> {
587        self.do_query_iter(statement.into(), values).await
588    }
589
590    /// Execute a prepared statement. Requires a [PreparedStatement]
591    /// generated using [`Session::prepare`](Session::prepare).\
592    /// Performs an unpaged request, i.e. all results are received in a single response.
593    ///
594    /// As all results come in one response (no paging is done!), the memory footprint and latency may be huge
595    /// for statements returning rows (i.e. SELECTs)! Prefer this method for non-SELECTs, and for SELECTs
596    /// it is best to use paged requests:
597    /// - to receive multiple pages and transparently iterate through them, use [execute_iter](Session::execute_iter).
598    /// - to manually receive multiple pages and iterate through them, use [execute_single_page](Session::execute_single_page).
599    ///
600    /// Prepared statements are much faster than unprepared statements:
601    /// * Database doesn't need to parse the statement string upon each execution (only once)
602    /// * They are properly load balanced using token aware routing
603    ///
604    /// > ***Warning***\
605    /// > For token/shard aware load balancing to work properly, all partition key values
606    /// > must be sent as bound values
607    /// > (see [performance section](https://rust-driver.docs.scylladb.com/stable/statements/prepared.html#performance)).
608    ///
609    /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/prepared.html) for more information.
610    ///
611    /// # Arguments
612    /// * `prepared` - the prepared statement to execute, generated using [`Session::prepare`](Session::prepare)
613    /// * `values` - values bound to the statement, the easiest way is to use a tuple of bound values
614    ///
615    /// # Example
616    /// ```rust
617    /// # use scylla::client::session::Session;
618    /// # use std::error::Error;
619    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
620    /// use scylla::statement::prepared::PreparedStatement;
621    ///
622    /// // Prepare the statement for later execution
623    /// let prepared: PreparedStatement = session
624    ///     .prepare("INSERT INTO ks.tab (a) VALUES(?)")
625    ///     .await?;
626    ///
627    /// // Execute the prepared statement with some values, just like an unprepared statement.
628    /// let to_insert: i32 = 12345;
629    /// session.execute_unpaged(&prepared, (to_insert,)).await?;
630    /// # Ok(())
631    /// # }
632    /// ```
633    pub async fn execute_unpaged(
634        &self,
635        prepared: &PreparedStatement,
636        values: impl SerializeRow,
637    ) -> Result<QueryResult, ExecutionError> {
638        self.do_execute_unpaged(prepared, values).await
639    }
640
641    /// Executes a prepared statement, restricting results to a single page.
642    /// Optionally continues fetching results from a saved point.
643    ///
644    /// # Arguments
645    ///
646    /// * `prepared` - a statement prepared with [prepare](crate::client::session::Session::prepare)
647    /// * `values` - values bound to the statement
648    /// * `paging_state` - continuation based on a paging state received from a previous paged query or None
649    ///
650    /// # Example
651    ///
652    /// ```rust
653    /// # use scylla::client::session::Session;
654    /// # use std::error::Error;
655    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
656    /// use std::ops::ControlFlow;
657    /// use scylla::statement::unprepared::Statement;
658    /// use scylla::response::{PagingState, PagingStateResponse};
659    ///
660    /// let paged_prepared = session
661    ///     .prepare(
662    ///         Statement::new("SELECT a, b FROM ks.tbl")
663    ///             .with_page_size(100.try_into().unwrap()),
664    ///     )
665    ///     .await?;
666    ///
667    /// // Manual paging in a loop, prepared statement.
668    /// let mut paging_state = PagingState::start();
669    /// loop {
670    ///     let (res, paging_state_response) = session
671    ///         .execute_single_page(&paged_prepared, &[], paging_state)
672    ///         .await?;
673    ///
674    ///     // Do something with a single page of results.
675    ///     for row in res
676    ///         .into_rows_result()?
677    ///         .rows::<(i32, &str)>()?
678    ///     {
679    ///         let (a, b) = row?;
680    ///     }
681    ///
682    ///     match paging_state_response.into_paging_control_flow() {
683    ///         ControlFlow::Break(()) => {
684    ///             // No more pages to be fetched.
685    ///             break;
686    ///         }
687    ///         ControlFlow::Continue(new_paging_state) => {
688    ///             // Update paging continuation from the paging state, so that query
689    ///             // will be resumed from where it ended the last time.
690    ///             paging_state = new_paging_state;
691    ///         }
692    ///     }
693    /// }
694    /// # Ok(())
695    /// # }
696    /// ```
697    pub async fn execute_single_page(
698        &self,
699        prepared: &PreparedStatement,
700        values: impl SerializeRow,
701        paging_state: PagingState,
702    ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
703        self.do_execute_single_page(prepared, values, paging_state)
704            .await
705    }
706
707    /// Execute a prepared statement with paging.\
708    /// This method will query all pages of the result.\
709    ///
710    /// Returns an async iterator (stream) over all received rows.\
711    /// Page size can be specified in the [PreparedStatement] passed to the function.
712    ///
713    /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/paged.html) for more information.
714    ///
715    /// # Arguments
716    /// * `prepared` - the prepared statement to execute, generated using [`Session::prepare`](Session::prepare)
717    /// * `values` - values bound to the statement, the easiest way is to use a tuple of bound values
718    ///
719    /// # Example
720    ///
721    /// ```rust
722    /// # use scylla::client::session::Session;
723    /// # use futures::StreamExt as _;
724    /// # use std::error::Error;
725    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
726    /// use scylla::statement::prepared::PreparedStatement;
727    ///
728    /// // Prepare the statement for later execution
729    /// let prepared: PreparedStatement = session
730    ///     .prepare("SELECT a, b FROM ks.t")
731    ///     .await?;
732    ///
733    /// // Execute the statement and receive all pages
734    /// let mut rows_stream = session
735    ///    .execute_iter(prepared, &[])
736    ///    .await?
737    ///    .rows_stream::<(i32, i32)>()?;
738    ///
739    /// while let Some(next_row_res) = rows_stream.next().await {
740    ///     let (a, b): (i32, i32) = next_row_res?;
741    ///     println!("a, b: {}, {}", a, b);
742    /// }
743    /// # Ok(())
744    /// # }
745    /// ```
746    pub async fn execute_iter(
747        &self,
748        prepared: impl Into<PreparedStatement>,
749        values: impl SerializeRow,
750    ) -> Result<QueryPager, PagerExecutionError> {
751        self.do_execute_iter(prepared.into(), values).await
752    }
753
754    /// Execute a batch statement.\
755    /// A batch contains many `unprepared` or `prepared` statements which are executed at once.\
756    /// Batch doesn't return any rows.
757    ///
758    /// Batch values must contain values for each of the statements.
759    ///
760    /// Avoid using non-empty values ([`SerializeRow::is_empty()`] returns false) for unprepared statements
761    /// inside the batch. Such statements will first need to be prepared, so the driver will need to
762    /// send (number_of_unprepared_statements_with_values + 1) requests instead of 1 request, severely
763    /// affecting performance.
764    ///
765    /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/batch.html) for more information
766    ///
767    /// # Arguments
768    /// * `batch` - [Batch] to be performed
769    /// * `values` - List of values for each statement; it's easiest to use a tuple of tuples
770    ///
771    /// # Example
772    /// ```rust
773    /// # use scylla::client::session::Session;
774    /// # use std::error::Error;
775    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
776    /// use scylla::statement::batch::Batch;
777    ///
778    /// let mut batch: Batch = Default::default();
779    ///
780    /// // A statement with two bound values
781    /// batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(?, ?)");
782    ///
783    /// // A statement with one bound value
784    /// batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(3, ?)");
785    ///
786    /// // A statement with no bound values
787    /// batch.append_statement("INSERT INTO ks.tab(a, b) VALUES(5, 6)");
788    ///
789    /// // Batch values is a tuple of 3 tuples containing values for each statement
790    /// let batch_values = ((1_i32, 2_i32), // Tuple with two values for the first statement
791    ///                     (4_i32,),       // Tuple with one value for the second statement
792    ///                     ());            // Empty tuple/unit for the third statement
793    ///
794    /// // Run the batch
795    /// session.batch(&batch, batch_values).await?;
796    /// # Ok(())
797    /// # }
798    /// ```
799    pub async fn batch(
800        &self,
801        batch: &Batch,
802        values: impl BatchValues,
803    ) -> Result<QueryResult, ExecutionError> {
804        self.do_batch(batch, values).await
805    }
806}
807
808/// Represents a CQL session, which can be used to communicate
809/// with the database
810impl Session {
811    /// Establishes a CQL session with the database.
812    ///
813    /// It's usually more convenient to use [SessionBuilder](crate::client::session_builder::SessionBuilder)
814    /// instead of calling `Session::connect` directly.
815    /// # Arguments
816    /// * `config` - Connection configuration - known nodes, compression, etc.
817    ///   Must contain at least one known node.
818    ///
819    /// # Example
820    /// ```rust
821    /// # use std::error::Error;
822    /// # async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
823    /// use scylla::client::session::{Session, SessionConfig};
824    /// use scylla::cluster::KnownNode;
825    ///
826    /// let mut config = SessionConfig::new();
827    /// config.known_nodes.push(KnownNode::Hostname("127.0.0.1:9042".to_string()));
828    ///
829    /// let session: Session = Session::connect(config).await?;
830    /// # Ok(())
831    /// # }
832    /// ```
833    pub async fn connect(config: SessionConfig) -> Result<Self, NewSessionError> {
834        let known_nodes = config.known_nodes;
835
836        #[cfg(feature = "unstable-cloud")]
837        let cloud_known_nodes: Option<Vec<InternalKnownNode>> =
838            if let Some(ref cloud_config) = config.cloud_config {
839                let cloud_servers = cloud_config
840                    .get_datacenters()
841                    .iter()
842                    .map(|(dc_name, dc_data)| {
843                        InternalKnownNode::CloudEndpoint(CloudEndpoint {
844                            hostname: dc_data.get_server().to_owned(),
845                            datacenter: dc_name.clone(),
846                        })
847                    })
848                    .collect();
849                Some(cloud_servers)
850            } else {
851                None
852            };
853
854        #[cfg(not(feature = "unstable-cloud"))]
855        let cloud_known_nodes: Option<Vec<InternalKnownNode>> = None;
856
857        #[allow(clippy::unnecessary_literal_unwrap)]
858        let known_nodes = cloud_known_nodes
859            .unwrap_or_else(|| known_nodes.into_iter().map(|node| node.into()).collect());
860
861        // Ensure there is at least one known node
862        if known_nodes.is_empty() {
863            return Err(NewSessionError::EmptyKnownNodesList);
864        }
865
866        let (tablet_sender, tablet_receiver) = tokio::sync::mpsc::channel(TABLET_CHANNEL_SIZE);
867
868        #[allow(unused_labels)] // Triggers when the `unstable-cloud` feature is disabled.
869        let address_translator = 'translator: {
870            #[cfg(feature = "unstable-cloud")]
871            if let Some(translator) = config.cloud_config.clone() {
872                if config.address_translator.is_some() {
873                    // This can only happen if the user builds SessionConfig by hand, as SessionBuilder in cloud mode prevents setting custom AddressTranslator.
874                    warn!(
875                        "Overriding user-provided AddressTranslator with Scylla Cloud AddressTranslator due \
876                            to CloudConfig being provided. This is certainly an API misuse - Cloud \
877                            may not be combined with user's own AddressTranslator."
878                    )
879                }
880
881                break 'translator Some(translator as Arc<dyn AddressTranslator>);
882            }
883
884            config.address_translator
885        };
886
887        let tls_provider = 'provider: {
888            #[cfg(feature = "unstable-cloud")]
889            if let Some(cloud_config) = config.cloud_config {
890                if config.tls_context.is_some() {
891                    // This can only happen if the user builds SessionConfig by hand, as SessionBuilder in cloud mode prevents setting custom TlsContext.
892                    warn!(
893                        "Overriding user-provided TlsContext with Scylla Cloud TlsContext due \
894                            to CloudConfig being provided. This is certainly an API misuse - Cloud \
895                            may not be combined with user's own TLS config."
896                    )
897                }
898
899                let provider = TlsProvider::new_cloud(cloud_config);
900                break 'provider Some(provider);
901            }
902            if let Some(tls_context) = config.tls_context {
903                // To silence warnings when TlsContext is an empty enum (tls features are disabled).
904                // In such case, TlsProvider is uninhabited.
905                #[allow(unused_variables)]
906                let provider = TlsProvider::new_with_global_context(tls_context);
907                #[allow(unreachable_code)]
908                break 'provider Some(provider);
909            }
910            None
911        };
912
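        // Assemble the per-connection configuration shared by all connection pools of this session.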
913        let connection_config = ConnectionConfig {
914            local_ip_address: config.local_ip_address,
915            shard_aware_local_port_range: config.shard_aware_local_port_range,
916            compression: config.compression,
917            tcp_nodelay: config.tcp_nodelay,
918            tcp_keepalive_interval: config.tcp_keepalive_interval,
919            timestamp_generator: config.timestamp_generator,
920            tls_provider,
921            authenticator: config.authenticator,
922            connect_timeout: config.connect_timeout,
923            event_sender: None,
924            default_consistency: Default::default(),
925            address_translator,
926            write_coalescing_delay: config
927                .enable_write_coalescing
928                .then_some(config.write_coalescing_delay),
929            keepalive_interval: config.keepalive_interval,
930            keepalive_timeout: config.keepalive_timeout,
931            tablet_sender: Some(tablet_sender),
932            identity: config.identity,
933        };
934
935        let pool_config = PoolConfig {
936            connection_config,
937            pool_size: config.connection_pool_size,
938            can_use_shard_aware_port: !config.disallow_shard_aware_port,
939        };
940
941        #[cfg(feature = "metrics")]
942        let metrics = Arc::new(Metrics::new());
943
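        // Create the Cluster, which maintains cluster metadata and the per-node connection pools.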
944        let cluster = Cluster::new(
945            known_nodes,
946            pool_config,
947            config.keyspaces_to_fetch,
948            config.fetch_schema_metadata,
949            config.metadata_request_serverside_timeout,
950            config.host_filter,
951            config.cluster_metadata_refresh_interval,
952            tablet_receiver,
953            #[cfg(feature = "metrics")]
954            Arc::clone(&metrics),
955        )
956        .await?;
957
958        let default_execution_profile_handle = config.default_execution_profile_handle;
959
960        let session = Self {
961            cluster,
962            default_execution_profile_handle,
963            schema_agreement_interval: config.schema_agreement_interval,
964            #[cfg(feature = "metrics")]
965            metrics,
966            schema_agreement_timeout: config.schema_agreement_timeout,
967            schema_agreement_automatic_waiting: config.schema_agreement_automatic_waiting,
968            refresh_metadata_on_auto_schema_agreement: config
969                .refresh_metadata_on_auto_schema_agreement,
970            keyspace_name: Arc::new(ArcSwapOption::default()), // will be set by use_keyspace
971            tracing_info_fetch_attempts: config.tracing_info_fetch_attempts,
972            tracing_info_fetch_interval: config.tracing_info_fetch_interval,
973            tracing_info_fetch_consistency: config.tracing_info_fetch_consistency,
974        };
975
976        if let Some(keyspace_name) = config.used_keyspace {
977            session
978                .use_keyspace(keyspace_name, config.keyspace_case_sensitive)
979                .await?;
980        }
981
982        Ok(session)
983    }
984
985    async fn do_query_unpaged(
986        &self,
987        statement: &Statement,
988        values: impl SerializeRow,
989    ) -> Result<QueryResult, ExecutionError> {
990        let (result, paging_state_response) = self
991            .query(statement, values, None, PagingState::start())
992            .await?;
993        if !paging_state_response.finished() {
994            error!("Unpaged unprepared query returned a non-empty paging state! This is a driver-side or server-side bug.");
995            return Err(ExecutionError::LastAttemptError(
996                RequestAttemptError::NonfinishedPagingState,
997            ));
998        }
999        Ok(result)
1000    }
1001
1002    async fn do_query_single_page(
1003        &self,
1004        statement: &Statement,
1005        values: impl SerializeRow,
1006        paging_state: PagingState,
1007    ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1008        self.query(
1009            statement,
1010            values,
1011            Some(statement.get_validated_page_size()),
1012            paging_state,
1013        )
1014        .await
1015    }
1016
1017    /// Sends a request to the database.
1018    /// Optionally continues fetching results from a saved point.
1019    ///
1020    /// This is now an internal method only.
1021    ///
1022    /// Tl;dr: use [Session::query_unpaged], [Session::query_single_page] or [Session::query_iter] instead.
1023    ///
1024    /// The rationale is that we believe paging is such an important concept (and it has shown to be error-prone as well)
1025    /// that we need to require users to make a conscious decision whether to use paging or not. For that, we expose
1026    /// the aforementioned 3 methods, clearly differing in naming and API, so that no unconscious choices about paging
1027    /// are made.
1028    async fn query(
1029        &self,
1030        statement: &Statement,
1031        values: impl SerializeRow,
1032        page_size: Option<PageSize>,
1033        paging_state: PagingState,
1034    ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1035        let execution_profile = statement
1036            .get_execution_profile_handle()
1037            .unwrap_or_else(|| self.get_default_execution_profile_handle())
1038            .access();
1039
1040        let statement_info = RoutingInfo {
1041            consistency: statement
1042                .config
1043                .consistency
1044                .unwrap_or(execution_profile.consistency),
1045            serial_consistency: statement
1046                .config
1047                .serial_consistency
1048                .unwrap_or(execution_profile.serial_consistency),
1049            ..Default::default()
1050        };
1051
1052        let span = RequestSpan::new_query(&statement.contents);
1053        let span_ref = &span;
1054        let run_request_result = self
1055            .run_request(
1056                statement_info,
1057                &statement.config,
1058                execution_profile,
1059                |connection: Arc<Connection>,
1060                 consistency: Consistency,
1061                 execution_profile: &ExecutionProfileInner| {
1062                    let serial_consistency = statement
1063                        .config
1064                        .serial_consistency
1065                        .unwrap_or(execution_profile.serial_consistency);
1066                    // Needed to avoid moving the statement and values into the async move block
1067                    let statement_ref = &statement;
1068                    let values_ref = &values;
1069                    let paging_state_ref = &paging_state;
1070                    async move {
1071                        if values_ref.is_empty() {
1072                            span_ref.record_request_size(0);
1073                            connection
1074                                .query_raw_with_consistency(
1075                                    statement_ref,
1076                                    consistency,
1077                                    serial_consistency,
1078                                    page_size,
1079                                    paging_state_ref.clone(),
1080                                )
1081                                .await
1082                                .and_then(QueryResponse::into_non_error_query_response)
1083                        } else {
1084                            let prepared = connection.prepare(statement_ref).await?;
1085                            let serialized = prepared.serialize_values(values_ref)?;
1086                            span_ref.record_request_size(serialized.buffer_size());
1087                            connection
1088                                .execute_raw_with_consistency(
1089                                    &prepared,
1090                                    &serialized,
1091                                    consistency,
1092                                    serial_consistency,
1093                                    page_size,
1094                                    paging_state_ref.clone(),
1095                                )
1096                                .await
1097                                .and_then(QueryResponse::into_non_error_query_response)
1098                        }
1099                    }
1100                },
1101                &span,
1102            )
1103            .instrument(span.span().clone())
1104            .await?;
1105
1106        let response = match run_request_result {
1107            RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
1108                response: NonErrorResponse::Result(result::Result::Void),
1109                tracing_id: None,
1110                warnings: Vec::new(),
1111            },
1112            RunRequestResult::Completed(response) => response,
1113        };
1114
1115        self.handle_set_keyspace_response(&response).await?;
1116        self.handle_auto_await_schema_agreement(&response).await?;
1117
1118        let (result, paging_state_response) = response.into_query_result_and_paging_state()?;
1119        span.record_result_fields(&result);
1120
1121        Ok((result, paging_state_response))
1122    }
1123
1124    async fn handle_set_keyspace_response(
1125        &self,
1126        response: &NonErrorQueryResponse,
1127    ) -> Result<(), UseKeyspaceError> {
1128        if let Some(set_keyspace) = response.as_set_keyspace() {
1129            debug!(
1130                "Detected USE KEYSPACE query, setting session's keyspace to {}",
1131                set_keyspace.keyspace_name
1132            );
1133            self.use_keyspace(set_keyspace.keyspace_name.clone(), true)
1134                .await?;
1135        }
1136
1137        Ok(())
1138    }
1139
1140    async fn handle_auto_await_schema_agreement(
1141        &self,
1142        response: &NonErrorQueryResponse,
1143    ) -> Result<(), ExecutionError> {
1144        if self.schema_agreement_automatic_waiting {
1145            if response.as_schema_change().is_some() {
1146                self.await_schema_agreement().await?;
1147            }
1148
1149            if self.refresh_metadata_on_auto_schema_agreement
1150                && response.as_schema_change().is_some()
1151            {
1152                self.refresh_metadata().await?;
1153            }
1154        }
1155
1156        Ok(())
1157    }
1158
1159    async fn do_query_iter(
1160        &self,
1161        statement: Statement,
1162        values: impl SerializeRow,
1163    ) -> Result<QueryPager, PagerExecutionError> {
1164        let execution_profile = statement
1165            .get_execution_profile_handle()
1166            .unwrap_or_else(|| self.get_default_execution_profile_handle())
1167            .access();
1168
1169        if values.is_empty() {
1170            QueryPager::new_for_query(
1171                statement,
1172                execution_profile,
1173                self.cluster.get_state(),
1174                #[cfg(feature = "metrics")]
1175                Arc::clone(&self.metrics),
1176            )
1177            .await
1178            .map_err(PagerExecutionError::NextPageError)
1179        } else {
1180            // Making QueryPager::new_for_query work with values is too hard (if even possible)
1181            // so instead of sending one prepare to a specific connection on each iterator query,
1182            // we fully prepare a statement beforehand.
1183            let prepared = self.prepare(statement).await?;
1184            let values = prepared.serialize_values(&values)?;
1185            QueryPager::new_for_prepared_statement(PreparedIteratorConfig {
1186                prepared,
1187                values,
1188                execution_profile,
1189                cluster_state: self.cluster.get_state(),
1190                #[cfg(feature = "metrics")]
1191                metrics: Arc::clone(&self.metrics),
1192            })
1193            .await
1194            .map_err(PagerExecutionError::NextPageError)
1195        }
1196    }
1197
1198    /// Prepares a statement on the server side and returns a prepared statement,
1199    /// which can later be used to perform more efficient requests.
1200    ///
1201    /// Prepared statements are much faster than unprepared statements:
1202    /// * Database doesn't need to parse the statement string upon each execution (only once)
1203    /// * They are properly load balanced using token aware routing
1204    ///
1205    /// > ***Warning***\
1206    /// > For token/shard aware load balancing to work properly, all partition key values
1207    /// > must be sent as bound values
1208    /// > (see [performance section](https://rust-driver.docs.scylladb.com/stable/statements/prepared.html#performance))
1209    ///
1210    /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/prepared.html) for more information.
1211    /// See the documentation of [`PreparedStatement`].
1212    ///
1213    /// # Arguments
1214    /// * `statement` - statement to prepare, can be just a `&str` or the [`Statement`] struct.
1215    ///
1216    /// # Example
1217    /// ```rust
1218    /// # use scylla::client::session::Session;
1219    /// # use std::error::Error;
1220    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
1221    /// use scylla::statement::prepared::PreparedStatement;
1222    ///
1223    /// // Prepare the statement for later execution
1224    /// let prepared: PreparedStatement = session
1225    ///     .prepare("INSERT INTO ks.tab (a) VALUES(?)")
1226    ///     .await?;
1227    ///
1228    /// // Execute the prepared statement with some values, just like an unprepared statement.
1229    /// let to_insert: i32 = 12345;
1230    /// session.execute_unpaged(&prepared, (to_insert,)).await?;
1231    /// # Ok(())
1232    /// # }
1233    /// ```
1234    pub async fn prepare(
1235        &self,
1236        statement: impl Into<Statement>,
1237    ) -> Result<PreparedStatement, PrepareError> {
1238        let statement = statement.into();
1239        let statement_ref = &statement;
1240
1241        let cluster_state = self.get_cluster_state();
1242        let connections_iter = cluster_state.iter_working_connections()?;
1243
1244        // Prepare the statement on all working connections concurrently
1245        let handles = connections_iter.map(|c| async move { c.prepare(statement_ref).await });
1246        let mut results = join_all(handles).await.into_iter();
1247
1248        // If at least one prepare was successful, `prepare()` returns Ok.
1249        // Find the first result that is Ok, or Err if all failed.
1250
1251        // Safety: there is at least one node in the cluster, and `ClusterState::iter_working_connections()`
1252        // returns either an error or an iterator with at least one connection, so there will be at least one result.
1253        let first_ok: Result<PreparedStatement, RequestAttemptError> =
1254            results.by_ref().find_or_first(Result::is_ok).unwrap();
1255        let mut prepared: PreparedStatement =
1256            first_ok.map_err(|first_attempt| PrepareError::AllAttemptsFailed { first_attempt })?;
1257
1258        // Validate prepared ids equality
1259        for statement in results.flatten() {
1260            if prepared.get_id() != statement.get_id() {
1261                return Err(PrepareError::PreparedStatementIdsMismatch);
1262            }
1263
1264            // Collect all tracing ids from prepare() queries in the final result
1265            prepared
1266                .prepare_tracing_ids
1267                .extend(statement.prepare_tracing_ids);
1268        }
1269
1270        prepared.set_partitioner_name(
1271            self.extract_partitioner_name(&prepared, &self.cluster.get_state())
1272                .and_then(PartitionerName::from_str)
1273                .unwrap_or_default(),
1274        );
1275
1276        Ok(prepared)
1277    }
1278
1279    fn extract_partitioner_name<'a>(
1280        &self,
1281        prepared: &PreparedStatement,
1282        cluster_state: &'a ClusterState,
1283    ) -> Option<&'a str> {
1284        let table_spec = prepared.get_table_spec()?;
1285        cluster_state
1286            .keyspaces
1287            .get(table_spec.ks_name())?
1288            .tables
1289            .get(table_spec.table_name())?
1290            .partitioner
1291            .as_deref()
1292    }
1293
1294    async fn do_execute_unpaged(
1295        &self,
1296        prepared: &PreparedStatement,
1297        values: impl SerializeRow,
1298    ) -> Result<QueryResult, ExecutionError> {
1299        let serialized_values = prepared.serialize_values(&values)?;
1300        let (result, paging_state) = self
1301            .execute(prepared, &serialized_values, None, PagingState::start())
1302            .await?;
1303        if !paging_state.finished() {
1304            error!("Unpaged prepared query returned a non-empty paging state! This is a driver-side or server-side bug.");
1305            return Err(ExecutionError::LastAttemptError(
1306                RequestAttemptError::NonfinishedPagingState,
1307            ));
1308        }
1309        Ok(result)
1310    }
1311
1312    async fn do_execute_single_page(
1313        &self,
1314        prepared: &PreparedStatement,
1315        values: impl SerializeRow,
1316        paging_state: PagingState,
1317    ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1318        let serialized_values = prepared.serialize_values(&values)?;
1319        let page_size = prepared.get_validated_page_size();
1320        self.execute(prepared, &serialized_values, Some(page_size), paging_state)
1321            .await
1322    }
1323
1324    /// Sends a prepared request to the database, optionally continuing from a saved point.
1325    ///
1326    /// This is now an internal method only.
1327    ///
1328    /// Tl;dr: use [Session::execute_unpaged], [Session::execute_single_page] or [Session::execute_iter] instead.
1329    ///
1330    /// The rationale is that we believe paging is such an important concept (and it has proven to be error-prone as well)
1331    /// that we need to require users to make a conscious decision whether to use paging or not. For that, we expose
1332    /// the aforementioned 3 methods, clearly differing in naming and API, so that no unconscious choices about paging
1333    /// are made.
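        ///
        /// As a sketch of the intended usage, driving paging manually via
        /// [Session::execute_single_page] might look as follows (assuming a hypothetical
        /// `ks.tab` table; row processing is elided):
        /// ```rust
        /// # use scylla::client::session::Session;
        /// # use std::error::Error;
        /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
        /// use std::ops::ControlFlow;
        ///
        /// use scylla::response::PagingState;
        ///
        /// let prepared = session.prepare("SELECT a FROM ks.tab").await?;
        /// let mut paging_state = PagingState::start();
        /// loop {
        ///     let (result, paging_state_response) = session
        ///         .execute_single_page(&prepared, &[], paging_state)
        ///         .await?;
        ///
        ///     // ... process the rows of `result` here ...
        ///
        ///     match paging_state_response.into_paging_control_flow() {
        ///         // All pages have been fetched.
        ///         ControlFlow::Break(()) => break,
        ///         // Continue from where the last page ended.
        ///         ControlFlow::Continue(new_paging_state) => paging_state = new_paging_state,
        ///     }
        /// }
        /// # Ok(())
        /// # }
        /// ```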
1334    async fn execute(
1335        &self,
1336        prepared: &PreparedStatement,
1337        serialized_values: &SerializedValues,
1338        page_size: Option<PageSize>,
1339        paging_state: PagingState,
1340    ) -> Result<(QueryResult, PagingStateResponse), ExecutionError> {
1341        let values_ref = &serialized_values;
1342        let paging_state_ref = &paging_state;
1343
1344        let (partition_key, token) = prepared
1345            .extract_partition_key_and_calculate_token(prepared.get_partitioner_name(), values_ref)
1346            .map_err(PartitionKeyError::into_execution_error)?
1347            .unzip();
1348
1349        let execution_profile = prepared
1350            .get_execution_profile_handle()
1351            .unwrap_or_else(|| self.get_default_execution_profile_handle())
1352            .access();
1353
1354        let table_spec = prepared.get_table_spec();
1355
1356        let statement_info = RoutingInfo {
1357            consistency: prepared
1358                .config
1359                .consistency
1360                .unwrap_or(execution_profile.consistency),
1361            serial_consistency: prepared
1362                .config
1363                .serial_consistency
1364                .unwrap_or(execution_profile.serial_consistency),
1365            token,
1366            table: table_spec,
1367            is_confirmed_lwt: prepared.is_confirmed_lwt(),
1368        };
1369
1370        let span = RequestSpan::new_prepared(
1371            partition_key.as_ref().map(|pk| pk.iter()),
1372            token,
1373            serialized_values.buffer_size(),
1374        );
1375
1376        if !span.span().is_disabled() {
1377            if let (Some(table_spec), Some(token)) = (statement_info.table, token) {
1378                let cluster_state = self.get_cluster_state();
1379                let replicas = cluster_state.get_token_endpoints_iter(table_spec, token);
1380                span.record_replicas(replicas)
1381            }
1382        }
1383
1384        let run_request_result: RunRequestResult<NonErrorQueryResponse> = self
1385            .run_request(
1386                statement_info,
1387                &prepared.config,
1388                execution_profile,
1389                |connection: Arc<Connection>,
1390                 consistency: Consistency,
1391                 execution_profile: &ExecutionProfileInner| {
1392                    let serial_consistency = prepared
1393                        .config
1394                        .serial_consistency
1395                        .unwrap_or(execution_profile.serial_consistency);
1396                    async move {
1397                        connection
1398                            .execute_raw_with_consistency(
1399                                prepared,
1400                                values_ref,
1401                                consistency,
1402                                serial_consistency,
1403                                page_size,
1404                                paging_state_ref.clone(),
1405                            )
1406                            .await
1407                            .and_then(QueryResponse::into_non_error_query_response)
1408                    }
1409                },
1410                &span,
1411            )
1412            .instrument(span.span().clone())
1413            .await?;
1414
1415        let response = match run_request_result {
1416            RunRequestResult::IgnoredWriteError => NonErrorQueryResponse {
1417                response: NonErrorResponse::Result(result::Result::Void),
1418                tracing_id: None,
1419                warnings: Vec::new(),
1420            },
1421            RunRequestResult::Completed(response) => response,
1422        };
1423
1424        self.handle_set_keyspace_response(&response).await?;
1425        self.handle_auto_await_schema_agreement(&response).await?;
1426
1427        let (result, paging_state_response) = response.into_query_result_and_paging_state()?;
1428        span.record_result_fields(&result);
1429
1430        Ok((result, paging_state_response))
1431    }
1432
1433    async fn do_execute_iter(
1434        &self,
1435        prepared: PreparedStatement,
1436        values: impl SerializeRow,
1437    ) -> Result<QueryPager, PagerExecutionError> {
1438        let serialized_values = prepared.serialize_values(&values)?;
1439
1440        let execution_profile = prepared
1441            .get_execution_profile_handle()
1442            .unwrap_or_else(|| self.get_default_execution_profile_handle())
1443            .access();
1444
1445        QueryPager::new_for_prepared_statement(PreparedIteratorConfig {
1446            prepared,
1447            values: serialized_values,
1448            execution_profile,
1449            cluster_state: self.cluster.get_state(),
1450            #[cfg(feature = "metrics")]
1451            metrics: Arc::clone(&self.metrics),
1452        })
1453        .await
1454        .map_err(PagerExecutionError::NextPageError)
1455    }
1456
1457    async fn do_batch(
1458        &self,
1459        batch: &Batch,
1460        values: impl BatchValues,
1461    ) -> Result<QueryResult, ExecutionError> {
1462        // Shard-awareness behavior for batches is to pick the shard based on the first batch statement's shard.
1463        // If users batch statements by shard, they will be rewarded with full shard awareness.
1464
1465        // Check to ensure that we don't send a batch statement with more than u16::MAX queries.
1466        let batch_statements_length = batch.statements.len();
1467        if batch_statements_length > u16::MAX as usize {
1468            return Err(ExecutionError::BadQuery(
1469                BadQuery::TooManyQueriesInBatchStatement(batch_statements_length),
1470            ));
1471        }
1472
1473        let execution_profile = batch
1474            .get_execution_profile_handle()
1475            .unwrap_or_else(|| self.get_default_execution_profile_handle())
1476            .access();
1477
1478        let consistency = batch
1479            .config
1480            .consistency
1481            .unwrap_or(execution_profile.consistency);
1482
1483        let serial_consistency = batch
1484            .config
1485            .serial_consistency
1486            .unwrap_or(execution_profile.serial_consistency);
1487
1488        let (first_value_token, values) =
1489            batch_values::peek_first_token(values, batch.statements.first())?;
1490        let values_ref = &values;
1491
1492        let table_spec =
1493            if let Some(BatchStatement::PreparedStatement(ps)) = batch.statements.first() {
1494                ps.get_table_spec()
1495            } else {
1496                None
1497            };
1498
1499        let statement_info = RoutingInfo {
1500            consistency,
1501            serial_consistency,
1502            token: first_value_token,
1503            table: table_spec,
1504            is_confirmed_lwt: false,
1505        };
1506
1507        let span = RequestSpan::new_batch();
1508
1509        let run_request_result: RunRequestResult<NonErrorQueryResponse> = self
1510            .run_request(
1511                statement_info,
1512                &batch.config,
1513                execution_profile,
1514                |connection: Arc<Connection>,
1515                 consistency: Consistency,
1516                 execution_profile: &ExecutionProfileInner| {
1517                    let serial_consistency = batch
1518                        .config
1519                        .serial_consistency
1520                        .unwrap_or(execution_profile.serial_consistency);
1521                    async move {
1522                        connection
1523                            .batch_with_consistency(
1524                                batch,
1525                                values_ref,
1526                                consistency,
1527                                serial_consistency,
1528                            )
1529                            .await
1530                            .and_then(QueryResponse::into_non_error_query_response)
1531                    }
1532                },
1533                &span,
1534            )
1535            .instrument(span.span().clone())
1536            .await?;
1537
1538        let result = match run_request_result {
1539            RunRequestResult::IgnoredWriteError => QueryResult::mock_empty(),
1540            RunRequestResult::Completed(non_error_query_response) => {
1541                let result = non_error_query_response.into_query_result()?;
1542                span.record_result_fields(&result);
1543                result
1544            }
1545        };
1546
1547        Ok(result)
1548    }
1549
1550    /// Prepares all statements within the batch and returns a new batch where every
1551    /// statement is prepared.
1552    ///
        /// # Example
1553    /// ```rust
1554    /// # extern crate scylla;
1555    /// # use scylla::client::session::Session;
1556    /// # use std::error::Error;
1557    /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
1558    /// use scylla::statement::batch::Batch;
1559    ///
1560    /// // Create a batch statement with unprepared statements
1561    /// let mut batch: Batch = Default::default();
1562    /// batch.append_statement("INSERT INTO ks.simple_unprepared1 VALUES(?, ?)");
1563    /// batch.append_statement("INSERT INTO ks.simple_unprepared2 VALUES(?, ?)");
1564    ///
1565    /// // Prepare all statements in the batch at once
1566    /// let prepared_batch: Batch = session.prepare_batch(&batch).await?;
1567    ///
1568    /// // Specify bound values to use with each statement
1569    /// let batch_values = ((1_i32, 2_i32),
1570    ///                     (3_i32, 4_i32));
1571    ///
1572    /// // Run the prepared batch
1573    /// session.batch(&prepared_batch, batch_values).await?;
1574    /// # Ok(())
1575    /// # }
1576    /// ```
1577    pub async fn prepare_batch(&self, batch: &Batch) -> Result<Batch, PrepareError> {
1578        let mut prepared_batch = batch.clone();
1579
1580        try_join_all(
1581            prepared_batch
1582                .statements
1583                .iter_mut()
1584                .map(|statement| async move {
1585                    if let BatchStatement::Query(query) = statement {
1586                        let prepared = self.prepare(query.clone()).await?;
1587                        *statement = BatchStatement::PreparedStatement(prepared);
1588                    }
1589                    Ok::<(), PrepareError>(())
1590                }),
1591        )
1592        .await?;
1593
1594        Ok(prepared_batch)
1595    }
1596
1597    /// Sends a `USE <keyspace_name>` request on all connections.\
1598    /// This allows writing `SELECT * FROM table` instead of `SELECT * FROM keyspace.table`.
1599    ///
1600    /// Note that even a failed `use_keyspace` can change the currently used keyspace - the request is sent on all connections and
1601    /// can overwrite the previously used keyspace.
1602    ///
1603    /// Call only one `use_keyspace` at a time.\
1604    /// Trying to do two `use_keyspace` requests simultaneously with different names
1605    /// can end up with some connections using one keyspace and the rest using the other.
1606    ///
1607    /// See [the book](https://rust-driver.docs.scylladb.com/stable/statements/usekeyspace.html) for more information
1608    ///
1609    /// # Arguments
1610    ///
1611    /// * `keyspace_name` - keyspace name to use,
1612    ///   keyspace names can have up to 48 alphanumeric characters and contain underscores
1613    /// * `case_sensitive` - if set to true the generated statement will put keyspace name in quotes
1614    /// # Example
1615    /// ```rust
1616    /// # use scylla::client::session::Session;
1617    /// # use scylla::client::session_builder::SessionBuilder;
1618    /// # use scylla::client::Compression;
1619    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1620    /// # let session = SessionBuilder::new().known_node("127.0.0.1:9042").build().await?;
1621    /// session
1622    ///     .query_unpaged("INSERT INTO my_keyspace.tab (a) VALUES ('test1')", &[])
1623    ///     .await?;
1624    ///
1625    /// session.use_keyspace("my_keyspace", false).await?;
1626    ///
1627    /// // Now we can omit keyspace name in the statement
1628    /// session
1629    ///     .query_unpaged("INSERT INTO tab (a) VALUES ('test2')", &[])
1630    ///     .await?;
1631    /// # Ok(())
1632    /// # }
1633    /// ```
1634    pub async fn use_keyspace(
1635        &self,
1636        keyspace_name: impl Into<String>,
1637        case_sensitive: bool,
1638    ) -> Result<(), UseKeyspaceError> {
1639        let keyspace_name = keyspace_name.into();
1640        self.keyspace_name
1641            .store(Some(Arc::new(keyspace_name.clone())));
1642
1643        // Trying to pass the keyspace as a bound value in "USE ?" doesn't work,
1644        // so we have to construct the statement string ourselves: "USE " + new_keyspace.
1645        // To avoid any possible CQL injection, it's good to verify that the name is valid.
1646        let verified_ks_name = VerifiedKeyspaceName::new(keyspace_name, case_sensitive)?;
1647
1648        self.cluster.use_keyspace(verified_ks_name).await?;
1649
1650        Ok(())
1651    }
1652
1653    /// Manually trigger a metadata refresh.\
1654    /// The driver will fetch the current nodes in the cluster and update its metadata.
1655    ///
1656    /// Normally this is not needed;
1657    /// the driver should automatically detect all metadata changes in the cluster.
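        ///
        /// # Example
        /// A minimal sketch of forcing a refresh, e.g. right after the cluster topology
        /// was changed out-of-band:
        /// ```rust
        /// # use scylla::client::session::Session;
        /// # use std::error::Error;
        /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
        /// session.refresh_metadata().await?;
        /// // The freshly fetched topology and schema are now visible in the cluster state.
        /// let _cluster_state = session.get_cluster_state();
        /// # Ok(())
        /// # }
        /// ```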
1658    pub async fn refresh_metadata(&self) -> Result<(), MetadataError> {
1659        self.cluster.refresh_metadata().await
1660    }
1661
1662    /// Access metrics collected by the driver.\
1663    /// The driver collects various metrics, such as the number of queries or query latencies.
1664    /// They can be read using this method.
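        ///
        /// # Example
        /// A minimal sketch, assuming the `metrics` feature is enabled:
        /// ```rust
        /// # use scylla::client::session::Session;
        /// # fn check_only_compiles(session: &Session) {
        /// let metrics = session.get_metrics();
        /// println!("Queries requested: {}", metrics.get_queries_num());
        /// println!("Errors occurred: {}", metrics.get_errors_num());
        /// println!("Average latency: {:?}", metrics.get_latency_avg_ms());
        /// # }
        /// ```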
1665    #[cfg(feature = "metrics")]
1666    pub fn get_metrics(&self) -> Arc<Metrics> {
1667        Arc::clone(&self.metrics)
1668    }
1669
1670    /// Access the cluster state visible to the driver.
1671    ///
1672    /// The driver collects various information about the network topology and schema.
1673    /// It can be read using this method.
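        ///
        /// # Example
        /// A minimal sketch of listing the nodes known to the driver:
        /// ```rust
        /// # use scylla::client::session::Session;
        /// # fn check_only_compiles(session: &Session) {
        /// let cluster_state = session.get_cluster_state();
        /// for node in cluster_state.get_nodes_info() {
        ///     println!("Node {} in datacenter {:?}", node.host_id, node.datacenter);
        /// }
        /// # }
        /// ```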
1674    pub fn get_cluster_state(&self) -> Arc<ClusterState> {
1675        self.cluster.get_state()
1676    }
1677
1678    /// Get [`TracingInfo`] of a traced query performed earlier
1679    ///
1680    /// See [the book](https://rust-driver.docs.scylladb.com/stable/tracing/tracing.html)
1681    /// for more information about query tracing
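        ///
        /// # Example
        /// A minimal sketch, assuming a hypothetical `ks.tab` table and that the tracing data
        /// has already propagated to the `system_traces` tables:
        /// ```rust
        /// # use scylla::client::session::Session;
        /// # use std::error::Error;
        /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
        /// use scylla::statement::unprepared::Statement;
        ///
        /// // Enable tracing for a single statement.
        /// let mut statement = Statement::new("SELECT a FROM ks.tab");
        /// statement.set_tracing(true);
        ///
        /// let result = session.query_unpaged(statement, &[]).await?;
        /// if let Some(tracing_id) = result.tracing_id() {
        ///     let tracing_info = session.get_tracing_info(&tracing_id).await?;
        ///     println!("Fetched {} tracing events", tracing_info.events.len());
        /// }
        /// # Ok(())
        /// # }
        /// ```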
1682    pub async fn get_tracing_info(&self, tracing_id: &Uuid) -> Result<TracingInfo, TracingError> {
1683        // tracing_info_fetch_attempts is NonZeroU32 so at least one attempt will be made
1684        for _ in 0..self.tracing_info_fetch_attempts.get() {
1685            let current_try: Option<TracingInfo> = self
1686                .try_getting_tracing_info(tracing_id, Some(self.tracing_info_fetch_consistency))
1687                .await?;
1688
1689            match current_try {
1690                Some(tracing_info) => return Ok(tracing_info),
1691                None => tokio::time::sleep(self.tracing_info_fetch_interval).await,
1692            };
1693        }
1694
1695        Err(TracingError::EmptyResults)
1696    }
1697
1698    /// Gets the name of the keyspace that is currently set, or `None` if no
1699    /// keyspace was set.
1700    ///
1701    /// It will initially return the name of the keyspace that was set
1702    /// in the session configuration, but calling `use_keyspace` will update
1703    /// it.
1704    ///
1705    /// Note: the return value might be wrong if `use_keyspace` was called
1706    /// concurrently or if it previously failed. The returned value is also
1707    /// unspecified when `get_keyspace` is called concurrently with `use_keyspace`.
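        ///
        /// # Example
        /// A minimal sketch, assuming a keyspace `my_keyspace` exists:
        /// ```rust
        /// # use scylla::client::session::Session;
        /// # use std::error::Error;
        /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
        /// session.use_keyspace("my_keyspace", false).await?;
        /// if let Some(keyspace_name) = session.get_keyspace() {
        ///     println!("Currently used keyspace: {}", keyspace_name);
        /// }
        /// # Ok(())
        /// # }
        /// ```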
1708    #[inline]
1709    pub fn get_keyspace(&self) -> Option<Arc<String>> {
1710        self.keyspace_name.load_full()
1711    }
1712
1713    // Tries getting the tracing info.
1714    // If the queries return 0 rows, returns None - the information hasn't reached this node yet.
1715    // If there is some other error, returns that error.
1716    async fn try_getting_tracing_info(
1717        &self,
1718        tracing_id: &Uuid,
1719        consistency: Option<Consistency>,
1720    ) -> Result<Option<TracingInfo>, TracingError> {
1721        // Query system_traces.sessions for TracingInfo
1722        let mut traces_session_query =
1723            Statement::new(crate::observability::tracing::TRACES_SESSION_QUERY_STR);
1724        traces_session_query.config.consistency = consistency;
1725        traces_session_query.set_page_size(TRACING_QUERY_PAGE_SIZE);
1726
1727        // Query system_traces.events for TracingEvents
1728        let mut traces_events_query =
1729            Statement::new(crate::observability::tracing::TRACES_EVENTS_QUERY_STR);
1730        traces_events_query.config.consistency = consistency;
1731        traces_events_query.set_page_size(TRACING_QUERY_PAGE_SIZE);
1732
1733        let (traces_session_res, traces_events_res) = tokio::try_join!(
1734            self.do_query_unpaged(&traces_session_query, (tracing_id,)),
1735            self.do_query_unpaged(&traces_events_query, (tracing_id,))
1736        )?;
1737
1738        // Get tracing info
1739        let maybe_tracing_info: Option<TracingInfo> = traces_session_res
1740            .into_rows_result()
1741            .map_err(TracingError::TracesSessionIntoRowsResultError)?
1742            .maybe_first_row()
1743            .map_err(|err| match err {
1744                MaybeFirstRowError::TypeCheckFailed(e) => {
1745                    TracingError::TracesSessionInvalidColumnType(e)
1746                }
1747                MaybeFirstRowError::DeserializationFailed(e) => {
1748                    TracingError::TracesSessionDeserializationFailed(e)
1749                }
1750            })?;
1751
1752        let mut tracing_info = match maybe_tracing_info {
1753            None => return Ok(None),
1754            Some(tracing_info) => tracing_info,
1755        };
1756
1757        // Get tracing events
1758        let tracing_event_rows_result = traces_events_res
1759            .into_rows_result()
1760            .map_err(TracingError::TracesEventsIntoRowsResultError)?;
1761        let tracing_event_rows = tracing_event_rows_result.rows().map_err(|err| match err {
1762            RowsError::TypeCheckFailed(err) => TracingError::TracesEventsInvalidColumnType(err),
1763        })?;
1764
1765        tracing_info.events = tracing_event_rows
1766            .collect::<Result<_, _>>()
1767            .map_err(TracingError::TracesEventsDeserializationFailed)?;
1768
1769        if tracing_info.events.is_empty() {
1770            return Ok(None);
1771        }
1772
1773        Ok(Some(tracing_info))
1774    }
1775
1776    /// This method allows easily running a request using load balancing, retry policy, etc.
1777    /// It requires some information about the request and a closure.
1778    /// The closure is used to execute the request once on a chosen connection:
1779    /// - query will use connection.query()
1780    /// - execute will use connection.execute()
1781    ///
1782    /// If the closure fails with some error, the retry policy is used to perform retries.
1783    /// On success, the request's result is returned.
1784    // I tried to make these closures take a reference instead of an Arc but failed;
1785    // maybe once async closures get stabilized this can be fixed.
1786    async fn run_request<'a, QueryFut, ResT>(
1787        &'a self,
1788        statement_info: RoutingInfo<'a>,
1789        statement_config: &'a StatementConfig,
1790        execution_profile: Arc<ExecutionProfileInner>,
1791        run_request_once: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
1792        request_span: &'a RequestSpan,
1793    ) -> Result<RunRequestResult<ResT>, ExecutionError>
1794    where
1795        QueryFut: Future<Output = Result<ResT, RequestAttemptError>>,
1796        ResT: AllowedRunRequestResTType,
1797    {
1798        let history_listener_and_id: Option<(&'a dyn HistoryListener, history::RequestId)> =
1799            statement_config
1800                .history_listener
1801                .as_ref()
1802                .map(|hl| (&**hl, hl.log_request_start()));
1803
1804        let load_balancer = &execution_profile.load_balancing_policy;
1805
1806        let runner = async {
1807            let cluster_state = self.cluster.get_state();
1808            let request_plan =
1809                load_balancing::Plan::new(load_balancer.as_ref(), &statement_info, &cluster_state);
1810
1811            // If a speculative execution policy is used to run the request, request_plan has to be shared
1812            // between different async functions. This struct wraps request_plan in a mutex so it
1813            // can be shared safely.
1814            struct SharedPlan<'a, I>
1815            where
1816                I: Iterator<Item = (NodeRef<'a>, Shard)>,
1817            {
1818                iter: std::sync::Mutex<I>,
1819            }
1820
1821            impl<'a, I> Iterator for &SharedPlan<'a, I>
1822            where
1823                I: Iterator<Item = (NodeRef<'a>, Shard)>,
1824            {
1825                type Item = (NodeRef<'a>, Shard);
1826
1827                fn next(&mut self) -> Option<Self::Item> {
1828                    self.iter.lock().unwrap().next()
1829                }
1830            }
1831
1832            let retry_policy = statement_config
1833                .retry_policy
1834                .as_deref()
1835                .unwrap_or(&*execution_profile.retry_policy);
1836
1837            let speculative_policy = execution_profile.speculative_execution_policy.as_ref();
1838
1839            match speculative_policy {
1840                Some(speculative) if statement_config.is_idempotent => {
1841                    let shared_request_plan = SharedPlan {
1842                        iter: std::sync::Mutex::new(request_plan),
1843                    };
1844
1845                    let request_runner_generator = |is_speculative: bool| {
1846                        let history_data: Option<HistoryData> = history_listener_and_id
1847                            .as_ref()
1848                            .map(|(history_listener, request_id)| {
1849                                let speculative_id: Option<history::SpeculativeId> =
1850                                    if is_speculative {
1851                                        Some(
1852                                            history_listener.log_new_speculative_fiber(*request_id),
1853                                        )
1854                                    } else {
1855                                        None
1856                                    };
1857                                HistoryData {
1858                                    listener: *history_listener,
1859                                    request_id: *request_id,
1860                                    speculative_id,
1861                                }
1862                            });
1863
1864                        if is_speculative {
1865                            request_span.inc_speculative_executions();
1866                        }
1867
1868                        self.run_request_speculative_fiber(
1869                            &shared_request_plan,
1870                            &run_request_once,
1871                            &execution_profile,
1872                            ExecuteRequestContext {
1873                                is_idempotent: statement_config.is_idempotent,
1874                                consistency_set_on_statement: statement_config.consistency,
1875                                retry_session: retry_policy.new_session(),
1876                                history_data,
1877                                query_info: &statement_info,
1878                                request_span,
1879                            },
1880                        )
1881                    };
1882
1883                    let context = speculative_execution::Context {
1884                        #[cfg(feature = "metrics")]
1885                        metrics: Arc::clone(&self.metrics),
1886                    };
1887
1888                    speculative_execution::execute(
1889                        speculative.as_ref(),
1890                        &context,
1891                        request_runner_generator,
1892                    )
1893                    .await
1894                }
1895                _ => {
1896                    let history_data: Option<HistoryData> =
1897                        history_listener_and_id
1898                            .as_ref()
1899                            .map(|(history_listener, request_id)| HistoryData {
1900                                listener: *history_listener,
1901                                request_id: *request_id,
1902                                speculative_id: None,
1903                            });
1904                    self.run_request_speculative_fiber(
1905                        request_plan,
1906                        &run_request_once,
1907                        &execution_profile,
1908                        ExecuteRequestContext {
1909                            is_idempotent: statement_config.is_idempotent,
1910                            consistency_set_on_statement: statement_config.consistency,
1911                            retry_session: retry_policy.new_session(),
1912                            history_data,
1913                            query_info: &statement_info,
1914                            request_span,
1915                        },
1916                    )
1917                    .await
1918                    .unwrap_or(Err(RequestError::EmptyPlan))
1919                }
1920            }
1921        };
1922
1923        let effective_timeout = statement_config
1924            .request_timeout
1925            .or(execution_profile.request_timeout);
1926        let result = match effective_timeout {
1927            Some(timeout) => tokio::time::timeout(timeout, runner).await.unwrap_or_else(
1928                |_: tokio::time::error::Elapsed| {
1929                    #[cfg(feature = "metrics")]
1930                    self.metrics.inc_request_timeouts();
1931                    Err(RequestError::RequestTimeout(timeout))
1932                },
1933            ),
1934            None => runner.await,
1935        };
1936
1937        if let Some((history_listener, request_id)) = history_listener_and_id {
1938            match &result {
1939                Ok(_) => history_listener.log_request_success(request_id),
1940                Err(e) => history_listener.log_request_error(request_id, e),
1941            }
1942        }
1943
1944        result.map_err(RequestError::into_execution_error)
1945    }
1946
1947    /// Executes the closure `run_request_once`, given the load balancing plan and some information
1948    /// about the request, including the retry session.
1949    /// If the request fails, the retry session is used to perform retries.
1950    ///
1951    /// Returns None if the provided plan is empty.
1952    async fn run_request_speculative_fiber<'a, QueryFut, ResT>(
1953        &'a self,
1954        request_plan: impl Iterator<Item = (NodeRef<'a>, Shard)>,
1955        run_request_once: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
1956        execution_profile: &ExecutionProfileInner,
1957        mut context: ExecuteRequestContext<'a>,
1958    ) -> Option<Result<RunRequestResult<ResT>, RequestError>>
1959    where
1960        QueryFut: Future<Output = Result<ResT, RequestAttemptError>>,
1961        ResT: AllowedRunRequestResTType,
1962    {
1963        let mut last_error: Option<RequestError> = None;
1964        let mut current_consistency: Consistency = context
1965            .consistency_set_on_statement
1966            .unwrap_or(execution_profile.consistency);
1967
1968        'nodes_in_plan: for (node, shard) in request_plan {
1969            let span = trace_span!("Executing request", node = %node.address);
1970            'same_node_retries: loop {
1971                trace!(parent: &span, "Execution started");
1972                let connection = match node.connection_for_shard(shard).await {
1973                    Ok(connection) => connection,
1974                    Err(e) => {
1975                        trace!(
1976                            parent: &span,
1977                            error = %e,
1978                            "Choosing connection failed"
1979                        );
1980                        last_error = Some(e.into());
1981                        // Broken connection doesn't count as a failed request, don't log in metrics
1982                        continue 'nodes_in_plan;
1983                    }
1984                };
1985                context.request_span.record_shard_id(&connection);
1986
1987                #[cfg(feature = "metrics")]
1988                self.metrics.inc_total_nonpaged_queries();
1989                let request_start = std::time::Instant::now();
1990
1991                trace!(
1992                    parent: &span,
1993                    connection = %connection.get_connect_address(),
1994                    "Sending"
1995                );
1996                let attempt_id: Option<history::AttemptId> =
1997                    context.log_attempt_start(connection.get_connect_address());
1998                let request_result: Result<ResT, RequestAttemptError> =
1999                    run_request_once(connection, current_consistency, execution_profile)
2000                        .instrument(span.clone())
2001                        .await;
2002
2003                let elapsed = request_start.elapsed();
2004                let request_error: RequestAttemptError = match request_result {
2005                    Ok(response) => {
2006                        trace!(parent: &span, "Request succeeded");
2007                        #[cfg(feature = "metrics")]
2008                        let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
2009                        context.log_attempt_success(&attempt_id);
2010                        execution_profile.load_balancing_policy.on_request_success(
2011                            context.query_info,
2012                            elapsed,
2013                            node,
2014                        );
2015                        return Some(Ok(RunRequestResult::Completed(response)));
2016                    }
2017                    Err(e) => {
2018                        trace!(
2019                            parent: &span,
2020                            last_error = %e,
2021                            "Request failed"
2022                        );
2023                        #[cfg(feature = "metrics")]
2024                        self.metrics.inc_failed_nonpaged_queries();
2025                        execution_profile.load_balancing_policy.on_request_failure(
2026                            context.query_info,
2027                            elapsed,
2028                            node,
2029                            &e,
2030                        );
2031                        e
2032                    }
2033                };
2034
2035                // Use retry policy to decide what to do next
2036                let query_info = RequestInfo {
2037                    error: &request_error,
2038                    is_idempotent: context.is_idempotent,
2039                    consistency: context
2040                        .consistency_set_on_statement
2041                        .unwrap_or(execution_profile.consistency),
2042                };
2043
2044                let retry_decision = context.retry_session.decide_should_retry(query_info);
2045                trace!(
2046                    parent: &span,
2047                    retry_decision = ?retry_decision
2048                );
2049
2050                context.log_attempt_error(&attempt_id, &request_error, &retry_decision);
2051
2052                last_error = Some(request_error.into());
2053
2054                match retry_decision {
2055                    RetryDecision::RetrySameTarget(new_cl) => {
2056                        #[cfg(feature = "metrics")]
2057                        self.metrics.inc_retries_num();
2058                        current_consistency = new_cl.unwrap_or(current_consistency);
2059                        continue 'same_node_retries;
2060                    }
2061                    RetryDecision::RetryNextTarget(new_cl) => {
2062                        #[cfg(feature = "metrics")]
2063                        self.metrics.inc_retries_num();
2064                        current_consistency = new_cl.unwrap_or(current_consistency);
2065                        continue 'nodes_in_plan;
2066                    }
2067                    RetryDecision::DontRetry => break 'nodes_in_plan,
2068
2069                    RetryDecision::IgnoreWriteError => {
2070                        return Some(Ok(RunRequestResult::IgnoredWriteError))
2071                    }
2072                };
2073            }
2074        }
2075
2076        last_error.map(Result::Err)
2077    }
2078
2079    async fn await_schema_agreement_indefinitely(&self) -> Result<Uuid, SchemaAgreementError> {
2080        loop {
2081            tokio::time::sleep(self.schema_agreement_interval).await;
2082            if let Some(agreed_version) = self.check_schema_agreement().await? {
2083                return Ok(agreed_version);
2084            }
2085        }
2086    }
2087
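        /// Awaits schema agreement among all nodes with working connections.
        ///
        /// The schema versions reported by the nodes are polled at the configured schema
        /// agreement interval until they all match, or until the configured schema agreement
        /// timeout elapses, in which case [`SchemaAgreementError::Timeout`] is returned.
        ///
        /// # Example
        /// A minimal sketch, assuming that the keyspace `ks` already exists:
        /// ```rust
        /// # use scylla::client::session::Session;
        /// # use std::error::Error;
        /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
        /// session
        ///     .query_unpaged("CREATE TABLE IF NOT EXISTS ks.tab (a int PRIMARY KEY)", &[])
        ///     .await?;
        /// // Make sure all nodes agree on the new schema before relying on it.
        /// let agreed_version = session.await_schema_agreement().await?;
        /// println!("Agreed schema version: {}", agreed_version);
        /// # Ok(())
        /// # }
        /// ```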
2088    pub async fn await_schema_agreement(&self) -> Result<Uuid, SchemaAgreementError> {
2089        timeout(
2090            self.schema_agreement_timeout,
2091            self.await_schema_agreement_indefinitely(),
2092        )
2093        .await
2094        .unwrap_or(Err(SchemaAgreementError::Timeout(
2095            self.schema_agreement_timeout,
2096        )))
2097    }
2098
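        /// Checks whether all nodes with working connections currently report the same schema version.
        ///
        /// Returns `Ok(Some(version))` if all reachable nodes agree on `version`,
        /// and `Ok(None)` if they do not (yet) agree.
        ///
        /// # Example
        /// A minimal sketch of a single manual check:
        /// ```rust
        /// # use scylla::client::session::Session;
        /// # use std::error::Error;
        /// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
        /// match session.check_schema_agreement().await? {
        ///     Some(agreed_version) => println!("Schema is in agreement: {}", agreed_version),
        ///     None => println!("Schema is not in agreement yet"),
        /// }
        /// # Ok(())
        /// # }
        /// ```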
2099    pub async fn check_schema_agreement(&self) -> Result<Option<Uuid>, SchemaAgreementError> {
2100        let cluster_state = self.get_cluster_state();
2101        let connections_iter = cluster_state.iter_working_connections()?;
2102
2103        let handles = connections_iter.map(|c| async move { c.fetch_schema_version().await });
2104        let versions = try_join_all(handles).await?;
2105
2106        let local_version: Uuid = versions[0];
2107        let in_agreement = versions.into_iter().all(|v| v == local_version);
2108        Ok(in_agreement.then_some(local_version))
2109    }
2110
2111    /// Retrieves the handle to execution profile that is used by this session
2112    /// by default, i.e. when an executed statement does not define its own handle.
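        ///
        /// # Example
        /// A minimal sketch of explicitly attaching the session-default profile to a statement
        /// (normally unnecessary - statements without their own handle already use it;
        /// `ks.tab` is a hypothetical table):
        /// ```rust
        /// # use scylla::client::session::Session;
        /// # fn check_only_compiles(session: &Session) {
        /// use scylla::statement::unprepared::Statement;
        ///
        /// let mut statement = Statement::new("SELECT a FROM ks.tab");
        /// statement.set_execution_profile_handle(Some(
        ///     session.get_default_execution_profile_handle().clone(),
        /// ));
        /// # }
        /// ```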
2113    pub fn get_default_execution_profile_handle(&self) -> &ExecutionProfileHandle {
2114        &self.default_execution_profile_handle
2115    }
2116}
2117
2118// run_request, run_request_speculative_fiber, etc. have a generic type parameter called ResT.
2119// There was a bug where ResT was set to QueryResponse, which could
2120// be an error response. This was not caught by the retry policy, which
2121// assumed all errors would come from analyzing Result<ResT, ExecutionError>.
2122// This trait is a guard to make sure that this mistake doesn't
2123// happen again.
2124// When using run_request, make sure that the ResT type is NOT able
2125// to contain any errors.
2126// See https://github.com/scylladb/scylla-rust-driver/issues/501
2127pub(crate) trait AllowedRunRequestResTType {}
2128
2129impl AllowedRunRequestResTType for Uuid {}
2130impl AllowedRunRequestResTType for QueryResult {}
2131impl AllowedRunRequestResTType for NonErrorQueryResponse {}
2132
2133struct ExecuteRequestContext<'a> {
2134    is_idempotent: bool,
2135    consistency_set_on_statement: Option<Consistency>,
2136    retry_session: Box<dyn RetrySession>,
2137    history_data: Option<HistoryData<'a>>,
2138    query_info: &'a load_balancing::RoutingInfo<'a>,
2139    request_span: &'a RequestSpan,
2140}
2141
2142struct HistoryData<'a> {
2143    listener: &'a dyn HistoryListener,
2144    request_id: history::RequestId,
2145    speculative_id: Option<history::SpeculativeId>,
2146}
2147
2148impl ExecuteRequestContext<'_> {
2149    fn log_attempt_start(&self, node_addr: SocketAddr) -> Option<history::AttemptId> {
2150        self.history_data.as_ref().map(|hd| {
2151            hd.listener
2152                .log_attempt_start(hd.request_id, hd.speculative_id, node_addr)
2153        })
2154    }
2155
2156    fn log_attempt_success(&self, attempt_id_opt: &Option<history::AttemptId>) {
2157        let attempt_id: &history::AttemptId = match attempt_id_opt {
2158            Some(id) => id,
2159            None => return,
2160        };
2161
2162        let history_data: &HistoryData = match &self.history_data {
2163            Some(data) => data,
2164            None => return,
2165        };
2166
2167        history_data.listener.log_attempt_success(*attempt_id);
2168    }
2169
2170    fn log_attempt_error(
2171        &self,
2172        attempt_id_opt: &Option<history::AttemptId>,
2173        error: &RequestAttemptError,
2174        retry_decision: &RetryDecision,
2175    ) {
2176        let attempt_id: &history::AttemptId = match attempt_id_opt {
2177            Some(id) => id,
2178            None => return,
2179        };
2180
2181        let history_data: &HistoryData = match &self.history_data {
2182            Some(data) => data,
2183            None => return,
2184        };
2185
2186        history_data
2187            .listener
2188            .log_attempt_error(*attempt_id, error, retry_decision);
2189    }
2190}