scylla/policies/
timestamp_generator.rs1use std::{
2 sync::atomic::AtomicI64,
3 time::{SystemTime, UNIX_EPOCH},
4};
5
6use std::sync::atomic::Ordering;
7use std::sync::Mutex;
8use tokio::time::{Duration, Instant};
9use tracing::warn;
10
/// A source of client-side timestamps.
///
/// Implementors must be shareable across threads (`Send + Sync`), because
/// `next_timestamp` takes `&self` and may be called concurrently.
pub trait TimestampGenerator: Send + Sync {
    /// Returns the next timestamp as a signed 64-bit integer.
    ///
    /// Both generators in this file express it in microseconds since the
    /// UNIX epoch.
    fn next_timestamp(&self) -> i64;
}
16
/// Stateless timestamp generator that reads the system clock on every call.
///
/// It makes no monotonicity guarantee: two calls may return equal or even
/// decreasing values if the system clock stalls or is adjusted backwards.
#[derive(Debug, Clone, Default)]
pub struct SimpleTimestampGenerator {}
21
22impl SimpleTimestampGenerator {
23 pub fn new() -> Self {
24 SimpleTimestampGenerator {}
25 }
26}
27
28impl TimestampGenerator for SimpleTimestampGenerator {
29 fn next_timestamp(&self) -> i64 {
30 SystemTime::now()
31 .duration_since(UNIX_EPOCH)
32 .unwrap()
33 .as_micros() as i64
34 }
35}
36
/// Settings for the clock-skew warnings of the monotonic generator.
struct MonotonicTimestampGeneratorWarningsCfg {
    /// How far the system clock must lag behind the last generated timestamp
    /// before a warning is considered.
    warning_threshold: Duration,
    /// Minimum time that must pass between two consecutive warnings.
    warning_interval: Duration,
}
42
43pub struct MonotonicTimestampGenerator {
51 last: AtomicI64,
52 last_warning: Mutex<Instant>,
53 config: Option<MonotonicTimestampGeneratorWarningsCfg>,
54}
55
56impl MonotonicTimestampGenerator {
57 pub fn new() -> Self {
59 MonotonicTimestampGenerator {
60 last: AtomicI64::new(0),
61 last_warning: Mutex::new(Instant::now()),
62 config: Some(MonotonicTimestampGeneratorWarningsCfg {
63 warning_threshold: Duration::from_secs(1),
64 warning_interval: Duration::from_secs(1),
65 }),
66 }
67 }
68
69 pub fn with_warning_times(
70 mut self,
71 warning_threshold: Duration,
72 warning_interval: Duration,
73 ) -> Self {
74 self.config = Some(MonotonicTimestampGeneratorWarningsCfg {
75 warning_threshold,
76 warning_interval,
77 });
78 self
79 }
80
81 pub fn without_warnings(mut self) -> Self {
82 self.config = None;
83 self
84 }
85
86 fn compute_next(&self, last: i64) -> i64 {
89 let current = SystemTime::now().duration_since(UNIX_EPOCH);
90 if let Ok(cur_time) = current {
91 let u_cur = cur_time.as_micros() as i64;
93 if u_cur > last {
94 return u_cur;
96 } else if let Some(cfg) = self.config.as_ref() {
97 if last - u_cur > cfg.warning_threshold.as_micros() as i64 {
99 let mut last_warn = self.last_warning.lock().unwrap();
101 let now = Instant::now();
102 if now >= last_warn.checked_add(cfg.warning_interval).unwrap() {
103 *last_warn = now;
105 drop(last_warn);
106 warn!(
107 "Clock skew detected. The current time ({}) was {} \
108 microseconds behind the last generated timestamp ({}). \
109 The next generated timestamp will be artificially incremented \
110 to guarantee monotonicity.",
111 u_cur,
112 last - u_cur,
113 last
114 )
115 }
116 }
117 }
118 } else {
119 warn!("Clock skew detected. The current time was behind UNIX epoch.");
121 }
122
123 last + 1
124 }
125}
126
127impl Default for MonotonicTimestampGenerator {
128 fn default() -> Self {
129 Self::new()
130 }
131}
132
133impl TimestampGenerator for MonotonicTimestampGenerator {
134 fn next_timestamp(&self) -> i64 {
135 loop {
136 let last = self.last.load(Ordering::SeqCst);
137 let cur = self.compute_next(last);
138 if self
139 .last
140 .compare_exchange(last, cur, Ordering::SeqCst, Ordering::SeqCst)
141 .is_ok()
142 {
143 return cur;
144 }
145 }
146 }
147}
148
/// Timestamps drawn one after another from a single thread must be strictly
/// increasing.
#[test]
fn monotonic_timestamp_generator_is_monotonic() {
    const NUMBER_OF_ITERATIONS: u32 = 1000;

    let generator = MonotonicTimestampGenerator::new();
    let timestamps: Vec<i64> = (0..NUMBER_OF_ITERATIONS)
        .map(|_| generator.next_timestamp())
        .collect();

    // Every timestamp must exceed its direct predecessor.
    for pair in timestamps.windows(2) {
        assert!(pair[1] > pair[0]);
    }
}
164
/// Timestamps drawn concurrently from several threads must be strictly
/// increasing within each thread and unique across all threads.
#[test]
fn monotonic_timestamp_generator_is_monotonic_with_concurrency() {
    use std::collections::HashSet;

    const NUMBER_OF_ITERATIONS: usize = 1000;
    const NUMBER_OF_THREADS: usize = 10;

    // Scoped threads may borrow the generator directly; no `Arc` is needed.
    let generator = MonotonicTimestampGenerator::new();
    let timestamps_sets: Vec<HashSet<i64>> = std::thread::scope(|s| {
        // Spawn every worker BEFORE joining any of them. Chaining the join
        // onto the same lazy iterator would spawn and join one thread at a
        // time, so the workers would never actually overlap.
        let handles: Vec<_> = (0..NUMBER_OF_THREADS)
            .map(|_| {
                s.spawn(|| {
                    let timestamps: Vec<i64> = (0..NUMBER_OF_ITERATIONS)
                        .map(|_| generator.next_timestamp())
                        .collect();
                    assert!(timestamps.windows(2).all(|w| w[0] < w[1]));
                    let timestamps_set: HashSet<i64> = timestamps.into_iter().collect();
                    assert_eq!(
                        timestamps_set.len(),
                        NUMBER_OF_ITERATIONS,
                        "Colliding values in a single thread"
                    );
                    timestamps_set
                })
            })
            .collect();

        handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .collect()
    });

    let full_set: HashSet<i64> = timestamps_sets.iter().flatten().copied().collect();
    assert_eq!(
        full_set.len(),
        NUMBER_OF_ITERATIONS * NUMBER_OF_THREADS,
        "Colliding values between threads"
    );
}