tower/retry/budget/
tps_budget.rs1use std::{
4 fmt,
5 sync::{
6 atomic::{AtomicIsize, Ordering},
7 Mutex,
8 },
9 time::Duration,
10};
11use tokio::time::Instant;
12
13use super::Budget;
14
15pub struct TpsBudget {
27 generation: Mutex<Generation>,
28 reserve: isize,
30 slots: Box<[AtomicIsize]>,
32 window: Duration,
34 writer: AtomicIsize,
37 deposit_amount: isize,
39 withdraw_amount: isize,
41}
42
43#[derive(Debug)]
44struct Generation {
45 index: usize,
47 time: Instant,
49}
50
51impl TpsBudget {
54 pub fn new(ttl: Duration, min_per_sec: u32, retry_percent: f32) -> Self {
70 assert!(ttl >= Duration::from_secs(1));
72 assert!(ttl <= Duration::from_secs(60));
73 assert!(retry_percent >= 0.0);
74 assert!(retry_percent <= 1000.0);
75 assert!(min_per_sec < ::std::i32::MAX as u32);
76
77 let (deposit_amount, withdraw_amount) = if retry_percent == 0.0 {
78 (0, 1)
81 } else if retry_percent <= 1.0 {
82 (1, (1.0 / retry_percent) as isize)
83 } else {
84 (1000, (1000.0 / retry_percent) as isize)
88 };
89 let reserve = (min_per_sec as isize)
90 .saturating_mul(ttl.as_secs() as isize) .saturating_mul(withdraw_amount);
92
93 let windows = 10u32;
95 let mut slots = Vec::with_capacity(windows as usize);
96 for _ in 0..windows {
97 slots.push(AtomicIsize::new(0));
98 }
99
100 TpsBudget {
101 generation: Mutex::new(Generation {
102 index: 0,
103 time: Instant::now(),
104 }),
105 reserve,
106 slots: slots.into_boxed_slice(),
107 window: ttl / windows,
108 writer: AtomicIsize::new(0),
109 deposit_amount,
110 withdraw_amount,
111 }
112 }
113
114 fn expire(&self) {
115 let mut gen = self.generation.lock().expect("generation lock");
116
117 let now = Instant::now();
118 let diff = now.saturating_duration_since(gen.time);
119 if diff < self.window {
120 return;
122 }
123
124 let to_commit = self.writer.swap(0, Ordering::SeqCst);
125 self.slots[gen.index].store(to_commit, Ordering::SeqCst);
126
127 let mut diff = diff;
128 let mut idx = (gen.index + 1) % self.slots.len();
129 while diff > self.window {
130 self.slots[idx].store(0, Ordering::SeqCst);
131 diff -= self.window;
132 idx = (idx + 1) % self.slots.len();
133 }
134
135 gen.index = idx;
136 gen.time = now;
137 }
138
139 fn sum(&self) -> isize {
140 let current = self.writer.load(Ordering::SeqCst);
141 let windowed_sum: isize = self
142 .slots
143 .iter()
144 .map(|slot| slot.load(Ordering::SeqCst))
145 .fold(0, isize::saturating_add);
147
148 current
149 .saturating_add(windowed_sum)
150 .saturating_add(self.reserve)
151 }
152
153 fn put(&self, amt: isize) {
154 self.expire();
155 self.writer.fetch_add(amt, Ordering::SeqCst);
156 }
157
158 fn try_get(&self, amt: isize) -> bool {
159 debug_assert!(amt >= 0);
160
161 self.expire();
162
163 let sum = self.sum();
164 if sum >= amt {
165 self.writer.fetch_add(-amt, Ordering::SeqCst);
166 true
167 } else {
168 false
169 }
170 }
171}
172
173impl Budget for TpsBudget {
174 fn deposit(&self) {
175 self.put(self.deposit_amount)
176 }
177
178 fn withdraw(&self) -> bool {
179 self.try_get(self.withdraw_amount)
180 }
181}
182
183impl Default for TpsBudget {
184 fn default() -> Self {
185 TpsBudget::new(Duration::from_secs(10), 10, 0.2)
186 }
187}
188
189impl fmt::Debug for TpsBudget {
190 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191 f.debug_struct("Budget")
192 .field("deposit", &self.deposit_amount)
193 .field("withdraw", &self.withdraw_amount)
194 .field("balance", &self.sum())
195 .finish()
196 }
197}
198
#[cfg(test)]
mod tests {
    use crate::retry::budget::Budget;

    use super::*;
    use tokio::time;

    /// With no reserve and no deposits there is nothing to withdraw.
    #[test]
    fn tps_empty() {
        let budget = TpsBudget::new(Duration::from_secs(1), 0, 1.0);
        assert!(!budget.withdraw());
    }

    /// A deposit older than the TTL no longer counts towards the balance.
    #[tokio::test]
    async fn tps_leaky() {
        time::pause();

        let budget = TpsBudget::new(Duration::from_secs(1), 0, 1.0);
        budget.deposit();

        // Three seconds is well past the one second TTL.
        time::advance(Duration::from_secs(3)).await;

        assert!(!budget.withdraw());
    }

    /// Deposits age out slot by slot rather than all at once.
    #[tokio::test]
    async fn tps_slots() {
        time::pause();

        // A ratio of 0.5 means one withdrawal costs two deposits.
        let budget = TpsBudget::new(Duration::from_secs(1), 0, 0.5);
        budget.deposit();
        budget.deposit();
        time::advance(Duration::from_millis(901)).await;
        // 901ms later both deposits are still inside the window.
        assert!(budget.withdraw());

        // Move far enough ahead that every slot is cleared.
        time::advance(Duration::from_millis(2001)).await;

        budget.deposit();
        time::advance(Duration::from_millis(301)).await;
        budget.deposit();
        time::advance(Duration::from_millis(801)).await;
        budget.deposit();

        // The first deposit has aged out, but the second is still in the
        // window and combines with the third to cover one withdrawal.
        assert!(budget.withdraw());
    }

    /// The reserve alone permits `min_per_sec * ttl` withdrawals.
    #[tokio::test]
    async fn tps_reserve() {
        let budget = TpsBudget::new(Duration::from_secs(1), 5, 1.0);
        for _ in 0..5 {
            assert!(budget.withdraw());
        }

        // The reserve is now exhausted.
        assert!(!budget.withdraw());
    }
}