hdrhistogram/serialization/
v2_deflate_serializer.rs

1use super::v2_serializer::{V2SerializeError, V2Serializer};
2use super::{Serializer, V2_COMPRESSED_COOKIE};
3use crate::core::counter::Counter;
4use crate::Histogram;
5use byteorder::{BigEndian, WriteBytesExt};
6use flate2::write::ZlibEncoder;
7use flate2::Compression;
8use std::io::{self, Write};
9use std::{self, error, fmt};
10
11/// Errors that occur during serialization.
12#[derive(Debug)]
13pub enum V2DeflateSerializeError {
14    /// The underlying serialization failed
15    InternalSerializationError(V2SerializeError),
16    /// An i/o operation failed.
17    IoError(io::Error),
18}
19
20impl std::convert::From<std::io::Error> for V2DeflateSerializeError {
21    fn from(e: std::io::Error) -> Self {
22        V2DeflateSerializeError::IoError(e)
23    }
24}
25
26impl fmt::Display for V2DeflateSerializeError {
27    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
28        match self {
29            V2DeflateSerializeError::InternalSerializationError(e) => {
30                write!(f, "The underlying serialization failed: {}", e)
31            }
32            V2DeflateSerializeError::IoError(e) => {
33                write!(f, "The underlying serialization failed: {}", e)
34            }
35        }
36    }
37}
38
39impl error::Error for V2DeflateSerializeError {
40    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
41        match self {
42            V2DeflateSerializeError::InternalSerializationError(e) => Some(e),
43            V2DeflateSerializeError::IoError(e) => Some(e),
44        }
45    }
46}
47
48/// Serializer for the V2 + DEFLATE binary format.
49///
50/// It's called "deflate" to stay consistent with the naming used in the Java implementation, but
51/// it actually uses zlib's wrapper format around plain DEFLATE.
52pub struct V2DeflateSerializer {
53    uncompressed_buf: Vec<u8>,
54    compressed_buf: Vec<u8>,
55    v2_serializer: V2Serializer,
56}
57
58impl Default for V2DeflateSerializer {
59    fn default() -> Self {
60        Self::new()
61    }
62}
63
64impl V2DeflateSerializer {
65    /// Create a new serializer.
66    pub fn new() -> V2DeflateSerializer {
67        V2DeflateSerializer {
68            uncompressed_buf: Vec::new(),
69            compressed_buf: Vec::new(),
70            v2_serializer: V2Serializer::new(),
71        }
72    }
73}
74
75impl Serializer for V2DeflateSerializer {
76    type SerializeError = V2DeflateSerializeError;
77
78    fn serialize<T: Counter, W: Write>(
79        &mut self,
80        h: &Histogram<T>,
81        writer: &mut W,
82    ) -> Result<usize, V2DeflateSerializeError> {
83        // TODO benchmark serializing in chunks rather than all at once: each uncompressed v2 chunk
84        // could be compressed and written to the compressed buf, possibly using an approach like
85        // that of https://github.com/HdrHistogram/HdrHistogram_rust/issues/32#issuecomment-287583055.
86        // This would reduce the overall buffer size needed for plain v2 serialization, and be
87        // more cache friendly.
88
89        self.uncompressed_buf.clear();
90        self.compressed_buf.clear();
91        // TODO serialize directly into uncompressed_buf without the buffering inside v2_serializer
92        let uncompressed_len = self
93            .v2_serializer
94            .serialize(h, &mut self.uncompressed_buf)
95            .map_err(V2DeflateSerializeError::InternalSerializationError)?;
96
97        debug_assert_eq!(self.uncompressed_buf.len(), uncompressed_len);
98        // On randomized test histograms we get about 10% compression, but of course random data
99        // doesn't compress well. Real-world data may compress better, so let's assume a more
100        // optimistic 50% compression as a baseline to reserve. If we're overly optimistic that's
101        // still only one more allocation the first time it's needed.
102        self.compressed_buf.reserve(self.uncompressed_buf.len() / 2);
103
104        self.compressed_buf
105            .write_u32::<BigEndian>(V2_COMPRESSED_COOKIE)?;
106        // placeholder for length
107        self.compressed_buf.write_u32::<BigEndian>(0)?;
108
109        // TODO pluggable compressors? configurable compression levels?
110        // TODO benchmark https://github.com/sile/libflate
111        // TODO if uncompressed_len is near the limit of 16-bit usize, and compression grows the
112        // data instead of shrinking it (which we cannot really predict), writing to compressed_buf
113        // could panic as Vec overflows its internal `usize`.
114
115        {
116            // TODO reuse deflate buf, or switch to lower-level flate2::Compress
117            let mut compressor = ZlibEncoder::new(&mut self.compressed_buf, Compression::default());
118            compressor.write_all(&self.uncompressed_buf[0..uncompressed_len])?;
119            let _ = compressor.finish()?;
120        }
121
122        // fill in length placeholder. Won't underflow since length is always at least 8, and won't
123        // overflow u32 as the largest array is about 6 million entries, so about 54MiB encoded (if
124        // counter is u64).
125        let total_compressed_len = self.compressed_buf.len();
126        (&mut self.compressed_buf[4..8])
127            .write_u32::<BigEndian>((total_compressed_len as u32) - 8)?;
128
129        writer.write_all(&self.compressed_buf)?;
130
131        Ok(total_compressed_len)
132    }
133}