// hdrhistogram/sync/mod.rs

1//! Synchronized types that allow access to a `Histogram` from multiple threads.
2
3use crate::errors::*;
4use crate::{Counter, Histogram};
5use std::borrow::Borrow;
6use std::borrow::BorrowMut;
7use std::marker::PhantomData;
8use std::ops::{AddAssign, Deref, DerefMut};
9use std::sync::{atomic, Arc, Mutex};
10use std::time;
11
/// A write-only handle to a [`SyncHistogram`].
///
/// This handle allows you to record samples from multiple threads, each with its own `Recorder`,
/// concurrently. Writes to a `Recorder` are wait-free and scalable except for when the
/// [`SyncHistogram`] initiates a _phase shift_. During a phase shift, the next write on each
/// associated `Recorder` merges its results into a shared [`Histogram`] that is then made
/// available to the [`SyncHistogram`] once the phase shift completes. Phase shifts should also be
/// relatively cheap for writers, as they mainly need to perform a channel send on an unbounded,
/// lock-free channel.
///
/// An idle `Recorder` will hold up a phase shift indefinitely, or until it times out (if using
/// [`SyncHistogram::refresh_timeout`]). If a `Recorder` will remain idle for extended periods of
/// time, it should call [`Recorder::idle`], which will tell the reader not to wait for this
/// particular writer.
///
/// When a `Recorder` is dropped, all samples are made visible to the next
/// [`SyncHistogram::refresh`].
#[derive(Debug)]
pub struct Recorder<C: Counter> {
    // Samples recorded since this writer last synchronized with the reader;
    // shipped to the reader (and replaced with an empty histogram) on a phase shift.
    local: Histogram<C>,
    // State shared with the owning `SyncHistogram` and all sibling `Recorder`s.
    shared: Arc<Shared<C>>,
    // The value of `Shared::phase` this recorder most recently synchronized with.
    last_phase: usize,
}
35
36// make it more ergonomic to record samples
37impl<C: Counter> AddAssign<u64> for Recorder<C> {
38    fn add_assign(&mut self, value: u64) {
39        self.record(value).unwrap();
40    }
41}
42
43impl<C: Counter> Clone for Recorder<C> {
44    fn clone(&self) -> Self {
45        // reader will have to wait for one more recorder
46        {
47            let mut truth = self.shared.truth.lock().unwrap();
48            truth.recorders += 1;
49        }
50
51        // new recorder starts at the same phase as we do with an empty histogram
52        Recorder {
53            local: Histogram::new_from(&self.local),
54            shared: self.shared.clone(),
55            last_phase: self.last_phase,
56        }
57    }
58}
59
impl<C: Counter> Drop for Recorder<C> {
    /// Deregister this writer and flush its remaining samples to the reader.
    fn drop(&mut self) {
        // we'll need to decrement the # of recorders
        let mut truth = self.shared.truth.lock().unwrap();
        truth.recorders -= 1;

        // we also want to communicate the remainder of our samples to the reader
        // we do this under the lock so that if the reader reads .recorders after we update it
        // above, it is guaranteed to see the samples from this recorder.
        // we also _have_ to do it at some point during drop as the reader _may_ have read
        // .recorders _before_ we decremented it above, in which case it's blocking on us!
        // note that we cannot call self.update() here as it would drop the mutex guard
        let h = Histogram::new_from(&self.local);
        let h = std::mem::replace(&mut self.local, h);
        let _ = self.shared.sender.send(h).is_ok(); // if this is err, the reader went away

        // explicitly drop guard to ensure we don't accidentally drop it above
        drop(truth);
    }
}
80
/// Writer bookkeeping protected by [`Shared::truth`].
#[derive(Debug)]
struct Critical {
    // Number of active (non-idle) recorders the reader must hear from
    // before a phase shift can complete.
    recorders: usize,
}
85
/// State shared between a [`SyncHistogram`] and all of its [`Recorder`]s.
#[derive(Debug)]
struct Shared<C: Counter> {
    // Lock guarding the recorder count; see `Critical`.
    truth: Mutex<Critical>,
    // Writers send their local histograms to the reader over this channel.
    sender: crossbeam_channel::Sender<Histogram<C>>,
    // Bumped by the reader to initiate a phase shift; writers compare it
    // against their `last_phase` to know when to flush.
    phase: atomic::AtomicUsize,
}
92
/// See [`IdleRecorder`]. This guard borrows the idle [`Recorder`], and reactivates it when the
/// guard is dropped.
pub type IdleRecorderGuard<'a, C> = IdleRecorder<&'a mut Recorder<C>, C>;
95
/// This guard denotes that a [`Recorder`] is currently idle, and should not be waited on by a
/// [`SyncHistogram`] phase-shift.
#[derive(Debug)]
pub struct IdleRecorder<T, C: Counter>
where
    T: BorrowMut<Recorder<C>>,
{
    // The deactivated recorder (owned or borrowed); `None` once `activate`
    // has taken it back out, in which case drop does nothing.
    recorder: Option<T>,
    // Ties the counter type `C` to the struct without storing a `C` value.
    c: PhantomData<C>,
}
106
impl<T, C: Counter> IdleRecorder<T, C>
where
    T: BorrowMut<Recorder<C>>,
{
    /// Re-register the wrapped `Recorder` with the reader so that future phase shifts wait for
    /// it again. Calling this more than once is a no-op: only the first call (while the
    /// recorder is still held) has an effect.
    fn reactivate(&mut self) {
        let recorder = if let Some(ref mut r) = self.recorder {
            r
        } else {
            // already reactivated
            return;
        };

        let recorder = recorder.borrow_mut();

        // the Recorder is no longer idle, so the reader has to wait for us again
        // this basically means re-incrementing .recorders
        let mut crit = recorder.shared.truth.lock().unwrap();
        crit.recorders += 1;

        // we need to figure out what phase we're joining
        // the easiest way to do that is to adopt the current phase
        //
        // note that we have to load the phase while holding the lock.
        // if we did not, the reader could come along, read our ++'d .recorders (and so wait for us
        // to send), and bump the phase, all before we read it, which would lead us to believe that
        // we were already synchronized when in reality we were not, which would stall the reader
        // even if we issued more writes.
        recorder.last_phase = recorder.shared.phase.load(atomic::Ordering::Acquire);

        // explicitly drop guard to ensure we don't accidentally drop it above
        drop(crit);
    }
}
140
141impl<C: Counter> IdleRecorder<Recorder<C>, C> {
142    /// Mark the wrapped [`Recorder`] as active again and return it.
143    pub fn activate(mut self) -> Recorder<C> {
144        self.reactivate();
145        self.recorder.take().unwrap()
146    }
147
148    /// Clone the wrapped [`Recorder`].
149    pub fn recorder(&self) -> Recorder<C> {
150        self.recorder.as_ref().unwrap().clone()
151    }
152}
153
154impl<T, C: Counter> Drop for IdleRecorder<T, C>
155where
156    T: BorrowMut<Recorder<C>>,
157{
158    fn drop(&mut self) {
159        self.reactivate()
160    }
161}
162
163impl<C: Counter> Recorder<C> {
164    fn with_hist<F, R>(&mut self, f: F) -> R
165    where
166        F: FnOnce(&mut Histogram<C>) -> R,
167    {
168        let r = f(&mut self.local);
169        let phase = self.shared.phase.load(atomic::Ordering::Acquire);
170        if phase != self.last_phase {
171            self.update();
172            self.last_phase = phase;
173        }
174        r
175    }
176
177    // return our current histogram and leave a cleared one in its place
178    fn shed(&mut self) -> Histogram<C> {
179        let h = Histogram::new_from(&self.local);
180        std::mem::replace(&mut self.local, h)
181    }
182
183    fn update(&mut self) {
184        let h = self.shed();
185        let _ = self.shared.sender.send(h).is_ok(); // if this is err, the reader went away
186    }
187
188    fn deactivate(&mut self) {
189        let phase;
190        {
191            // we're leaving rotation, so we need to decrement .recorders
192            let mut crit = self.shared.truth.lock().unwrap();
193            crit.recorders -= 1;
194
195            // make sure we don't hold up the current phase shift (if any)
196            phase = self.shared.phase.load(atomic::Ordering::Acquire);
197            if phase != self.last_phase {
198                // can't call self.update() due to borrow of self.shared above
199                let h = Histogram::new_from(&self.local);
200                let h = std::mem::replace(&mut self.local, h);
201                let _ = self.shared.sender.send(h).is_ok(); // if this is err, the reader went away
202            }
203        }
204        self.last_phase = phase;
205    }
206
207    /// Call this method if the Recorder will be idle for a while.
208    ///
209    /// Until the returned guard is dropped, the associated [`SyncHistogram`] will not wait for
210    /// this recorder on a phase shift.
211    pub fn idle(&mut self) -> IdleRecorderGuard<C> {
212        self.deactivate();
213        IdleRecorder {
214            recorder: Some(self),
215            c: PhantomData,
216        }
217    }
218
219    /// Mark this `Recorder` as inactive.
220    ///
221    /// Until the returned guard is consumed, either by calling [`IdleRecorder::activate`] or by
222    /// dropping it, the associated [`SyncHistogram`] will not wait for this recorder on a phase
223    /// shift.
224    pub fn into_idle(mut self) -> IdleRecorder<Self, C> {
225        self.deactivate();
226        IdleRecorder {
227            recorder: Some(self),
228            c: PhantomData,
229        }
230    }
231
232    /// See [`Histogram::add`].
233    pub fn add<B: Borrow<Histogram<C>>>(&mut self, source: B) -> Result<(), AdditionError> {
234        self.with_hist(move |h| h.add(source))
235    }
236
237    /// See [`Histogram::add_correct`].
238    pub fn add_correct<B: Borrow<Histogram<C>>>(
239        &mut self,
240        source: B,
241        interval: u64,
242    ) -> Result<(), RecordError> {
243        self.with_hist(move |h| h.add_correct(source, interval))
244    }
245
246    /// See [`Histogram::subtract`].
247    pub fn subtract<B: Borrow<Histogram<C>>>(
248        &mut self,
249        subtrahend: B,
250    ) -> Result<(), SubtractionError> {
251        self.with_hist(move |h| h.subtract(subtrahend))
252    }
253
254    /// See [`Histogram::record`].
255    pub fn record(&mut self, value: u64) -> Result<(), RecordError> {
256        self.with_hist(move |h| h.record(value))
257    }
258
259    /// See [`Histogram::saturating_record`].
260    pub fn saturating_record(&mut self, value: u64) {
261        self.with_hist(move |h| h.saturating_record(value))
262    }
263
264    /// See [`Histogram::record_n`].
265    pub fn record_n(&mut self, value: u64, count: C) -> Result<(), RecordError> {
266        self.with_hist(move |h| h.record_n(value, count))
267    }
268
269    /// See [`Histogram::saturating_record_n`].
270    pub fn saturating_record_n(&mut self, value: u64, count: C) {
271        self.with_hist(move |h| h.saturating_record_n(value, count))
272    }
273
274    /// See [`Histogram::record_correct`].
275    pub fn record_correct(&mut self, value: u64, interval: u64) -> Result<(), RecordError> {
276        self.with_hist(move |h| h.record_correct(value, interval))
277    }
278
279    /// See [`Histogram::record_n_correct`].
280    pub fn record_n_correct(
281        &mut self,
282        value: u64,
283        count: C,
284        interval: u64,
285    ) -> Result<(), RecordError> {
286        self.with_hist(move |h| h.record_n_correct(value, count, interval))
287    }
288}
289
/// A `Histogram` that can be written to by multiple threads concurrently.
///
/// Each writer thread should have a [`Recorder`], which allows it to record new samples without
/// synchronization. New recorded samples are made available through this histogram by calling
/// [`SyncHistogram::refresh`], which blocks until it has synchronized with every recorder.
#[derive(Debug)]
pub struct SyncHistogram<C: Counter> {
    // All samples merged in from recorders so far.
    merged: Histogram<C>,
    // State shared with every `Recorder` of this histogram.
    shared: Arc<Shared<C>>,
    // Receiving end of the channel the recorders flush their histograms into.
    receiver: crossbeam_channel::Receiver<Histogram<C>>,
}
301
impl<C: Counter> SyncHistogram<C> {
    /// Perform one phase shift: merge in everything writers have sent, bump the phase, and wait
    /// (up to `timeout`, if given) until each active recorder has flushed once.
    ///
    /// The ordering here is load-bearing: leftovers are drained _before_ the phase bump, the
    /// recorder count is read under the lock, and extras are drained again at the end.
    fn refresh_inner(&mut self, timeout: Option<time::Duration>) {
        let end = timeout.map(|dur| time::Instant::now() + dur);

        // time to start a phase change
        // we first want to drain any histograms left over by dropped recorders
        // note that we do this _before_ incrementing the phase, so we know they're "old"
        while let Ok(h) = self.receiver.try_recv() {
            self.merged
                .add(&h)
                .expect("TODO: failed to merge histogram");
        }

        // make sure no recorders can join or leave in the middle of this
        let recorders = self.shared.truth.lock().unwrap().recorders;

        // then, we tell writers to phase
        let _ = self.shared.phase.fetch_add(1, atomic::Ordering::AcqRel);

        // we want to wait for writers to all have phased
        let mut phased = 0;

        // at this point, we expect to get at least truth.recorders histograms
        while phased < recorders {
            let h = if let Some(end) = end {
                let now = time::Instant::now();
                if now > end {
                    break;
                }

                match self.receiver.recv_timeout(end - now) {
                    Ok(h) => h,
                    Err(crossbeam_channel::RecvTimeoutError::Timeout) => break,
                    // we hold a sender through self.shared, so the channel can't disconnect
                    Err(crossbeam_channel::RecvTimeoutError::Disconnected) => unreachable!(),
                }
            } else {
                self.receiver
                    .recv()
                    .expect("SyncHistogram has an Arc<Shared> with a Receiver")
            };

            // NOTE(review): merge failure is currently fatal; the TODO in the message suggests
            // friendlier handling is intended upstream
            self.merged
                .add(&h)
                .expect("TODO: failed to merge histogram");
            phased += 1;
        }

        // we also gobble up extra histograms we may have been sent from more dropped writers
        while let Ok(h) = self.receiver.try_recv() {
            self.merged
                .add(&h)
                .expect("TODO: failed to merge histogram");
        }
    }

    /// Block until writes from all [`Recorder`] instances for this histogram have been
    /// incorporated.
    pub fn refresh(&mut self) {
        self.refresh_inner(None)
    }

    /// Block until writes from all [`Recorder`] instances for this histogram have been
    /// incorporated, or until the given amount of time has passed.
    pub fn refresh_timeout(&mut self, timeout: time::Duration) {
        self.refresh_inner(Some(timeout))
    }

    /// Obtain another multi-threaded writer for this histogram.
    ///
    /// Note that writes made to the `Recorder` will not be visible until the next call to
    /// [`SyncHistogram::refresh`].
    pub fn recorder(&self) -> Recorder<C> {
        // we will have to wait for one more recorder
        {
            let mut truth = self.shared.truth.lock().unwrap();
            truth.recorders += 1;
        }

        // new recorder starts at the current phase with an empty histogram
        Recorder {
            local: Histogram::new_from(&self.merged),
            shared: self.shared.clone(),
            last_phase: self.shared.phase.load(atomic::Ordering::Acquire),
        }
    }
}
388
389impl<C: Counter> From<Histogram<C>> for SyncHistogram<C> {
390    fn from(h: Histogram<C>) -> Self {
391        let (tx, rx) = crossbeam_channel::unbounded();
392        SyncHistogram {
393            merged: h,
394            receiver: rx,
395            shared: Arc::new(Shared {
396                truth: Mutex::new(Critical { recorders: 0 }),
397                sender: tx,
398                phase: atomic::AtomicUsize::new(0),
399            }),
400        }
401    }
402}
403
404impl<C: Counter> Deref for SyncHistogram<C> {
405    type Target = Histogram<C>;
406    fn deref(&self) -> &Self::Target {
407        &self.merged
408    }
409}
410
411impl<C: Counter> DerefMut for SyncHistogram<C> {
412    fn deref_mut(&mut self) -> &mut Self::Target {
413        &mut self.merged
414    }
415}