1use crate::errors::*;
4use crate::{Counter, Histogram};
5use std::borrow::Borrow;
6use std::borrow::BorrowMut;
7use std::marker::PhantomData;
8use std::ops::{AddAssign, Deref, DerefMut};
9use std::sync::{atomic, Arc, Mutex};
10use std::time;
11
12#[derive(Debug)]
30pub struct Recorder<C: Counter> {
31 local: Histogram<C>,
32 shared: Arc<Shared<C>>,
33 last_phase: usize,
34}
35
36impl<C: Counter> AddAssign<u64> for Recorder<C> {
38 fn add_assign(&mut self, value: u64) {
39 self.record(value).unwrap();
40 }
41}
42
43impl<C: Counter> Clone for Recorder<C> {
44 fn clone(&self) -> Self {
45 {
47 let mut truth = self.shared.truth.lock().unwrap();
48 truth.recorders += 1;
49 }
50
51 Recorder {
53 local: Histogram::new_from(&self.local),
54 shared: self.shared.clone(),
55 last_phase: self.last_phase,
56 }
57 }
58}
59
60impl<C: Counter> Drop for Recorder<C> {
61 fn drop(&mut self) {
62 let mut truth = self.shared.truth.lock().unwrap();
64 truth.recorders -= 1;
65
66 let h = Histogram::new_from(&self.local);
73 let h = std::mem::replace(&mut self.local, h);
74 let _ = self.shared.sender.send(h).is_ok(); drop(truth);
78 }
79}
80
81#[derive(Debug)]
82struct Critical {
83 recorders: usize,
84}
85
86#[derive(Debug)]
87struct Shared<C: Counter> {
88 truth: Mutex<Critical>,
89 sender: crossbeam_channel::Sender<Histogram<C>>,
90 phase: atomic::AtomicUsize,
91}
92
93pub type IdleRecorderGuard<'a, C> = IdleRecorder<&'a mut Recorder<C>, C>;
95
96#[derive(Debug)]
99pub struct IdleRecorder<T, C: Counter>
100where
101 T: BorrowMut<Recorder<C>>,
102{
103 recorder: Option<T>,
104 c: PhantomData<C>,
105}
106
107impl<T, C: Counter> IdleRecorder<T, C>
108where
109 T: BorrowMut<Recorder<C>>,
110{
111 fn reactivate(&mut self) {
112 let recorder = if let Some(ref mut r) = self.recorder {
113 r
114 } else {
115 return;
117 };
118
119 let recorder = recorder.borrow_mut();
120
121 let mut crit = recorder.shared.truth.lock().unwrap();
124 crit.recorders += 1;
125
126 recorder.last_phase = recorder.shared.phase.load(atomic::Ordering::Acquire);
135
136 drop(crit);
138 }
139}
140
141impl<C: Counter> IdleRecorder<Recorder<C>, C> {
142 pub fn activate(mut self) -> Recorder<C> {
144 self.reactivate();
145 self.recorder.take().unwrap()
146 }
147
148 pub fn recorder(&self) -> Recorder<C> {
150 self.recorder.as_ref().unwrap().clone()
151 }
152}
153
154impl<T, C: Counter> Drop for IdleRecorder<T, C>
155where
156 T: BorrowMut<Recorder<C>>,
157{
158 fn drop(&mut self) {
159 self.reactivate()
160 }
161}
162
163impl<C: Counter> Recorder<C> {
164 fn with_hist<F, R>(&mut self, f: F) -> R
165 where
166 F: FnOnce(&mut Histogram<C>) -> R,
167 {
168 let r = f(&mut self.local);
169 let phase = self.shared.phase.load(atomic::Ordering::Acquire);
170 if phase != self.last_phase {
171 self.update();
172 self.last_phase = phase;
173 }
174 r
175 }
176
177 fn shed(&mut self) -> Histogram<C> {
179 let h = Histogram::new_from(&self.local);
180 std::mem::replace(&mut self.local, h)
181 }
182
183 fn update(&mut self) {
184 let h = self.shed();
185 let _ = self.shared.sender.send(h).is_ok(); }
187
188 fn deactivate(&mut self) {
189 let phase;
190 {
191 let mut crit = self.shared.truth.lock().unwrap();
193 crit.recorders -= 1;
194
195 phase = self.shared.phase.load(atomic::Ordering::Acquire);
197 if phase != self.last_phase {
198 let h = Histogram::new_from(&self.local);
200 let h = std::mem::replace(&mut self.local, h);
201 let _ = self.shared.sender.send(h).is_ok(); }
203 }
204 self.last_phase = phase;
205 }
206
207 pub fn idle(&mut self) -> IdleRecorderGuard<C> {
212 self.deactivate();
213 IdleRecorder {
214 recorder: Some(self),
215 c: PhantomData,
216 }
217 }
218
219 pub fn into_idle(mut self) -> IdleRecorder<Self, C> {
225 self.deactivate();
226 IdleRecorder {
227 recorder: Some(self),
228 c: PhantomData,
229 }
230 }
231
232 pub fn add<B: Borrow<Histogram<C>>>(&mut self, source: B) -> Result<(), AdditionError> {
234 self.with_hist(move |h| h.add(source))
235 }
236
237 pub fn add_correct<B: Borrow<Histogram<C>>>(
239 &mut self,
240 source: B,
241 interval: u64,
242 ) -> Result<(), RecordError> {
243 self.with_hist(move |h| h.add_correct(source, interval))
244 }
245
246 pub fn subtract<B: Borrow<Histogram<C>>>(
248 &mut self,
249 subtrahend: B,
250 ) -> Result<(), SubtractionError> {
251 self.with_hist(move |h| h.subtract(subtrahend))
252 }
253
254 pub fn record(&mut self, value: u64) -> Result<(), RecordError> {
256 self.with_hist(move |h| h.record(value))
257 }
258
259 pub fn saturating_record(&mut self, value: u64) {
261 self.with_hist(move |h| h.saturating_record(value))
262 }
263
264 pub fn record_n(&mut self, value: u64, count: C) -> Result<(), RecordError> {
266 self.with_hist(move |h| h.record_n(value, count))
267 }
268
269 pub fn saturating_record_n(&mut self, value: u64, count: C) {
271 self.with_hist(move |h| h.saturating_record_n(value, count))
272 }
273
274 pub fn record_correct(&mut self, value: u64, interval: u64) -> Result<(), RecordError> {
276 self.with_hist(move |h| h.record_correct(value, interval))
277 }
278
279 pub fn record_n_correct(
281 &mut self,
282 value: u64,
283 count: C,
284 interval: u64,
285 ) -> Result<(), RecordError> {
286 self.with_hist(move |h| h.record_n_correct(value, count, interval))
287 }
288}
289
290#[derive(Debug)]
296pub struct SyncHistogram<C: Counter> {
297 merged: Histogram<C>,
298 shared: Arc<Shared<C>>,
299 receiver: crossbeam_channel::Receiver<Histogram<C>>,
300}
301
302impl<C: Counter> SyncHistogram<C> {
303 fn refresh_inner(&mut self, timeout: Option<time::Duration>) {
304 let end = timeout.map(|dur| time::Instant::now() + dur);
305
306 while let Ok(h) = self.receiver.try_recv() {
310 self.merged
311 .add(&h)
312 .expect("TODO: failed to merge histogram");
313 }
314
315 let recorders = self.shared.truth.lock().unwrap().recorders;
317
318 let _ = self.shared.phase.fetch_add(1, atomic::Ordering::AcqRel);
320
321 let mut phased = 0;
323
324 while phased < recorders {
326 let h = if let Some(end) = end {
327 let now = time::Instant::now();
328 if now > end {
329 break;
330 }
331
332 match self.receiver.recv_timeout(end - now) {
333 Ok(h) => h,
334 Err(crossbeam_channel::RecvTimeoutError::Timeout) => break,
335 Err(crossbeam_channel::RecvTimeoutError::Disconnected) => unreachable!(),
336 }
337 } else {
338 self.receiver
339 .recv()
340 .expect("SyncHistogram has an Arc<Shared> with a Receiver")
341 };
342
343 self.merged
344 .add(&h)
345 .expect("TODO: failed to merge histogram");
346 phased += 1;
347 }
348
349 while let Ok(h) = self.receiver.try_recv() {
351 self.merged
352 .add(&h)
353 .expect("TODO: failed to merge histogram");
354 }
355 }
356
357 pub fn refresh(&mut self) {
360 self.refresh_inner(None)
361 }
362
363 pub fn refresh_timeout(&mut self, timeout: time::Duration) {
366 self.refresh_inner(Some(timeout))
367 }
368
369 pub fn recorder(&self) -> Recorder<C> {
374 {
376 let mut truth = self.shared.truth.lock().unwrap();
377 truth.recorders += 1;
378 }
379
380 Recorder {
382 local: Histogram::new_from(&self.merged),
383 shared: self.shared.clone(),
384 last_phase: self.shared.phase.load(atomic::Ordering::Acquire),
385 }
386 }
387}
388
389impl<C: Counter> From<Histogram<C>> for SyncHistogram<C> {
390 fn from(h: Histogram<C>) -> Self {
391 let (tx, rx) = crossbeam_channel::unbounded();
392 SyncHistogram {
393 merged: h,
394 receiver: rx,
395 shared: Arc::new(Shared {
396 truth: Mutex::new(Critical { recorders: 0 }),
397 sender: tx,
398 phase: atomic::AtomicUsize::new(0),
399 }),
400 }
401 }
402}
403
404impl<C: Counter> Deref for SyncHistogram<C> {
405 type Target = Histogram<C>;
406 fn deref(&self) -> &Self::Target {
407 &self.merged
408 }
409}
410
411impl<C: Counter> DerefMut for SyncHistogram<C> {
412 fn deref_mut(&mut self) -> &mut Self::Target {
413 &mut self.merged
414 }
415}