scylla/client/
execution_profile.rs

1//! `ExecutionProfile` is a grouping of configurable options regarding CQL statement execution.
2//!
3//! Profiles can be created to represent different workloads, which thanks to them
4//! can be run conveniently on a single session.
5//!
6//! There are two classes of objects related to execution profiles: `ExecutionProfile` and `ExecutionProfileHandle`.
//! The former is simply an immutable set of settings. The latter is a handle that, at any particular moment,
//! points to some `ExecutionProfile` (but during its lifetime, it can change the profile it points at).
//! Handles are assigned to `Session`s and `Statement`s.
//! At any moment, a handle can be remapped to point to another `ExecutionProfile`. This allows convenient
//! switching between workloads for all `Session`s and/or `Statement`s that, for instance, share common
//! characteristics.
12//!
13//! ### Example
14//! To create an `ExecutionProfile` and attach it as default for `Session`:
15//! ```
16//! # extern crate scylla;
17//! # use std::error::Error;
18//! # async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
19//! use scylla::client::session::Session;
20//! use scylla::client::session_builder::SessionBuilder;
21//! use scylla::statement::Consistency;
22//! use scylla::client::execution_profile::ExecutionProfile;
23//!
24//! let profile = ExecutionProfile::builder()
25//!     .consistency(Consistency::LocalOne)
26//!     .request_timeout(None) // no request timeout
27//!     .build();
28//!
29//! let handle = profile.into_handle();
30//!
31//! let session: Session = SessionBuilder::new()
32//!     .known_node("127.0.0.1:9042")
33//!     .default_execution_profile_handle(handle)
34//!     .build()
35//!     .await?;
36//! # Ok(())
37//! # }
38//! ```
39//!
40//! ### Example
41//! To create an [`ExecutionProfile`] and attach it to a [`Statement`](crate::statement::Statement):
42//! ```
43//! # extern crate scylla;
44//! # use std::error::Error;
45//! # async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
46//! use scylla::statement::unprepared::Statement;
47//! use scylla::statement::Consistency;
48//! use scylla::client::execution_profile::ExecutionProfile;
49//! use std::time::Duration;
50//!
51//! let profile = ExecutionProfile::builder()
52//!     .consistency(Consistency::All)
53//!     .request_timeout(Some(Duration::from_secs(30)))
54//!     .build();
55//!
56//! let handle = profile.into_handle();
57//!
58//! let mut query1 = Statement::from("SELECT * FROM ks.table");
59//! query1.set_execution_profile_handle(Some(handle.clone()));
60//!
61//! let mut query2 = Statement::from("SELECT pk FROM ks.table WHERE pk = ?");
62//! query2.set_execution_profile_handle(Some(handle));
63//! # Ok(())
64//! # }
65//! ```
66//!
67//! ### Example
68//! To create an `ExecutionProfile` with config options defaulting
69//! to those set on another profile:
70//! ```
71//! # extern crate scylla;
72//! # use std::error::Error;
73//! # async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
74//! use scylla::statement::Consistency;
75//! use scylla::client::execution_profile::ExecutionProfile;
76//! use std::time::Duration;
77//!
78//! let base_profile = ExecutionProfile::builder()
79//!     .request_timeout(Some(Duration::from_secs(30)))
80//!     .build();
81//!
82//! let profile = base_profile.to_builder()
83//!     .consistency(Consistency::All)
84//!     .build();
85//!
86//! # Ok(())
87//! # }
88//! ```
89//!
90//! `ExecutionProfileHandle`s can be remapped to another `ExecutionProfile`, and the change affects all sessions and statements that have been assigned that handle. This enables quick workload switches.
91//!
92//! Example mapping:
93//! * session1 -> handle1 -> profile1
94//! * statement1 -> handle1 -> profile1
95//! * statement2 -> handle2 -> profile2
96//!
97//! We can now remap handle2 to profile1, so that the mapping for statement2 becomes as follows:
98//! * statement2 -> handle2 -> profile1
99//!
100//! We can also change statement1's handle to handle2, and remap handle1 to profile2, yielding:
101//! * session1 -> handle1 -> profile2
102//! * statement1 -> handle2 -> profile1
103//! * statement2 -> handle2 -> profile1
104//!
105//! As you can see, profiles are a powerful and convenient way to define and modify your workloads.
106//!
107//! ### Example
108//! Below, the remaps described above are followed in code.
109//! ```
110//! # extern crate scylla;
111//! # use std::error::Error;
112//! # async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
113//! use scylla::client::session::Session;
114//! use scylla::client::session_builder::SessionBuilder;
115//! use scylla::statement::unprepared::Statement;
116//! use scylla::statement::Consistency;
117//! use scylla::client::execution_profile::ExecutionProfile;
118//!
119//! let profile1 = ExecutionProfile::builder()
120//!     .consistency(Consistency::One)
121//!     .build();
122//!
123//! let profile2 = ExecutionProfile::builder()
124//!     .consistency(Consistency::Two)
125//!     .build();
126//!
127//! let mut handle1 = profile1.clone().into_handle();
128//! let mut handle2 = profile2.clone().into_handle();
129//!
130//! let session: Session = SessionBuilder::new()
131//!     .known_node("127.0.0.1:9042")
132//!     .default_execution_profile_handle(handle1.clone())
133//!     .build()
134//!     .await?;
135//!
136//! let mut query1 = Statement::from("SELECT * FROM ks.table");
137//! let mut query2 = Statement::from("SELECT pk FROM ks.table WHERE pk = ?");
138//!
139//! query1.set_execution_profile_handle(Some(handle1.clone()));
140//! query2.set_execution_profile_handle(Some(handle2.clone()));
141//!
142//! // session1 -> handle1 -> profile1
143//! //   query1 -> handle1 -> profile1
144//! //   query2 -> handle2 -> profile2
145//!
146//! // We can now remap handle2 to profile1:
147//! handle2.map_to_another_profile(profile1);
148//! // ...so that the mapping for query2 becomes as follows:
149//! // query2 -> handle2 -> profile1
150//!
151//! // We can also change query1's handle to handle2:
152//! query1.set_execution_profile_handle(Some(handle2.clone()));
153//! // ...and remap handle1 to profile2:
154//! handle1.map_to_another_profile(profile2);
155//! // ...yielding:
156//! // session1 -> handle1 -> profile2
157//! //   query1 -> handle2 -> profile1
158//! //   query2 -> handle2 -> profile1
159//!
160//! # Ok(())
161//! # }
162//! ```
163//!
164
165use std::{fmt::Debug, sync::Arc, time::Duration};
166
167use arc_swap::ArcSwap;
168use scylla_cql::{frame::types::SerialConsistency, Consistency};
169
170use crate::policies::load_balancing::LoadBalancingPolicy;
171use crate::policies::retry::RetryPolicy;
172use crate::policies::speculative_execution::SpeculativeExecutionPolicy;
173
174pub(crate) mod defaults {
175    use super::ExecutionProfileInner;
176    use crate::policies::load_balancing::{self, LoadBalancingPolicy};
177    use crate::policies::retry::{DefaultRetryPolicy, RetryPolicy};
178    use crate::policies::speculative_execution::SpeculativeExecutionPolicy;
179    use scylla_cql::frame::types::SerialConsistency;
180    use scylla_cql::Consistency;
181    use std::sync::Arc;
182    use std::time::Duration;
183    pub(crate) fn consistency() -> Consistency {
184        Consistency::LocalQuorum
185    }
186    pub(crate) fn serial_consistency() -> Option<SerialConsistency> {
187        Some(SerialConsistency::LocalSerial)
188    }
189    pub(crate) fn request_timeout() -> Option<Duration> {
190        Some(Duration::from_secs(30))
191    }
192    pub(crate) fn load_balancing_policy() -> Arc<dyn LoadBalancingPolicy> {
193        Arc::new(load_balancing::DefaultPolicy::default())
194    }
195    pub(crate) fn retry_policy() -> Arc<dyn RetryPolicy> {
196        Arc::new(DefaultRetryPolicy::new())
197    }
198    pub(crate) fn speculative_execution_policy() -> Option<Arc<dyn SpeculativeExecutionPolicy>> {
199        None
200    }
201
202    impl Default for ExecutionProfileInner {
203        fn default() -> Self {
204            Self {
205                request_timeout: request_timeout(),
206                consistency: consistency(),
207                serial_consistency: serial_consistency(),
208                load_balancing_policy: load_balancing_policy(),
209                retry_policy: retry_policy(),
210                speculative_execution_policy: speculative_execution_policy(),
211            }
212        }
213    }
214}
215
216/// `ExecutionProfileBuilder` is used to create new `ExecutionProfile`s
217/// # Example
218///
219/// ```
220/// # use scylla::client::execution_profile::ExecutionProfile;
221/// # use scylla::policies::retry::FallthroughRetryPolicy;
222/// # use scylla::statement::Consistency;
223/// # use std::sync::Arc;
224/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
225/// let profile: ExecutionProfile = ExecutionProfile::builder()
226///     .consistency(Consistency::Three) // as this is the number we shall count to
227///     .retry_policy(Arc::new(FallthroughRetryPolicy::new()))
228///     .build();
229/// # Ok(())
230/// # }
231/// ```
232#[derive(Clone, Debug)]
233pub struct ExecutionProfileBuilder {
234    request_timeout: Option<Option<Duration>>,
235    consistency: Option<Consistency>,
236    serial_consistency: Option<Option<SerialConsistency>>,
237    load_balancing_policy: Option<Arc<dyn LoadBalancingPolicy>>,
238    retry_policy: Option<Arc<dyn RetryPolicy>>,
239    speculative_execution_policy: Option<Option<Arc<dyn SpeculativeExecutionPolicy>>>,
240}
241
242impl ExecutionProfileBuilder {
243    /// Changes client-side timeout.
244    /// The default is 30 seconds.
245    ///
246    /// # Example
247    /// ```
248    /// # use scylla::client::execution_profile::ExecutionProfile;
249    /// # use std::time::Duration;
250    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
251    /// let profile: ExecutionProfile = ExecutionProfile::builder()
252    ///     .request_timeout(Some(Duration::from_secs(5)))
253    ///     .build();
254    /// # Ok(())
255    /// # }
256    /// ```
257    pub fn request_timeout(mut self, timeout: Option<Duration>) -> Self {
258        self.request_timeout = Some(timeout);
259        self
260    }
261
262    /// Specify a default consistency to be used for statement executions.
263    /// It's possible to override it by explicitly setting a consistency on the chosen query.
264    pub fn consistency(mut self, consistency: Consistency) -> Self {
265        self.consistency = Some(consistency);
266        self
267    }
268
269    /// Specify a default serial consistency to be used for statement executions.
270    /// It's possible to override it by explicitly setting a serial consistency
271    /// on the chosen statement.
272    pub fn serial_consistency(mut self, serial_consistency: Option<SerialConsistency>) -> Self {
273        self.serial_consistency = Some(serial_consistency);
274        self
275    }
276
277    /// Sets the load balancing policy.
278    /// The default is DefaultPolicy.
279    ///
280    /// # Example
281    /// ```
282    /// # use scylla::client::execution_profile::ExecutionProfile;
283    /// # use scylla::policies::load_balancing::DefaultPolicy;
284    /// # use std::sync::Arc;
285    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
286    /// let profile: ExecutionProfile = ExecutionProfile::builder()
287    ///     .load_balancing_policy(Arc::new(DefaultPolicy::default()))
288    ///     .build();
289    /// # Ok(())
290    /// # }
291    /// ```
292    pub fn load_balancing_policy(
293        mut self,
294        load_balancing_policy: Arc<dyn LoadBalancingPolicy>,
295    ) -> Self {
296        self.load_balancing_policy = Some(load_balancing_policy);
297        self
298    }
299
300    /// Sets the [`RetryPolicy`] to use by default on statements.
301    /// The default is [DefaultRetryPolicy](crate::policies::retry::DefaultRetryPolicy).
302    /// It is possible to implement a custom retry policy by implementing the trait [`RetryPolicy`].
303    ///
304    /// # Example
305    /// ```
306    /// # use scylla::client::execution_profile::ExecutionProfile;
307    /// # use scylla::policies::retry::DefaultRetryPolicy;
308    /// # use std::sync::Arc;
309    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
310    /// let profile: ExecutionProfile = ExecutionProfile::builder()
311    ///     .retry_policy(Arc::new(DefaultRetryPolicy::new()))
312    ///     .build();
313    /// # Ok(())
314    /// # }
315    /// ```
316    pub fn retry_policy(mut self, retry_policy: Arc<dyn RetryPolicy>) -> Self {
317        self.retry_policy = Some(retry_policy);
318        self
319    }
320
321    /// Sets the speculative execution policy.
322    /// The default is None.
323    /// # Example
324    /// ```
325    /// # extern crate scylla;
326    /// # use std::error::Error;
327    /// # fn check_only_compiles() -> Result<(), Box<dyn Error>> {
328    /// use std::{sync::Arc, time::Duration};
329    /// use scylla::{
330    ///     client::execution_profile::ExecutionProfile,
331    ///     policies::speculative_execution::SimpleSpeculativeExecutionPolicy,
332    /// };
333    ///
334    /// let policy = SimpleSpeculativeExecutionPolicy {
335    ///     max_retry_count: 3,
336    ///     retry_interval: Duration::from_millis(100),
337    /// };
338    ///
339    /// let profile: ExecutionProfile = ExecutionProfile::builder()
340    ///     .speculative_execution_policy(Some(Arc::new(policy)))
341    ///     .build();
342    /// # Ok(())
343    /// # }
344    /// ```
345    pub fn speculative_execution_policy(
346        mut self,
347        speculative_execution_policy: Option<Arc<dyn SpeculativeExecutionPolicy>>,
348    ) -> Self {
349        self.speculative_execution_policy = Some(speculative_execution_policy);
350        self
351    }
352
353    /// Builds the ExecutionProfile after setting all the options.
354    ///
355    /// # Example
356    /// ```
357    /// # use scylla::client::execution_profile::ExecutionProfile;
358    /// # use scylla::policies::retry::DefaultRetryPolicy;
359    /// # use std::sync::Arc;
360    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
361    /// let profile: ExecutionProfile = ExecutionProfile::builder()
362    ///     .retry_policy(Arc::new(DefaultRetryPolicy::new()))
363    ///     .build();
364    /// # Ok(())
365    /// # }
366    /// ```
367    pub fn build(self) -> ExecutionProfile {
368        ExecutionProfile(Arc::new(ExecutionProfileInner {
369            request_timeout: self
370                .request_timeout
371                .unwrap_or_else(defaults::request_timeout),
372            consistency: self.consistency.unwrap_or_else(defaults::consistency),
373            serial_consistency: self
374                .serial_consistency
375                .unwrap_or_else(defaults::serial_consistency),
376            load_balancing_policy: self
377                .load_balancing_policy
378                .unwrap_or_else(defaults::load_balancing_policy),
379            retry_policy: self.retry_policy.unwrap_or_else(defaults::retry_policy),
380            speculative_execution_policy: self
381                .speculative_execution_policy
382                .unwrap_or_else(defaults::speculative_execution_policy),
383        }))
384    }
385}
386
387impl Default for ExecutionProfileBuilder {
388    fn default() -> Self {
389        ExecutionProfile::builder()
390    }
391}
392
393/// A profile that groups configurable options regarding statement execution.
394///
395/// Execution profile is immutable as such, but the driver implements double indirection of form:
396/// statement/Session -> [`ExecutionProfileHandle`] -> [`ExecutionProfile`]
397/// which enables on-fly changing the actual profile associated with all entities (statements/Session)
398/// by the same handle.
399#[derive(Debug, Clone)]
400pub struct ExecutionProfile(pub(crate) Arc<ExecutionProfileInner>);
401
402#[derive(Debug)]
403pub(crate) struct ExecutionProfileInner {
404    pub(crate) request_timeout: Option<Duration>,
405
406    pub(crate) consistency: Consistency,
407    pub(crate) serial_consistency: Option<SerialConsistency>,
408
409    pub(crate) load_balancing_policy: Arc<dyn LoadBalancingPolicy>,
410    pub(crate) retry_policy: Arc<dyn RetryPolicy>,
411    pub(crate) speculative_execution_policy: Option<Arc<dyn SpeculativeExecutionPolicy>>,
412}
413
414impl ExecutionProfileInner {
415    /// Creates a builder having all options set to the same as set in this ExecutionProfileInner.
416    pub(crate) fn to_builder(&self) -> ExecutionProfileBuilder {
417        ExecutionProfileBuilder {
418            request_timeout: Some(self.request_timeout),
419            consistency: Some(self.consistency),
420            serial_consistency: Some(self.serial_consistency),
421            load_balancing_policy: Some(self.load_balancing_policy.clone()),
422            retry_policy: Some(self.retry_policy.clone()),
423            speculative_execution_policy: Some(self.speculative_execution_policy.clone()),
424        }
425    }
426}
427
428impl ExecutionProfile {
429    pub(crate) fn new_from_inner(inner: ExecutionProfileInner) -> Self {
430        Self(Arc::new(inner))
431    }
432
433    /// Creates a blank builder that can be used to construct new ExecutionProfile.
434    pub fn builder() -> ExecutionProfileBuilder {
435        ExecutionProfileBuilder {
436            request_timeout: None,
437            consistency: None,
438            serial_consistency: None,
439            load_balancing_policy: None,
440            retry_policy: None,
441            speculative_execution_policy: None,
442        }
443    }
444
445    /// Creates a builder having all options set to the same as set in this ExecutionProfile.
446    pub fn to_builder(&self) -> ExecutionProfileBuilder {
447        self.0.to_builder()
448    }
449
450    /// Returns a new handle to this ExecutionProfile.
451    pub fn into_handle(self) -> ExecutionProfileHandle {
452        ExecutionProfileHandle(Arc::new((ArcSwap::new(self.0), None)))
453    }
454
455    /// Returns a new handle to this ExecutionProfile, tagging the handle with provided label.
456    /// The tag, as its name suggests, is only useful for debugging purposes, while being confused
457    /// about which statement/session is assigned which handle. Identifying handles with tags
458    /// could then help.
459    pub fn into_handle_with_label(self, label: String) -> ExecutionProfileHandle {
460        ExecutionProfileHandle(Arc::new((ArcSwap::new(self.0), Some(label))))
461    }
462
463    /// Gets client timeout associated with this profile.
464    pub fn get_request_timeout(&self) -> Option<Duration> {
465        self.0.request_timeout
466    }
467
468    /// Gets consistency associated with this profile.
469    pub fn get_consistency(&self) -> Consistency {
470        self.0.consistency
471    }
472
473    /// Gets serial consistency (if set) associated with this profile.
474    pub fn get_serial_consistency(&self) -> Option<SerialConsistency> {
475        self.0.serial_consistency
476    }
477
478    /// Gets load balancing policy associated with this profile.
479    pub fn get_load_balancing_policy(&self) -> &Arc<dyn LoadBalancingPolicy> {
480        &self.0.load_balancing_policy
481    }
482
483    /// Gets retry policy associated with this profile.
484    pub fn get_retry_policy(&self) -> &Arc<dyn RetryPolicy> {
485        &self.0.retry_policy
486    }
487
488    /// Gets speculative execution policy associated with this profile.
489    pub fn get_speculative_execution_policy(&self) -> Option<&Arc<dyn SpeculativeExecutionPolicy>> {
490        self.0.speculative_execution_policy.as_ref()
491    }
492}
493
494/// A handle that points to an ExecutionProfile.
495///
496/// Its goal is to enable remapping all associated entities (statement/Session)
497/// to another execution profile at once.
498/// Note: Cloned handles initially point to the same Arc'ed execution profile.
499/// However, as the mapping has yet another level of indirection - through
500/// `Arc<ArcSwap>` - remapping one of them affects all the others, as under the hood
501/// it is done by replacing the Arc held by the ArcSwap, which is shared
502/// by all cloned handles.
503/// The optional String is just for debug purposes. Its purpose is described
504/// in [ExecutionProfile::into_handle_with_label].
505#[derive(Debug, Clone)]
506pub struct ExecutionProfileHandle(Arc<(ArcSwap<ExecutionProfileInner>, Option<String>)>);
507
508impl ExecutionProfileHandle {
509    pub(crate) fn access(&self) -> Arc<ExecutionProfileInner> {
510        self.0 .0.load_full()
511    }
512
513    /// Creates a builder having all options set to the same as set in the ExecutionProfile pointed by this handle.
514    pub fn pointee_to_builder(&self) -> ExecutionProfileBuilder {
515        self.0 .0.load().to_builder()
516    }
517
518    /// Returns execution profile pointed by this handle.
519    pub fn to_profile(&self) -> ExecutionProfile {
520        ExecutionProfile(self.access())
521    }
522
523    /// Makes the handle point to a new execution profile.
524    /// All entities (statements/Session) holding this handle will reflect the change.
525    pub fn map_to_another_profile(&mut self, profile: ExecutionProfile) {
526        self.0 .0.store(profile.0)
527    }
528}