scylla/policies/
timestamp_generator.rs

1use std::{
2    sync::atomic::AtomicI64,
3    time::{SystemTime, UNIX_EPOCH},
4};
5
6use std::sync::atomic::Ordering;
7use std::sync::Mutex;
8use tokio::time::{Duration, Instant};
9use tracing::warn;
10
/// Trait used to represent a timestamp generator.
///
/// The `Send + Sync` supertraits mean every implementor can be shared between
/// threads. The generators defined in this file derive their timestamps from
/// the system clock, expressed in microseconds since the UNIX epoch.
pub trait TimestampGenerator: Send + Sync {
    /// Generates and returns a new timestamp.
    ///
    /// The method takes `&self`, so an implementation that keeps state between
    /// calls has to rely on interior mutability.
    fn next_timestamp(&self) -> i64;
}
16
/// Stateless timestamp generator that reads the system clock on every call.
///
/// It gives no guarantees about the produced values (in particular, no
/// monotonicity), and it panics if the system clock returns a time before
/// the UNIX epoch.
#[derive(Default)]
pub struct SimpleTimestampGenerator {}

impl SimpleTimestampGenerator {
    /// Creates a new simple timestamp generator.
    ///
    /// The struct has no fields, so this is the same value that `Default`
    /// produces.
    pub fn new() -> Self {
        Self {}
    }
}
27
28impl TimestampGenerator for SimpleTimestampGenerator {
29    fn next_timestamp(&self) -> i64 {
30        SystemTime::now()
31            .duration_since(UNIX_EPOCH)
32            .unwrap()
33            .as_micros() as i64
34    }
35}
36
/// Warning configuration for MonotonicTimestampGenerator.
///
/// Both values are only consulted when the system clock fails to produce a
/// timestamp greater than the last generated one.
struct MonotonicTimestampGeneratorWarningsCfg {
    /// How far the system clock has to be behind the last generated timestamp
    /// before a clock skew warning is considered at all.
    warning_threshold: Duration,
    /// Minimum amount of time that has to pass since the previous clock skew
    /// warning before the next one is emitted.
    warning_interval: Duration,
}
42
/// Monotonic timestamp generator. Guarantees monotonicity of timestamps.
/// If the system clock does not provide an increased timestamp, then the timestamp
/// will be artificially increased. If the config is provided and the clock skew is bigger
/// than warning_threshold (by default 1 second), then the user will be warned about
/// the skew repeatedly, with warning_interval provided in the settings (by default 1 second).
/// Remember that this generator only guarantees monotonicity within one instance of this struct!
/// If you create multiple instances the monotonicity guarantee becomes void.
pub struct MonotonicTimestampGenerator {
    /// Last timestamp handed out by this instance (starts at 0). It is read and
    /// updated atomically so that concurrent callers never get the same value.
    last: AtomicI64,
    /// Moment the last clock skew warning was emitted; initialised to the
    /// creation time of the generator.
    last_warning: Mutex<Instant>,
    /// Clock skew warning settings. `None` disables the threshold-based skew
    /// warnings; the warning about a system time before the UNIX epoch is
    /// emitted regardless of this field.
    config: Option<MonotonicTimestampGeneratorWarningsCfg>,
}
55
56impl MonotonicTimestampGenerator {
57    /// Creates a new monotonic timestamp generator with default settings
58    pub fn new() -> Self {
59        MonotonicTimestampGenerator {
60            last: AtomicI64::new(0),
61            last_warning: Mutex::new(Instant::now()),
62            config: Some(MonotonicTimestampGeneratorWarningsCfg {
63                warning_threshold: Duration::from_secs(1),
64                warning_interval: Duration::from_secs(1),
65            }),
66        }
67    }
68
69    pub fn with_warning_times(
70        mut self,
71        warning_threshold: Duration,
72        warning_interval: Duration,
73    ) -> Self {
74        self.config = Some(MonotonicTimestampGeneratorWarningsCfg {
75            warning_threshold,
76            warning_interval,
77        });
78        self
79    }
80
81    pub fn without_warnings(mut self) -> Self {
82        self.config = None;
83        self
84    }
85
86    // This is guaranteed to return a monotonic timestamp. If clock skew is detected
87    // then this method will increment the last timestamp.
88    fn compute_next(&self, last: i64) -> i64 {
89        let current = SystemTime::now().duration_since(UNIX_EPOCH);
90        if let Ok(cur_time) = current {
91            // We have generated a valid timestamp
92            let u_cur = cur_time.as_micros() as i64;
93            if u_cur > last {
94                // We have generated a valid, monotonic timestamp
95                return u_cur;
96            } else if let Some(cfg) = self.config.as_ref() {
97                // We have detected clock skew, we will increment the last timestamp, and check if we should warn the user
98                if last - u_cur > cfg.warning_threshold.as_micros() as i64 {
99                    // We have detected a clock skew bigger than the threshold, we check if we warned the user recently
100                    let mut last_warn = self.last_warning.lock().unwrap();
101                    let now = Instant::now();
102                    if now >= last_warn.checked_add(cfg.warning_interval).unwrap() {
103                        // We have not warned the user recently, we will warn the user
104                        *last_warn = now;
105                        drop(last_warn);
106                        warn!(
107                            "Clock skew detected. The current time ({}) was {} \
108                    microseconds behind the last generated timestamp ({}). \
109                    The next generated timestamp will be artificially incremented \
110                    to guarantee monotonicity.",
111                            u_cur,
112                            last - u_cur,
113                            last
114                        )
115                    }
116                }
117            }
118        } else {
119            // We have generated a timestamp before UNIX epoch, we will warn the user and increment the last timestamp
120            warn!("Clock skew detected. The current time was behind UNIX epoch.");
121        }
122
123        last + 1
124    }
125}
126
127impl Default for MonotonicTimestampGenerator {
128    fn default() -> Self {
129        Self::new()
130    }
131}
132
133impl TimestampGenerator for MonotonicTimestampGenerator {
134    fn next_timestamp(&self) -> i64 {
135        loop {
136            let last = self.last.load(Ordering::SeqCst);
137            let cur = self.compute_next(last);
138            if self
139                .last
140                .compare_exchange(last, cur, Ordering::SeqCst, Ordering::SeqCst)
141                .is_ok()
142            {
143                return cur;
144            }
145        }
146    }
147}
148
// Checks that consecutive timestamps taken from a single generator on a
// single thread are strictly increasing.
#[test]
fn monotonic_timestamp_generator_is_monotonic() {
    const NUMBER_OF_ITERATIONS: u32 = 1000;

    let generator = MonotonicTimestampGenerator::new();
    // The first timestamp has nothing to be compared against.
    let mut previous = generator.next_timestamp();
    for _ in 1..NUMBER_OF_ITERATIONS {
        let current = generator.next_timestamp();
        assert!(current > previous);
        previous = current;
    }
}
164
// Checks that a generator shared between threads hands out strictly increasing
// timestamps within every thread and never the same timestamp to two threads.
#[test]
fn monotonic_timestamp_generator_is_monotonic_with_concurrency() {
    use std::collections::HashSet;

    const NUMBER_OF_ITERATIONS: usize = 1000;
    const NUMBER_OF_THREADS: usize = 10;

    // Scoped threads may borrow the generator directly, no `Arc` is needed.
    let generator = MonotonicTimestampGenerator::new();
    let timestamps_sets: Vec<HashSet<i64>> = std::thread::scope(|s| {
        // Spawn ALL threads before joining any of them. Iterator adaptors are
        // lazy, so chaining `.map(spawn).map(join)` would join every thread
        // right after spawning it and the threads would run one at a time.
        let handles: Vec<_> = (0..NUMBER_OF_THREADS)
            .map(|_| {
                s.spawn(|| {
                    let timestamps: Vec<i64> = (0..NUMBER_OF_ITERATIONS)
                        .map(|_| generator.next_timestamp())
                        .collect();
                    assert!(timestamps.windows(2).all(|w| w[0] < w[1]));
                    let timestamps_set: HashSet<i64> = timestamps.into_iter().collect();
                    assert_eq!(
                        timestamps_set.len(),
                        NUMBER_OF_ITERATIONS,
                        "Colliding values in a single thread"
                    );
                    timestamps_set
                })
            })
            .collect();

        handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .collect()
    });

    let full_set: HashSet<i64> = timestamps_sets.iter().flatten().copied().collect();
    assert_eq!(
        full_set.len(),
        NUMBER_OF_ITERATIONS * NUMBER_OF_THREADS,
        "Colliding values between threads"
    );
}