prometheus_parse/
lib.rs

1use chrono::{DateTime, TimeZone, Utc};
2use itertools::Itertools;
3use once_cell::sync::Lazy;
4use regex::Regex;
5
6use std::collections::{BTreeMap, HashMap};
7use std::io;
8use std::ops::Deref;
9
10static HELP_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"^#\s+HELP\s+(\w+)\s+(.+)$").unwrap());
11static TYPE_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"^#\s+TYPE\s+(\w+)\s+(\w+)").unwrap());
12static SAMPLE_RE: Lazy<Regex> = Lazy::new(|| {
13    Regex::new(r"^(?P<name>\w+)(\{(?P<labels>[^}]+)\})?\s+(?P<value>\S+)(\s+(?P<timestamp>\S+))?")
14        .unwrap()
15});
16
/// Classification of a single line of Prometheus text exposition output, as produced by
/// `LineInfo::parse`.
///
/// The `Doc` and `Sample` variants borrow their string slices from the parsed line.
#[derive(Debug, Eq, PartialEq)]
pub enum LineInfo<'a> {
    /// A `# HELP` line: the metric name and its documentation text.
    Doc {
        metric_name: &'a str,
        doc: &'a str,
    },
    /// A `# TYPE` line.
    ///
    /// For histograms `metric_name` is the declared name with a `_bucket` suffix appended
    /// and `metric_alias` holds the declared name; for every other type `metric_name` is
    /// the declared name and `metric_alias` is `None`.
    Type {
        metric_name: String,
        metric_alias: Option<String>,
        sample_type: SampleType,
    },
    /// A sample line. `labels` is the raw text between the braces (if any); `value` and
    /// `timestamp` are the raw, not yet validated tokens.
    Sample {
        metric_name: &'a str,
        labels: Option<&'a str>,
        value: &'a str,
        timestamp: Option<&'a str>,
    },
    /// The line was empty after trimming whitespace.
    Empty,
    /// The line matched none of the recognised forms (e.g. a free-form comment).
    Ignored,
}
37
/// Metric type as announced by a `# TYPE` line.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub enum SampleType {
    Counter,
    Gauge,
    Histogram,
    Summary,
    Untyped,
}

impl SampleType {
    /// Maps a type keyword to its `SampleType`.
    ///
    /// The comparison is exact (case-sensitive); any keyword that is not one of
    /// `counter`, `gauge`, `histogram` or `summary` yields `SampleType::Untyped`.
    pub fn parse(s: &str) -> SampleType {
        const KEYWORDS: [(&str, SampleType); 4] = [
            ("counter", SampleType::Counter),
            ("gauge", SampleType::Gauge),
            ("histogram", SampleType::Histogram),
            ("summary", SampleType::Summary),
        ];

        KEYWORDS
            .iter()
            .find(|(keyword, _)| *keyword == s)
            .map_or(SampleType::Untyped, |&(_, sample_type)| sample_type)
    }
}
58
59impl<'a> LineInfo<'a> {
60    pub fn parse(line: &'a str) -> LineInfo<'a> {
61        let line = line.trim();
62        if line.is_empty() {
63            return LineInfo::Empty;
64        }
65        if let Some(ref caps) = HELP_RE.captures(line) {
66            return match (caps.get(1), caps.get(2)) {
67                (Some(ref metric_name), Some(ref doc)) => LineInfo::Doc {
68                    metric_name: metric_name.as_str(),
69                    doc: doc.as_str(),
70                },
71                _ => LineInfo::Ignored,
72            };
73        }
74        if let Some(ref caps) = TYPE_RE.captures(line) {
75            return match (caps.get(1), caps.get(2)) {
76                (Some(ref metric_name), Some(ref sample_type)) => {
77                    let sample_type = SampleType::parse(sample_type.as_str());
78                    LineInfo::Type {
79                        metric_name: match sample_type {
80                            SampleType::Histogram => format!("{}_bucket", metric_name.as_str()),
81                            _ => metric_name.as_str().to_string(),
82                        },
83                        metric_alias: match sample_type {
84                            SampleType::Histogram => Some(metric_name.as_str().to_string()),
85                            _ => None,
86                        },
87                        sample_type,
88                    }
89                }
90                _ => LineInfo::Ignored,
91            };
92        }
93        match SAMPLE_RE.captures(line) {
94            Some(ref caps) => {
95                return match (
96                    caps.name("name"),
97                    caps.name("labels"),
98                    caps.name("value"),
99                    caps.name("timestamp"),
100                ) {
101                    (Some(ref name), labels, Some(ref value), timestamp) => LineInfo::Sample {
102                        metric_name: name.as_str(),
103                        labels: labels.map(|c| c.as_str()),
104                        value: value.as_str(),
105                        timestamp: timestamp.map(|c| c.as_str()),
106                    },
107                    _ => LineInfo::Ignored,
108                };
109            }
110            None => LineInfo::Ignored,
111        }
112    }
113}
114
/// One parsed metric sample.
#[derive(Clone, Debug, PartialEq)]
pub struct Sample {
    /// Name of the metric this sample belongs to.
    pub metric: String,
    /// The sample's value, tagged with the metric type it was parsed as.
    pub value: Value,
    /// Labels attached to the sample.
    pub labels: Labels,
    /// Timestamp taken from the sample line when one is present and parses as
    /// milliseconds; otherwise the sample time supplied to `Scrape::parse_at`.
    pub timestamp: DateTime<Utc>,
}
122
123fn parse_bucket(s: &str, label: &str) -> Option<(Labels, f64)> {
124    let mut labs = HashMap::new();
125
126    let mut value = None;
127    for kv in s.split(',') {
128        let kvpair = kv.split('=').collect::<Vec<_>>();
129        if kvpair.len() != 2 || kvpair[0].is_empty() {
130            continue;
131        }
132        let (k, v) = (kvpair[0], kvpair[1].trim_matches('"'));
133        if k == label {
134            value = match parse_golang_float(v) {
135                Ok(v) => Some(v),
136                Err(_) => return None,
137            };
138        } else {
139            labs.insert(k.to_string(), v.to_string());
140        }
141    }
142
143    value.map(|v| (Labels(labs), v))
144}
145
/// One bucket of a histogram sample.
#[derive(Clone, Debug, PartialEq)]
pub struct HistogramCount {
    /// Bucket boundary, parsed from the bucket line's `le` label.
    pub less_than: f64,
    /// Value reported on the bucket line.
    pub count: f64,
}
151
/// One quantile entry of a summary sample.
#[derive(Clone, Debug, PartialEq)]
pub struct SummaryCount {
    /// Quantile, parsed from the line's `quantile` label.
    pub quantile: f64,
    /// Value reported on the quantile line.
    pub count: f64,
}
157
/// The label set of a sample: a map from label name to label value.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Labels(HashMap<String, String>);

impl Labels {
    /// Creates an empty label set.
    fn new() -> Labels {
        Labels(HashMap::new())
    }

    /// Splits `s` at every `sep` that lies outside a double-quoted section.
    ///
    /// Inside quotes a backslash escapes the following character, so an escaped quote
    /// (`\"`) does not end the quoted section. Always returns at least one piece.
    fn split_unquoted(s: &str, sep: char) -> Vec<&str> {
        let mut pieces = Vec::new();
        let mut start = 0;
        let mut in_quotes = false;
        let mut escaped = false;
        for (idx, ch) in s.char_indices() {
            if escaped {
                escaped = false;
            } else if in_quotes && ch == '\\' {
                escaped = true;
            } else if ch == '"' {
                in_quotes = !in_quotes;
            } else if ch == sep && !in_quotes {
                pieces.push(&s[start..idx]);
                start = idx + ch.len_utf8();
            }
        }
        pieces.push(&s[start..]);
        pieces
    }

    /// Parses the raw text between a sample's braces (`name="value",...`) into a label set.
    ///
    /// Pairs are separated by commas and split into name and value at `=`; separators
    /// inside a double-quoted value are part of the value, so `path="a,b"` and
    /// `expr="x=y"` are kept intact. Parsing is lenient: a pair that does not consist of
    /// exactly one non-empty name and one value is skipped, and unquoted values are
    /// accepted. Surrounding double quotes are stripped from the value; escape sequences
    /// are left as written.
    fn parse(s: &str) -> Labels {
        let mut l = HashMap::new();
        for kv in Labels::split_unquoted(s, ',') {
            let kvpair = Labels::split_unquoted(kv, '=');
            if kvpair.len() != 2 || kvpair[0].is_empty() {
                continue;
            }
            l.insert(
                kvpair[0].to_string(),
                kvpair[1].trim_matches('"').to_string(),
            );
        }
        Labels(l)
    }

    /// Returns the value of the label called `name`, if present.
    pub fn get(&self, name: &str) -> Option<&str> {
        self.0.get(name).map(|x| x.as_str())
    }
}
185
/// Gives read-only access to the underlying map, so `HashMap` methods such as `iter`
/// or `len` can be called directly on a `Labels` value.
impl Deref for Labels {
    type Target = HashMap<String, String>;

    /// Returns a reference to the wrapped map.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
193
194impl core::fmt::Display for Labels {
195    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
196        write!(
197            f,
198            "{}",
199            Itertools::intersperse(
200                self.iter()
201                    .collect::<BTreeMap<_, _>>()
202                    .into_iter()
203                    .map(|(k, v)| format!(r#"{}="{}""#, k, v)),
204                ",".to_string()
205            )
206            .collect::<String>()
207        )
208    }
209}
210
/// Value of a sample, tagged with the metric type it was parsed as.
#[derive(Clone, Debug, PartialEq)]
pub enum Value {
    /// Value of a metric declared as `counter`.
    Counter(f64),
    /// Value of a metric declared as `gauge`.
    Gauge(f64),
    /// Buckets collected from the bucket lines of one histogram label set.
    Histogram(Vec<HistogramCount>),
    /// Quantiles collected from the quantile lines of one summary label set.
    Summary(Vec<SummaryCount>),
    /// Value of a sample that was not parsed as any of the types above.
    Untyped(f64),
}
219
220impl Value {
221    fn push_histogram(&mut self, h: HistogramCount) {
222        if let &mut Value::Histogram(ref mut hs) = self {
223            hs.push(h)
224        }
225    }
226
227    fn push_summary(&mut self, s: SummaryCount) {
228        if let &mut Value::Summary(ref mut ss) = self {
229            ss.push(s)
230        }
231    }
232}
233
/// Result of parsing one complete exposition document.
#[derive(Clone, Debug)]
pub struct Scrape {
    /// Documentation text from `# HELP` lines, keyed by metric name.
    pub docs: HashMap<String, String>,
    /// All parsed samples. Aggregated histogram and summary samples are appended after
    /// the plain samples.
    pub samples: Vec<Sample>,
}
239
/// Parses a float as written by Go exporters (`NaN`, `+Inf`, `-Inf`, `1e9`, ...).
///
/// The input is lower-cased before parsing so the spelling of `NaN`/`Inf` does not
/// matter, and `nan` is mapped to `f64::NAN` explicitly rather than relying on the
/// standard parser to recognise it.
fn parse_golang_float(s: &str) -> Result<f64, <f64 as std::str::FromStr>::Err> {
    let lowered = s.to_lowercase();
    if lowered == "nan" {
        return Ok(std::f64::NAN);
    }
    lowered.parse::<f64>()
}
246
247impl Scrape {
248    pub fn parse(lines: impl Iterator<Item = io::Result<String>>) -> io::Result<Scrape> {
249        Scrape::parse_at(lines, Utc::now())
250    }
251
252    pub fn parse_at(
253        lines: impl Iterator<Item = io::Result<String>>,
254        sample_time: DateTime<Utc>,
255    ) -> io::Result<Scrape> {
256        let mut docs: HashMap<String, String> = HashMap::new();
257        let mut types: HashMap<String, SampleType> = HashMap::new();
258        let mut aliases: HashMap<String, String> = HashMap::new();
259        let mut buckets: HashMap<(String, String), Sample> = HashMap::new();
260        let mut samples: Vec<Sample> = vec![];
261
262        for read_line in lines {
263            let line = match read_line {
264                Ok(line) => line,
265                Err(e) => return Err(e),
266            };
267            match LineInfo::parse(&line) {
268                LineInfo::Doc {
269                    ref metric_name,
270                    ref doc,
271                } => {
272                    docs.insert(metric_name.to_string(), doc.to_string());
273                }
274                LineInfo::Type {
275                    ref metric_name,
276                    ref metric_alias,
277                    ref sample_type,
278                } => {
279                    types.insert(metric_name.to_string(), *sample_type);
280                    if let Some(alias) = metric_alias.as_ref() {
281                        aliases.insert(metric_name.to_string(), alias.to_string());
282                    }
283                }
284                LineInfo::Sample {
285                    metric_name,
286                    ref labels,
287                    value,
288                    timestamp,
289                } => {
290                    // Parse value or skip
291                    let fvalue = if let Ok(v) = parse_golang_float(value) {
292                        v
293                    } else {
294                        continue;
295                    };
296                    // Parse timestamp or use given sample time
297                    let timestamp = if let Some(time) = timestamp
298                        .and_then(|x| x.parse::<i64>().ok())
299                        .and_then(|ts_millis| Utc.timestamp_millis_opt(ts_millis).single())
300                    {
301                        time
302                    } else {
303                        sample_time
304                    };
305                    match (types.get(metric_name), labels) {
306                        (Some(SampleType::Histogram), Some(labels)) => {
307                            if let Some((labels, lt)) = parse_bucket(labels, "le") {
308                                let sample = buckets
309                                    .entry((metric_name.to_string(), labels.to_string()))
310                                    .or_insert(Sample {
311                                        metric: aliases
312                                            .get(metric_name)
313                                            .map(ToString::to_string)
314                                            .unwrap_or_else(|| metric_name.to_string()),
315                                        labels,
316                                        value: Value::Histogram(vec![]),
317                                        timestamp,
318                                    });
319                                sample.value.push_histogram(HistogramCount {
320                                    less_than: lt,
321                                    count: fvalue,
322                                })
323                            }
324                        }
325                        (Some(SampleType::Summary), Some(labels)) => {
326                            if let Some((labels, q)) = parse_bucket(labels, "quantile") {
327                                let sample = buckets
328                                    .entry((metric_name.to_string(), labels.to_string()))
329                                    .or_insert(Sample {
330                                        metric: metric_name.to_string(),
331                                        labels,
332                                        value: Value::Summary(vec![]),
333                                        timestamp,
334                                    });
335                                sample.value.push_summary(SummaryCount {
336                                    quantile: q,
337                                    count: fvalue,
338                                })
339                            }
340                        }
341                        (ty, labels) => samples.push(Sample {
342                            metric: metric_name.to_string(),
343                            labels: labels.map_or(Labels::new(), Labels::parse),
344                            value: match ty {
345                                Some(SampleType::Counter) => Value::Counter(fvalue),
346                                Some(SampleType::Gauge) => Value::Gauge(fvalue),
347                                _ => Value::Untyped(fvalue),
348                            },
349                            timestamp,
350                        }),
351                    };
352                }
353                _ => {}
354            }
355        }
356        samples.extend(buckets.drain().map(|(_k, v)| v).collect::<Vec<_>>());
357        Ok(Scrape { docs, samples })
358    }
359}
360
361#[cfg(test)]
362mod tests {
363    use std::io::BufRead;
364
365    use super::*;
366
367    #[test]
368    fn test_lineinfo_parse() {
369        assert_eq!(
370            LineInfo::parse("foo 2"),
371            LineInfo::Sample {
372                metric_name: "foo",
373                value: "2",
374                labels: None,
375                timestamp: None,
376            }
377        );
378        assert_eq!(
379            LineInfo::parse("foo wtf -1"),
380            LineInfo::Sample {
381                metric_name: "foo",
382                value: "wtf",
383                labels: None,
384                timestamp: Some("-1"),
385            }
386        );
387        assert_eq!(LineInfo::parse("foo=2"), LineInfo::Ignored,);
388        assert_eq!(
389            LineInfo::parse("foo 2 1543182234"),
390            LineInfo::Sample {
391                metric_name: "foo",
392                value: "2",
393                labels: None,
394                timestamp: Some("1543182234"),
395            }
396        );
397        assert_eq!(
398            LineInfo::parse("foo{bar=baz} 2 1543182234"),
399            LineInfo::Sample {
400                metric_name: "foo",
401                value: "2",
402                labels: Some("bar=baz"),
403                timestamp: Some("1543182234"),
404            }
405        );
406        assert_eq!(
407            LineInfo::parse("foo{bar=baz,quux=nonce} 2 1543182234"),
408            LineInfo::Sample {
409                metric_name: "foo",
410                value: "2",
411                labels: Some("bar=baz,quux=nonce"),
412                timestamp: Some("1543182234"),
413            }
414        );
415        assert_eq!(
416            LineInfo::parse("# HELP foo this is a docstring"),
417            LineInfo::Doc {
418                metric_name: "foo",
419                doc: "this is a docstring"
420            },
421        );
422        assert_eq!(
423            LineInfo::parse("# TYPE foobar bazquux"),
424            LineInfo::Type {
425                metric_name: "foobar".to_string(),
426                metric_alias: None,
427                sample_type: SampleType::Untyped,
428            },
429        );
430    }
431
432    fn pair_to_string(pair: &(&str, &str)) -> (String, String) {
433        (pair.0.to_string(), pair.1.to_string())
434    }
435
436    #[test]
437    fn test_labels_parse() {
438        assert_eq!(
439            Labels::parse("foo=bar"),
440            Labels([("foo", "bar")].iter().map(pair_to_string).collect())
441        );
442        assert_eq!(
443            Labels::parse("foo=bar,"),
444            Labels([("foo", "bar")].iter().map(pair_to_string).collect())
445        );
446        assert_eq!(
447            Labels::parse(",foo=bar,"),
448            Labels([("foo", "bar")].iter().map(pair_to_string).collect())
449        );
450        assert_eq!(
451            Labels::parse("=,foo=bar,"),
452            Labels([("foo", "bar")].iter().map(pair_to_string).collect())
453        );
454        assert_eq!(
455            Labels::parse(r#"foo="bar""#),
456            Labels([("foo", "bar")].iter().map(pair_to_string).collect())
457        );
458        assert_eq!(
459            Labels::parse(r#"foo="bar",baz="quux""#),
460            Labels(
461                [("foo", "bar"), ("baz", "quux")]
462                    .iter()
463                    .map(pair_to_string)
464                    .collect()
465            )
466        );
467        assert_eq!(
468            Labels::parse(r#"foo="foo bar",baz="baz quux""#),
469            Labels(
470                [("foo", "foo bar"), ("baz", "baz quux")]
471                    .iter()
472                    .map(pair_to_string)
473                    .collect()
474            )
475        );
476        assert_eq!(Labels::parse("==="), Labels(HashMap::new()),);
477    }
478
479    #[test]
480    fn test_golang_float() {
481        assert_eq!(parse_golang_float("1.0"), Ok(1.0f64));
482        assert_eq!(parse_golang_float("-1.0"), Ok(-1.0f64));
483        assert!(parse_golang_float("NaN").unwrap().is_nan());
484        assert_eq!(parse_golang_float("Inf"), Ok(std::f64::INFINITY));
485        assert_eq!(parse_golang_float("+Inf"), Ok(std::f64::INFINITY));
486        assert_eq!(parse_golang_float("-Inf"), Ok(std::f64::NEG_INFINITY));
487    }
488
489    #[test]
490    fn test_parse_samples() {
491        let scrape = r#"
492# HELP http_requests_total The total number of HTTP requests.
493# TYPE http_requests_total counter
494http_requests_total{method="post",code="200"} 1027 1395066363000
495http_requests_total{method="post",code="400"}    3 1395066363000
496
497# Escaping in label values:
498msdos_file_access_time_seconds{path="C:\\DIR\\FILE.TXT",error="Cannot find file:\n\"FILE.TXT\""} 1.458255915e9
499
500# Minimalistic line:
501metric_without_timestamp_and_labels 12.47
502
503# A weird metric from before the epoch:
504something_weird{problem="division by zero"} +Inf -3982045
505
506# A histogram, which has a pretty complex representation in the text format:
507# HELP http_request_duration_seconds A histogram of the request duration.
508# TYPE http_request_duration_seconds histogram
509http_request_duration_seconds_bucket{le="0.05"} 24054
510http_request_duration_seconds_bucket{le="0.1"} 33444
511http_request_duration_seconds_bucket{le="0.2"} 100392
512http_request_duration_seconds_bucket{le="0.5"} 129389
513http_request_duration_seconds_bucket{le="1"} 133988
514http_request_duration_seconds_bucket{le="+Inf"} 144320
515http_request_duration_seconds_sum 53423
516http_request_duration_seconds_count 144320
517
518# Finally a summary, which has a complex representation, too:
519# HELP rpc_duration_seconds A summary of the RPC duration in seconds.
520# TYPE rpc_duration_seconds summary
521rpc_duration_seconds{quantile="0.01"} 3102
522rpc_duration_seconds{quantile="0.05"} 3272
523rpc_duration_seconds{quantile="0.5"} 4773
524rpc_duration_seconds{quantile="0.9"} 9001
525rpc_duration_seconds{quantile="0.99"} 76656
526rpc_duration_seconds_sum 1.7560473e+07
527rpc_duration_seconds_count 2693
528"#;
529        let br = io::BufReader::new(scrape.as_bytes());
530        let s = Scrape::parse(br.lines()).unwrap();
531        assert_eq!(s.samples.len(), 11);
532
533        fn assert_match_sample<'a, F>(samples: &'a Vec<Sample>, f: F) -> &'a Sample
534        where
535            for<'r> F: FnMut(&'r &'a Sample) -> bool,
536        {
537            samples.iter().filter(f).next().as_ref().unwrap()
538        }
539        assert_eq!(
540            assert_match_sample(&s.samples, |s| s.metric == "http_requests_total"
541                && s.labels.get("code") == Some("200")),
542            &Sample {
543                metric: "http_requests_total".to_string(),
544                value: Value::Counter(1027f64),
545                labels: Labels(
546                    [("method", "post"), ("code", "200")]
547                        .iter()
548                        .map(pair_to_string)
549                        .collect()
550                ),
551                timestamp: Utc.timestamp_millis_opt(1395066363000).unwrap(),
552            }
553        );
554        assert_eq!(
555            assert_match_sample(&s.samples, |s| s.metric == "http_requests_total"
556                && s.labels.get("code") == Some("400")),
557            &Sample {
558                metric: "http_requests_total".to_string(),
559                value: Value::Counter(3f64),
560                labels: Labels(
561                    [("method", "post"), ("code", "400")]
562                        .iter()
563                        .map(pair_to_string)
564                        .collect()
565                ),
566                timestamp: Utc.timestamp_millis_opt(1395066363000).unwrap(),
567            }
568        );
569    }
570
571    #[test]
572    fn test_parse_complex_formats_with_labels() {
573        let scrape = r#"
574# A histogram, which has a pretty complex representation in the text format:
575# HELP http_request_duration_seconds A histogram of the request duration.
576# TYPE http_request_duration_seconds histogram
577http_request_duration_seconds_bucket{service="main",code="200",le="0.05"} 24054 1395066363000
578http_request_duration_seconds_bucket{code="200",le="0.1",service="main"} 33444 1395066363000
579http_request_duration_seconds_bucket{code="200",service="main",le="0.2"} 100392 1395066363000
580http_request_duration_seconds_bucket{le="0.5",code="200",service="main"} 129389 1395066363000
581http_request_duration_seconds_bucket{service="main",le="1",code="200"} 133988 1395066363000
582http_request_duration_seconds_bucket{le="+Inf",service="main",code="200"} 144320 1395066363000
583http_request_duration_seconds_sum{service="main",code="200"} 53423 1395066363000
584http_request_duration_seconds_count{service="main",code="200"} 144320 1395066363000
585
586# Finally a summary, which has a complex representation, too:
587# HELP rpc_duration_seconds A summary of the RPC duration in seconds.
588# TYPE rpc_duration_seconds summary
589rpc_duration_seconds{service="backup",code="400",quantile="0.01"} 3102 1395066363000
590rpc_duration_seconds{code="400",service="backup",quantile="0.05"} 3272 1395066363000
591rpc_duration_seconds{code="400",quantile="0.5",service="backup"} 4773 1395066363000
592rpc_duration_seconds{service="backup",quantile="0.9",code="400"} 9001 1395066363000
593rpc_duration_seconds{quantile="0.99",service="backup",code="400"} 76656 1395066363000
594rpc_duration_seconds_sum{service="backup",code="400"} 1.7560473e+07 1395066363000
595rpc_duration_seconds_count{service="backup",code="400"} 2693 1395066363000
596"#;
597        let br = io::BufReader::new(scrape.as_bytes());
598        let s = Scrape::parse(br.lines()).unwrap();
599        assert_eq!(s.samples.len(), 6);
600
601        fn assert_match_sample<'a, F>(samples: &'a Vec<Sample>, f: F) -> &'a Sample
602        where
603            for<'r> F: FnMut(&'r &'a Sample) -> bool,
604        {
605            samples.iter().filter(f).next().as_ref().unwrap()
606        }
607        assert_eq!(
608            assert_match_sample(&s.samples, |s| s.metric == "http_request_duration_seconds"
609                && s.labels.get("service") == Some("main")),
610            &Sample {
611                metric: "http_request_duration_seconds".to_string(),
612                value: Value::Histogram(vec![
613                    HistogramCount {
614                        less_than: 0.05f64,
615                        count: 24054f64,
616                    },
617                    HistogramCount {
618                        less_than: 0.1f64,
619                        count: 33444f64,
620                    },
621                    HistogramCount {
622                        less_than: 0.2f64,
623                        count: 100392f64,
624                    },
625                    HistogramCount {
626                        less_than: 0.5f64,
627                        count: 129389f64,
628                    },
629                    HistogramCount {
630                        less_than: 1.0f64,
631                        count: 133988f64,
632                    },
633                    HistogramCount {
634                        less_than: f64::INFINITY,
635                        count: 144320f64,
636                    },
637                ]),
638                labels: Labels(
639                    [("service", "main"), ("code", "200")]
640                        .iter()
641                        .map(pair_to_string)
642                        .collect()
643                ),
644                timestamp: Utc.timestamp_millis_opt(1395066363000).unwrap(),
645            }
646        );
647        assert_eq!(
648            assert_match_sample(&s.samples, |s| s.metric == "rpc_duration_seconds"
649                && s.labels.get("service") == Some("backup")),
650            &Sample {
651                metric: "rpc_duration_seconds".to_string(),
652                value: Value::Summary(vec![
653                    SummaryCount {
654                        quantile: 0.01f64,
655                        count: 3102f64
656                    },
657                    SummaryCount {
658                        quantile: 0.05f64,
659                        count: 3272f64,
660                    },
661                    SummaryCount {
662                        quantile: 0.5f64,
663                        count: 4773f64,
664                    },
665                    SummaryCount {
666                        quantile: 0.9f64,
667                        count: 9001f64,
668                    },
669                    SummaryCount {
670                        quantile: 0.99f64,
671                        count: 76656f64
672                    }
673                ]),
674                labels: Labels(
675                    [("service", "backup"), ("code", "400")]
676                        .iter()
677                        .map(pair_to_string)
678                        .collect()
679                ),
680                timestamp: Utc.timestamp_millis_opt(1395066363000).unwrap(),
681            }
682        );
683    }
684}