scylla/client/execution_profile.rs
1//! `ExecutionProfile` is a grouping of configurable options regarding CQL statement execution.
2//!
3//! Profiles can be created to represent different workloads, which thanks to them
4//! can be run conveniently on a single session.
5//!
6//! There are two classes of objects related to execution profiles: `ExecutionProfile` and `ExecutionProfileHandle`.
//! The former is simply an immutable set of settings. The latter is a handle that, at any particular moment, points
//! to some `ExecutionProfile` (but during its lifetime, it can change the profile it points at).
9//! Handles are assigned to `Sessions` and `Statements`.
//! At any moment, a handle can be remapped to point to another `ExecutionProfile`. This allows convenient switching between workloads
11//! for all `Sessions` and/or `Statements` that, for instance, share common characteristics.
12//!
13//! ### Example
14//! To create an `ExecutionProfile` and attach it as default for `Session`:
15//! ```
16//! # extern crate scylla;
17//! # use std::error::Error;
18//! # async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
19//! use scylla::client::session::Session;
20//! use scylla::client::session_builder::SessionBuilder;
21//! use scylla::statement::Consistency;
22//! use scylla::client::execution_profile::ExecutionProfile;
23//!
24//! let profile = ExecutionProfile::builder()
25//! .consistency(Consistency::LocalOne)
26//! .request_timeout(None) // no request timeout
27//! .build();
28//!
29//! let handle = profile.into_handle();
30//!
31//! let session: Session = SessionBuilder::new()
32//! .known_node("127.0.0.1:9042")
33//! .default_execution_profile_handle(handle)
34//! .build()
35//! .await?;
36//! # Ok(())
37//! # }
38//! ```
39//!
40//! ### Example
41//! To create an [`ExecutionProfile`] and attach it to a [`Statement`](crate::statement::Statement):
42//! ```
43//! # extern crate scylla;
44//! # use std::error::Error;
45//! # async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
46//! use scylla::statement::unprepared::Statement;
47//! use scylla::statement::Consistency;
48//! use scylla::client::execution_profile::ExecutionProfile;
49//! use std::time::Duration;
50//!
51//! let profile = ExecutionProfile::builder()
52//! .consistency(Consistency::All)
53//! .request_timeout(Some(Duration::from_secs(30)))
54//! .build();
55//!
56//! let handle = profile.into_handle();
57//!
58//! let mut query1 = Statement::from("SELECT * FROM ks.table");
59//! query1.set_execution_profile_handle(Some(handle.clone()));
60//!
61//! let mut query2 = Statement::from("SELECT pk FROM ks.table WHERE pk = ?");
62//! query2.set_execution_profile_handle(Some(handle));
63//! # Ok(())
64//! # }
65//! ```
66//!
67//! ### Example
68//! To create an `ExecutionProfile` with config options defaulting
69//! to those set on another profile:
70//! ```
71//! # extern crate scylla;
72//! # use std::error::Error;
73//! # async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
74//! use scylla::statement::Consistency;
75//! use scylla::client::execution_profile::ExecutionProfile;
76//! use std::time::Duration;
77//!
78//! let base_profile = ExecutionProfile::builder()
79//! .request_timeout(Some(Duration::from_secs(30)))
80//! .build();
81//!
82//! let profile = base_profile.to_builder()
83//! .consistency(Consistency::All)
84//! .build();
85//!
86//! # Ok(())
87//! # }
88//! ```
89//!
90//! `ExecutionProfileHandle`s can be remapped to another `ExecutionProfile`, and the change affects all sessions and statements that have been assigned that handle. This enables quick workload switches.
91//!
92//! Example mapping:
93//! * session1 -> handle1 -> profile1
94//! * statement1 -> handle1 -> profile1
95//! * statement2 -> handle2 -> profile2
96//!
97//! We can now remap handle2 to profile1, so that the mapping for statement2 becomes as follows:
98//! * statement2 -> handle2 -> profile1
99//!
100//! We can also change statement1's handle to handle2, and remap handle1 to profile2, yielding:
101//! * session1 -> handle1 -> profile2
102//! * statement1 -> handle2 -> profile1
103//! * statement2 -> handle2 -> profile1
104//!
105//! As you can see, profiles are a powerful and convenient way to define and modify your workloads.
106//!
107//! ### Example
108//! Below, the remaps described above are followed in code.
109//! ```
110//! # extern crate scylla;
111//! # use std::error::Error;
112//! # async fn check_only_compiles() -> Result<(), Box<dyn Error>> {
113//! use scylla::client::session::Session;
114//! use scylla::client::session_builder::SessionBuilder;
115//! use scylla::statement::unprepared::Statement;
116//! use scylla::statement::Consistency;
117//! use scylla::client::execution_profile::ExecutionProfile;
118//!
119//! let profile1 = ExecutionProfile::builder()
120//! .consistency(Consistency::One)
121//! .build();
122//!
123//! let profile2 = ExecutionProfile::builder()
124//! .consistency(Consistency::Two)
125//! .build();
126//!
127//! let mut handle1 = profile1.clone().into_handle();
128//! let mut handle2 = profile2.clone().into_handle();
129//!
130//! let session: Session = SessionBuilder::new()
131//! .known_node("127.0.0.1:9042")
132//! .default_execution_profile_handle(handle1.clone())
133//! .build()
134//! .await?;
135//!
136//! let mut query1 = Statement::from("SELECT * FROM ks.table");
137//! let mut query2 = Statement::from("SELECT pk FROM ks.table WHERE pk = ?");
138//!
139//! query1.set_execution_profile_handle(Some(handle1.clone()));
140//! query2.set_execution_profile_handle(Some(handle2.clone()));
141//!
142//! // session1 -> handle1 -> profile1
143//! // query1 -> handle1 -> profile1
144//! // query2 -> handle2 -> profile2
145//!
146//! // We can now remap handle2 to profile1:
147//! handle2.map_to_another_profile(profile1);
148//! // ...so that the mapping for query2 becomes as follows:
149//! // query2 -> handle2 -> profile1
150//!
151//! // We can also change query1's handle to handle2:
152//! query1.set_execution_profile_handle(Some(handle2.clone()));
153//! // ...and remap handle1 to profile2:
154//! handle1.map_to_another_profile(profile2);
155//! // ...yielding:
156//! // session1 -> handle1 -> profile2
157//! // query1 -> handle2 -> profile1
158//! // query2 -> handle2 -> profile1
159//!
160//! # Ok(())
161//! # }
162//! ```
163//!
164
165use std::{fmt::Debug, sync::Arc, time::Duration};
166
167use arc_swap::ArcSwap;
168use scylla_cql::{frame::types::SerialConsistency, Consistency};
169
170use crate::policies::load_balancing::LoadBalancingPolicy;
171use crate::policies::retry::RetryPolicy;
172use crate::policies::speculative_execution::SpeculativeExecutionPolicy;
173
174pub(crate) mod defaults {
175 use super::ExecutionProfileInner;
176 use crate::policies::load_balancing::{self, LoadBalancingPolicy};
177 use crate::policies::retry::{DefaultRetryPolicy, RetryPolicy};
178 use crate::policies::speculative_execution::SpeculativeExecutionPolicy;
179 use scylla_cql::frame::types::SerialConsistency;
180 use scylla_cql::Consistency;
181 use std::sync::Arc;
182 use std::time::Duration;
183 pub(crate) fn consistency() -> Consistency {
184 Consistency::LocalQuorum
185 }
186 pub(crate) fn serial_consistency() -> Option<SerialConsistency> {
187 Some(SerialConsistency::LocalSerial)
188 }
189 pub(crate) fn request_timeout() -> Option<Duration> {
190 Some(Duration::from_secs(30))
191 }
192 pub(crate) fn load_balancing_policy() -> Arc<dyn LoadBalancingPolicy> {
193 Arc::new(load_balancing::DefaultPolicy::default())
194 }
195 pub(crate) fn retry_policy() -> Arc<dyn RetryPolicy> {
196 Arc::new(DefaultRetryPolicy::new())
197 }
198 pub(crate) fn speculative_execution_policy() -> Option<Arc<dyn SpeculativeExecutionPolicy>> {
199 None
200 }
201
202 impl Default for ExecutionProfileInner {
203 fn default() -> Self {
204 Self {
205 request_timeout: request_timeout(),
206 consistency: consistency(),
207 serial_consistency: serial_consistency(),
208 load_balancing_policy: load_balancing_policy(),
209 retry_policy: retry_policy(),
210 speculative_execution_policy: speculative_execution_policy(),
211 }
212 }
213 }
214}
215
216/// `ExecutionProfileBuilder` is used to create new `ExecutionProfile`s
217/// # Example
218///
219/// ```
220/// # use scylla::client::execution_profile::ExecutionProfile;
221/// # use scylla::policies::retry::FallthroughRetryPolicy;
222/// # use scylla::statement::Consistency;
223/// # use std::sync::Arc;
224/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
225/// let profile: ExecutionProfile = ExecutionProfile::builder()
226/// .consistency(Consistency::Three) // as this is the number we shall count to
227/// .retry_policy(Arc::new(FallthroughRetryPolicy::new()))
228/// .build();
229/// # Ok(())
230/// # }
231/// ```
232#[derive(Clone, Debug)]
233pub struct ExecutionProfileBuilder {
234 request_timeout: Option<Option<Duration>>,
235 consistency: Option<Consistency>,
236 serial_consistency: Option<Option<SerialConsistency>>,
237 load_balancing_policy: Option<Arc<dyn LoadBalancingPolicy>>,
238 retry_policy: Option<Arc<dyn RetryPolicy>>,
239 speculative_execution_policy: Option<Option<Arc<dyn SpeculativeExecutionPolicy>>>,
240}
241
242impl ExecutionProfileBuilder {
243 /// Changes client-side timeout.
244 /// The default is 30 seconds.
245 ///
246 /// # Example
247 /// ```
248 /// # use scylla::client::execution_profile::ExecutionProfile;
249 /// # use std::time::Duration;
250 /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
251 /// let profile: ExecutionProfile = ExecutionProfile::builder()
252 /// .request_timeout(Some(Duration::from_secs(5)))
253 /// .build();
254 /// # Ok(())
255 /// # }
256 /// ```
257 pub fn request_timeout(mut self, timeout: Option<Duration>) -> Self {
258 self.request_timeout = Some(timeout);
259 self
260 }
261
262 /// Specify a default consistency to be used for statement executions.
263 /// It's possible to override it by explicitly setting a consistency on the chosen query.
264 pub fn consistency(mut self, consistency: Consistency) -> Self {
265 self.consistency = Some(consistency);
266 self
267 }
268
269 /// Specify a default serial consistency to be used for statement executions.
270 /// It's possible to override it by explicitly setting a serial consistency
271 /// on the chosen statement.
272 pub fn serial_consistency(mut self, serial_consistency: Option<SerialConsistency>) -> Self {
273 self.serial_consistency = Some(serial_consistency);
274 self
275 }
276
277 /// Sets the load balancing policy.
278 /// The default is DefaultPolicy.
279 ///
280 /// # Example
281 /// ```
282 /// # use scylla::client::execution_profile::ExecutionProfile;
283 /// # use scylla::policies::load_balancing::DefaultPolicy;
284 /// # use std::sync::Arc;
285 /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
286 /// let profile: ExecutionProfile = ExecutionProfile::builder()
287 /// .load_balancing_policy(Arc::new(DefaultPolicy::default()))
288 /// .build();
289 /// # Ok(())
290 /// # }
291 /// ```
292 pub fn load_balancing_policy(
293 mut self,
294 load_balancing_policy: Arc<dyn LoadBalancingPolicy>,
295 ) -> Self {
296 self.load_balancing_policy = Some(load_balancing_policy);
297 self
298 }
299
300 /// Sets the [`RetryPolicy`] to use by default on statements.
301 /// The default is [DefaultRetryPolicy](crate::policies::retry::DefaultRetryPolicy).
302 /// It is possible to implement a custom retry policy by implementing the trait [`RetryPolicy`].
303 ///
304 /// # Example
305 /// ```
306 /// # use scylla::client::execution_profile::ExecutionProfile;
307 /// # use scylla::policies::retry::DefaultRetryPolicy;
308 /// # use std::sync::Arc;
309 /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
310 /// let profile: ExecutionProfile = ExecutionProfile::builder()
311 /// .retry_policy(Arc::new(DefaultRetryPolicy::new()))
312 /// .build();
313 /// # Ok(())
314 /// # }
315 /// ```
316 pub fn retry_policy(mut self, retry_policy: Arc<dyn RetryPolicy>) -> Self {
317 self.retry_policy = Some(retry_policy);
318 self
319 }
320
321 /// Sets the speculative execution policy.
322 /// The default is None.
323 /// # Example
324 /// ```
325 /// # extern crate scylla;
326 /// # use std::error::Error;
327 /// # fn check_only_compiles() -> Result<(), Box<dyn Error>> {
328 /// use std::{sync::Arc, time::Duration};
329 /// use scylla::{
330 /// client::execution_profile::ExecutionProfile,
331 /// policies::speculative_execution::SimpleSpeculativeExecutionPolicy,
332 /// };
333 ///
334 /// let policy = SimpleSpeculativeExecutionPolicy {
335 /// max_retry_count: 3,
336 /// retry_interval: Duration::from_millis(100),
337 /// };
338 ///
339 /// let profile: ExecutionProfile = ExecutionProfile::builder()
340 /// .speculative_execution_policy(Some(Arc::new(policy)))
341 /// .build();
342 /// # Ok(())
343 /// # }
344 /// ```
345 pub fn speculative_execution_policy(
346 mut self,
347 speculative_execution_policy: Option<Arc<dyn SpeculativeExecutionPolicy>>,
348 ) -> Self {
349 self.speculative_execution_policy = Some(speculative_execution_policy);
350 self
351 }
352
353 /// Builds the ExecutionProfile after setting all the options.
354 ///
355 /// # Example
356 /// ```
357 /// # use scylla::client::execution_profile::ExecutionProfile;
358 /// # use scylla::policies::retry::DefaultRetryPolicy;
359 /// # use std::sync::Arc;
360 /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
361 /// let profile: ExecutionProfile = ExecutionProfile::builder()
362 /// .retry_policy(Arc::new(DefaultRetryPolicy::new()))
363 /// .build();
364 /// # Ok(())
365 /// # }
366 /// ```
367 pub fn build(self) -> ExecutionProfile {
368 ExecutionProfile(Arc::new(ExecutionProfileInner {
369 request_timeout: self
370 .request_timeout
371 .unwrap_or_else(defaults::request_timeout),
372 consistency: self.consistency.unwrap_or_else(defaults::consistency),
373 serial_consistency: self
374 .serial_consistency
375 .unwrap_or_else(defaults::serial_consistency),
376 load_balancing_policy: self
377 .load_balancing_policy
378 .unwrap_or_else(defaults::load_balancing_policy),
379 retry_policy: self.retry_policy.unwrap_or_else(defaults::retry_policy),
380 speculative_execution_policy: self
381 .speculative_execution_policy
382 .unwrap_or_else(defaults::speculative_execution_policy),
383 }))
384 }
385}
386
387impl Default for ExecutionProfileBuilder {
388 fn default() -> Self {
389 ExecutionProfile::builder()
390 }
391}
392
393/// A profile that groups configurable options regarding statement execution.
394///
395/// Execution profile is immutable as such, but the driver implements double indirection of form:
396/// statement/Session -> [`ExecutionProfileHandle`] -> [`ExecutionProfile`]
397/// which enables on-fly changing the actual profile associated with all entities (statements/Session)
398/// by the same handle.
399#[derive(Debug, Clone)]
400pub struct ExecutionProfile(pub(crate) Arc<ExecutionProfileInner>);
401
402#[derive(Debug)]
403pub(crate) struct ExecutionProfileInner {
404 pub(crate) request_timeout: Option<Duration>,
405
406 pub(crate) consistency: Consistency,
407 pub(crate) serial_consistency: Option<SerialConsistency>,
408
409 pub(crate) load_balancing_policy: Arc<dyn LoadBalancingPolicy>,
410 pub(crate) retry_policy: Arc<dyn RetryPolicy>,
411 pub(crate) speculative_execution_policy: Option<Arc<dyn SpeculativeExecutionPolicy>>,
412}
413
414impl ExecutionProfileInner {
415 /// Creates a builder having all options set to the same as set in this ExecutionProfileInner.
416 pub(crate) fn to_builder(&self) -> ExecutionProfileBuilder {
417 ExecutionProfileBuilder {
418 request_timeout: Some(self.request_timeout),
419 consistency: Some(self.consistency),
420 serial_consistency: Some(self.serial_consistency),
421 load_balancing_policy: Some(self.load_balancing_policy.clone()),
422 retry_policy: Some(self.retry_policy.clone()),
423 speculative_execution_policy: Some(self.speculative_execution_policy.clone()),
424 }
425 }
426}
427
428impl ExecutionProfile {
429 pub(crate) fn new_from_inner(inner: ExecutionProfileInner) -> Self {
430 Self(Arc::new(inner))
431 }
432
433 /// Creates a blank builder that can be used to construct new ExecutionProfile.
434 pub fn builder() -> ExecutionProfileBuilder {
435 ExecutionProfileBuilder {
436 request_timeout: None,
437 consistency: None,
438 serial_consistency: None,
439 load_balancing_policy: None,
440 retry_policy: None,
441 speculative_execution_policy: None,
442 }
443 }
444
445 /// Creates a builder having all options set to the same as set in this ExecutionProfile.
446 pub fn to_builder(&self) -> ExecutionProfileBuilder {
447 self.0.to_builder()
448 }
449
450 /// Returns a new handle to this ExecutionProfile.
451 pub fn into_handle(self) -> ExecutionProfileHandle {
452 ExecutionProfileHandle(Arc::new((ArcSwap::new(self.0), None)))
453 }
454
455 /// Returns a new handle to this ExecutionProfile, tagging the handle with provided label.
456 /// The tag, as its name suggests, is only useful for debugging purposes, while being confused
457 /// about which statement/session is assigned which handle. Identifying handles with tags
458 /// could then help.
459 pub fn into_handle_with_label(self, label: String) -> ExecutionProfileHandle {
460 ExecutionProfileHandle(Arc::new((ArcSwap::new(self.0), Some(label))))
461 }
462
463 /// Gets client timeout associated with this profile.
464 pub fn get_request_timeout(&self) -> Option<Duration> {
465 self.0.request_timeout
466 }
467
468 /// Gets consistency associated with this profile.
469 pub fn get_consistency(&self) -> Consistency {
470 self.0.consistency
471 }
472
473 /// Gets serial consistency (if set) associated with this profile.
474 pub fn get_serial_consistency(&self) -> Option<SerialConsistency> {
475 self.0.serial_consistency
476 }
477
478 /// Gets load balancing policy associated with this profile.
479 pub fn get_load_balancing_policy(&self) -> &Arc<dyn LoadBalancingPolicy> {
480 &self.0.load_balancing_policy
481 }
482
483 /// Gets retry policy associated with this profile.
484 pub fn get_retry_policy(&self) -> &Arc<dyn RetryPolicy> {
485 &self.0.retry_policy
486 }
487
488 /// Gets speculative execution policy associated with this profile.
489 pub fn get_speculative_execution_policy(&self) -> Option<&Arc<dyn SpeculativeExecutionPolicy>> {
490 self.0.speculative_execution_policy.as_ref()
491 }
492}
493
494/// A handle that points to an ExecutionProfile.
495///
496/// Its goal is to enable remapping all associated entities (statement/Session)
497/// to another execution profile at once.
498/// Note: Cloned handles initially point to the same Arc'ed execution profile.
499/// However, as the mapping has yet another level of indirection - through
500/// `Arc<ArcSwap>` - remapping one of them affects all the others, as under the hood
501/// it is done by replacing the Arc held by the ArcSwap, which is shared
502/// by all cloned handles.
503/// The optional String is just for debug purposes. Its purpose is described
504/// in [ExecutionProfile::into_handle_with_label].
505#[derive(Debug, Clone)]
506pub struct ExecutionProfileHandle(Arc<(ArcSwap<ExecutionProfileInner>, Option<String>)>);
507
508impl ExecutionProfileHandle {
509 pub(crate) fn access(&self) -> Arc<ExecutionProfileInner> {
510 self.0 .0.load_full()
511 }
512
513 /// Creates a builder having all options set to the same as set in the ExecutionProfile pointed by this handle.
514 pub fn pointee_to_builder(&self) -> ExecutionProfileBuilder {
515 self.0 .0.load().to_builder()
516 }
517
518 /// Returns execution profile pointed by this handle.
519 pub fn to_profile(&self) -> ExecutionProfile {
520 ExecutionProfile(self.access())
521 }
522
523 /// Makes the handle point to a new execution profile.
524 /// All entities (statements/Session) holding this handle will reflect the change.
525 pub fn map_to_another_profile(&mut self, profile: ExecutionProfile) {
526 self.0 .0.store(profile.0)
527 }
528}