1use chrono::{DateTime, TimeZone, Utc};
2use itertools::Itertools;
3use once_cell::sync::Lazy;
4use regex::Regex;
5
6use std::collections::{BTreeMap, HashMap};
7use std::io;
8use std::ops::Deref;
9
10static HELP_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"^#\s+HELP\s+(\w+)\s+(.+)$").unwrap());
11static TYPE_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"^#\s+TYPE\s+(\w+)\s+(\w+)").unwrap());
12static SAMPLE_RE: Lazy<Regex> = Lazy::new(|| {
13 Regex::new(r"^(?P<name>\w+)(\{(?P<labels>[^}]+)\})?\s+(?P<value>\S+)(\s+(?P<timestamp>\S+))?")
14 .unwrap()
15});
16
/// One classified line of Prometheus text exposition, as produced by [`LineInfo::parse`].
///
/// Borrowed `&'a str` fields point into the (trimmed) input line.
#[derive(Debug, Eq, PartialEq)]
pub enum LineInfo<'a> {
    /// A `# HELP <metric_name> <doc>` line.
    Doc {
        metric_name: &'a str,
        doc: &'a str,
    },
    /// A `# TYPE <metric_name> <type>` line.
    ///
    /// For histograms `metric_name` is rewritten to `<name>_bucket` (the series the bucket
    /// samples arrive on) and `metric_alias` holds the declared `<name>`; for every other
    /// type `metric_name` is the declared name and `metric_alias` is `None`.
    Type {
        metric_name: String,
        metric_alias: Option<String>,
        sample_type: SampleType,
    },
    /// A sample line `<name>[{<labels>}] <value> [<timestamp>]`, each part kept as unparsed
    /// text; `labels` excludes the surrounding braces.
    Sample {
        metric_name: &'a str,
        labels: Option<&'a str>,
        value: &'a str,
        timestamp: Option<&'a str>,
    },
    /// A blank or whitespace-only line.
    Empty,
    /// Any other line, including ordinary `#` comments and lines that match no pattern.
    Ignored,
}
37
/// The metric type declared by a `# TYPE` line.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub enum SampleType {
    Counter,
    Gauge,
    Histogram,
    Summary,
    Untyped,
}

impl SampleType {
    /// Maps a `# TYPE` keyword to its [`SampleType`].
    ///
    /// Matching is exact and case-sensitive. Any keyword not in the table (including the
    /// literal `untyped`) maps to [`SampleType::Untyped`].
    pub fn parse(s: &str) -> SampleType {
        const KNOWN: [(&str, SampleType); 4] = [
            ("counter", SampleType::Counter),
            ("gauge", SampleType::Gauge),
            ("histogram", SampleType::Histogram),
            ("summary", SampleType::Summary),
        ];
        KNOWN
            .iter()
            .find(|(keyword, _)| *keyword == s)
            .map_or(SampleType::Untyped, |&(_, kind)| kind)
    }
}
58
59impl<'a> LineInfo<'a> {
60 pub fn parse(line: &'a str) -> LineInfo<'a> {
61 let line = line.trim();
62 if line.is_empty() {
63 return LineInfo::Empty;
64 }
65 if let Some(ref caps) = HELP_RE.captures(line) {
66 return match (caps.get(1), caps.get(2)) {
67 (Some(ref metric_name), Some(ref doc)) => LineInfo::Doc {
68 metric_name: metric_name.as_str(),
69 doc: doc.as_str(),
70 },
71 _ => LineInfo::Ignored,
72 };
73 }
74 if let Some(ref caps) = TYPE_RE.captures(line) {
75 return match (caps.get(1), caps.get(2)) {
76 (Some(ref metric_name), Some(ref sample_type)) => {
77 let sample_type = SampleType::parse(sample_type.as_str());
78 LineInfo::Type {
79 metric_name: match sample_type {
80 SampleType::Histogram => format!("{}_bucket", metric_name.as_str()),
81 _ => metric_name.as_str().to_string(),
82 },
83 metric_alias: match sample_type {
84 SampleType::Histogram => Some(metric_name.as_str().to_string()),
85 _ => None,
86 },
87 sample_type,
88 }
89 }
90 _ => LineInfo::Ignored,
91 };
92 }
93 match SAMPLE_RE.captures(line) {
94 Some(ref caps) => {
95 return match (
96 caps.name("name"),
97 caps.name("labels"),
98 caps.name("value"),
99 caps.name("timestamp"),
100 ) {
101 (Some(ref name), labels, Some(ref value), timestamp) => LineInfo::Sample {
102 metric_name: name.as_str(),
103 labels: labels.map(|c| c.as_str()),
104 value: value.as_str(),
105 timestamp: timestamp.map(|c| c.as_str()),
106 },
107 _ => LineInfo::Ignored,
108 };
109 }
110 None => LineInfo::Ignored,
111 }
112 }
113}
114
/// A parsed sample. For histograms and summaries, one `Sample` aggregates every
/// bucket/quantile line that shares a metric name and label set.
#[derive(Clone, Debug, PartialEq)]
pub struct Sample {
    /// Metric name. Histogram samples use the base name declared by `# TYPE`, not the
    /// `_bucket` series name.
    pub metric: String,
    /// The typed value; histogram/summary samples collect their buckets/quantiles here.
    pub value: Value,
    /// The sample's labels; for histogram/summary samples the `le`/`quantile` label is
    /// removed (it is stored in the value instead).
    pub labels: Labels,
    /// The line's millisecond timestamp, or the scrape time if the line has none or it
    /// cannot be parsed.
    pub timestamp: DateTime<Utc>,
}
122
123fn parse_bucket(s: &str, label: &str) -> Option<(Labels, f64)> {
124 let mut labs = HashMap::new();
125
126 let mut value = None;
127 for kv in s.split(',') {
128 let kvpair = kv.split('=').collect::<Vec<_>>();
129 if kvpair.len() != 2 || kvpair[0].is_empty() {
130 continue;
131 }
132 let (k, v) = (kvpair[0], kvpair[1].trim_matches('"'));
133 if k == label {
134 value = match parse_golang_float(v) {
135 Ok(v) => Some(v),
136 Err(_) => return None,
137 };
138 } else {
139 labs.insert(k.to_string(), v.to_string());
140 }
141 }
142
143 value.map(|v| (Labels(labs), v))
144}
145
/// One histogram bucket collected from a `_bucket` sample line.
#[derive(Clone, Debug, PartialEq)]
pub struct HistogramCount {
    /// The line's `le` label value parsed as a float (`+Inf` parses to infinity).
    pub less_than: f64,
    /// The bucket line's sample value.
    pub count: f64,
}
151
/// One quantile collected from a summary sample line.
#[derive(Clone, Debug, PartialEq)]
pub struct SummaryCount {
    /// The line's `quantile` label value parsed as a float.
    pub quantile: f64,
    /// The quantile line's sample value.
    // NOTE(review): in the Prometheus format this is presumably the observed value at
    // `quantile` rather than a count — the field name may be misleading; confirm.
    pub count: f64,
}
157
/// A metric's label set, mapping label names to their unquoted values.
///
/// Dereferences to the underlying `HashMap` for read-only access.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Labels(HashMap<String, String>);

impl Labels {
    /// Creates an empty label set.
    fn new() -> Labels {
        Labels(HashMap::new())
    }

    /// Parses the text found between `{` and `}` of a sample line, e.g. `a="x",b="y"`.
    ///
    /// Pairs are separated by commas that are *not* inside a double-quoted value, so a
    /// quoted value may itself contain `,` or `=`; inside quotes a backslash escapes the next
    /// character, so `\"` does not end the value. Each pair is split at its first `=`,
    /// whitespace around the name and the value is trimmed, and one pair of enclosing double
    /// quotes is removed from the value. Escape sequences inside a value are kept verbatim.
    /// Empty pairs, pairs without `=`, and pairs with an empty name are skipped; if a name
    /// repeats, the last value wins.
    fn parse(s: &str) -> Labels {
        let mut map = HashMap::new();
        for pair in Labels::split_pairs(s) {
            let eq = match pair.find('=') {
                Some(idx) => idx,
                None => continue,
            };
            let name = pair[..eq].trim();
            if name.is_empty() {
                continue;
            }
            let value = Labels::unquote(pair[eq + 1..].trim());
            map.insert(name.to_string(), value.to_string());
        }
        Labels(map)
    }

    /// Splits `s` on commas that lie outside double quotes (honouring `\` escapes inside
    /// quotes). Always returns at least one, possibly empty, piece.
    fn split_pairs(s: &str) -> Vec<&str> {
        let mut pairs = Vec::new();
        let mut start = 0;
        let mut in_quotes = false;
        let mut escaped = false;
        for (idx, ch) in s.char_indices() {
            if escaped {
                escaped = false;
                continue;
            }
            match ch {
                '\\' if in_quotes => escaped = true,
                '"' => in_quotes = !in_quotes,
                ',' if !in_quotes => {
                    pairs.push(&s[start..idx]);
                    // `,` is a single byte, so `idx + 1` is a char boundary.
                    start = idx + 1;
                }
                _ => {}
            }
        }
        pairs.push(&s[start..]);
        pairs
    }

    /// Removes exactly one pair of enclosing double quotes from `v`. A value that is not
    /// fully enclosed (e.g. unquoted, or with an unbalanced quote) has any stray leading or
    /// trailing quotes trimmed instead.
    fn unquote(v: &str) -> &str {
        if v.len() >= 2 && v.starts_with('"') && v.ends_with('"') {
            &v[1..v.len() - 1]
        } else {
            v.trim_matches('"')
        }
    }

    /// Returns the value of label `name`, if present.
    pub fn get(&self, name: &str) -> Option<&str> {
        self.0.get(name).map(|x| x.as_str())
    }
}

impl Deref for Labels {
    type Target = HashMap<String, String>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
193
194impl core::fmt::Display for Labels {
195 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
196 write!(
197 f,
198 "{}",
199 Itertools::intersperse(
200 self.iter()
201 .collect::<BTreeMap<_, _>>()
202 .into_iter()
203 .map(|(k, v)| format!(r#"{}="{}""#, k, v)),
204 ",".to_string()
205 )
206 .collect::<String>()
207 )
208 }
209}
210
211#[derive(Clone, Debug, PartialEq)]
212pub enum Value {
213 Counter(f64),
214 Gauge(f64),
215 Histogram(Vec<HistogramCount>),
216 Summary(Vec<SummaryCount>),
217 Untyped(f64),
218}
219
220impl Value {
221 fn push_histogram(&mut self, h: HistogramCount) {
222 if let &mut Value::Histogram(ref mut hs) = self {
223 hs.push(h)
224 }
225 }
226
227 fn push_summary(&mut self, s: SummaryCount) {
228 if let &mut Value::Summary(ref mut ss) = self {
229 ss.push(s)
230 }
231 }
232}
233
/// The result of parsing one Prometheus text-format scrape.
#[derive(Clone, Debug)]
pub struct Scrape {
    /// `# HELP` text keyed by the metric name given on the HELP line.
    pub docs: HashMap<String, String>,
    /// Every parsed sample; aggregated histogram/summary samples are placed after all others.
    pub samples: Vec<Sample>,
}
239
/// Parses a float as written by Go's formatter (`NaN`, `+Inf`, `-Inf`, plain decimals).
///
/// The input is lowercased first; `nan` is mapped to NaN explicitly, and everything else is
/// handed to Rust's `f64` parser.
fn parse_golang_float(s: &str) -> Result<f64, <f64 as std::str::FromStr>::Err> {
    let normalized = s.to_lowercase();
    if normalized == "nan" {
        Ok(f64::NAN)
    } else {
        normalized.parse::<f64>()
    }
}
246
247impl Scrape {
248 pub fn parse(lines: impl Iterator<Item = io::Result<String>>) -> io::Result<Scrape> {
249 Scrape::parse_at(lines, Utc::now())
250 }
251
252 pub fn parse_at(
253 lines: impl Iterator<Item = io::Result<String>>,
254 sample_time: DateTime<Utc>,
255 ) -> io::Result<Scrape> {
256 let mut docs: HashMap<String, String> = HashMap::new();
257 let mut types: HashMap<String, SampleType> = HashMap::new();
258 let mut aliases: HashMap<String, String> = HashMap::new();
259 let mut buckets: HashMap<(String, String), Sample> = HashMap::new();
260 let mut samples: Vec<Sample> = vec![];
261
262 for read_line in lines {
263 let line = match read_line {
264 Ok(line) => line,
265 Err(e) => return Err(e),
266 };
267 match LineInfo::parse(&line) {
268 LineInfo::Doc {
269 ref metric_name,
270 ref doc,
271 } => {
272 docs.insert(metric_name.to_string(), doc.to_string());
273 }
274 LineInfo::Type {
275 ref metric_name,
276 ref metric_alias,
277 ref sample_type,
278 } => {
279 types.insert(metric_name.to_string(), *sample_type);
280 if let Some(alias) = metric_alias.as_ref() {
281 aliases.insert(metric_name.to_string(), alias.to_string());
282 }
283 }
284 LineInfo::Sample {
285 metric_name,
286 ref labels,
287 value,
288 timestamp,
289 } => {
290 let fvalue = if let Ok(v) = parse_golang_float(value) {
292 v
293 } else {
294 continue;
295 };
296 let timestamp = if let Some(time) = timestamp
298 .and_then(|x| x.parse::<i64>().ok())
299 .and_then(|ts_millis| Utc.timestamp_millis_opt(ts_millis).single())
300 {
301 time
302 } else {
303 sample_time
304 };
305 match (types.get(metric_name), labels) {
306 (Some(SampleType::Histogram), Some(labels)) => {
307 if let Some((labels, lt)) = parse_bucket(labels, "le") {
308 let sample = buckets
309 .entry((metric_name.to_string(), labels.to_string()))
310 .or_insert(Sample {
311 metric: aliases
312 .get(metric_name)
313 .map(ToString::to_string)
314 .unwrap_or_else(|| metric_name.to_string()),
315 labels,
316 value: Value::Histogram(vec![]),
317 timestamp,
318 });
319 sample.value.push_histogram(HistogramCount {
320 less_than: lt,
321 count: fvalue,
322 })
323 }
324 }
325 (Some(SampleType::Summary), Some(labels)) => {
326 if let Some((labels, q)) = parse_bucket(labels, "quantile") {
327 let sample = buckets
328 .entry((metric_name.to_string(), labels.to_string()))
329 .or_insert(Sample {
330 metric: metric_name.to_string(),
331 labels,
332 value: Value::Summary(vec![]),
333 timestamp,
334 });
335 sample.value.push_summary(SummaryCount {
336 quantile: q,
337 count: fvalue,
338 })
339 }
340 }
341 (ty, labels) => samples.push(Sample {
342 metric: metric_name.to_string(),
343 labels: labels.map_or(Labels::new(), Labels::parse),
344 value: match ty {
345 Some(SampleType::Counter) => Value::Counter(fvalue),
346 Some(SampleType::Gauge) => Value::Gauge(fvalue),
347 _ => Value::Untyped(fvalue),
348 },
349 timestamp,
350 }),
351 };
352 }
353 _ => {}
354 }
355 }
356 samples.extend(buckets.drain().map(|(_k, v)| v).collect::<Vec<_>>());
357 Ok(Scrape { docs, samples })
358 }
359}
360
#[cfg(test)]
mod tests {
    use std::io::BufRead;

    use super::*;

    /// Returns the first sample accepted by `pred`, panicking if none matches.
    fn assert_match_sample<'a>(
        samples: &'a [Sample],
        pred: impl FnMut(&&'a Sample) -> bool,
    ) -> &'a Sample {
        samples
            .iter()
            .find(pred)
            .expect("no sample matched the predicate")
    }

    /// Converts a borrowed label pair into an owned one.
    fn pair_to_string(pair: &(&str, &str)) -> (String, String) {
        (pair.0.to_string(), pair.1.to_string())
    }

    /// Builds an expected `Labels` value from borrowed name/value pairs.
    fn labels_of(pairs: &[(&str, &str)]) -> Labels {
        Labels(pairs.iter().map(pair_to_string).collect())
    }

    #[test]
    fn test_lineinfo_parse() {
        assert_eq!(
            LineInfo::parse("foo 2"),
            LineInfo::Sample {
                metric_name: "foo",
                value: "2",
                labels: None,
                timestamp: None,
            }
        );
        assert_eq!(
            LineInfo::parse("foo wtf -1"),
            LineInfo::Sample {
                metric_name: "foo",
                value: "wtf",
                labels: None,
                timestamp: Some("-1"),
            }
        );
        assert_eq!(LineInfo::parse("foo=2"), LineInfo::Ignored,);
        assert_eq!(
            LineInfo::parse("foo 2 1543182234"),
            LineInfo::Sample {
                metric_name: "foo",
                value: "2",
                labels: None,
                timestamp: Some("1543182234"),
            }
        );
        assert_eq!(
            LineInfo::parse("foo{bar=baz} 2 1543182234"),
            LineInfo::Sample {
                metric_name: "foo",
                value: "2",
                labels: Some("bar=baz"),
                timestamp: Some("1543182234"),
            }
        );
        assert_eq!(
            LineInfo::parse("foo{bar=baz,quux=nonce} 2 1543182234"),
            LineInfo::Sample {
                metric_name: "foo",
                value: "2",
                labels: Some("bar=baz,quux=nonce"),
                timestamp: Some("1543182234"),
            }
        );
        assert_eq!(
            LineInfo::parse("# HELP foo this is a docstring"),
            LineInfo::Doc {
                metric_name: "foo",
                doc: "this is a docstring"
            },
        );
        assert_eq!(
            LineInfo::parse("# TYPE foobar bazquux"),
            LineInfo::Type {
                metric_name: "foobar".to_string(),
                metric_alias: None,
                sample_type: SampleType::Untyped,
            },
        );
        assert_eq!(
            LineInfo::parse("# TYPE foo histogram"),
            LineInfo::Type {
                metric_name: "foo_bucket".to_string(),
                metric_alias: Some("foo".to_string()),
                sample_type: SampleType::Histogram,
            },
        );
        assert_eq!(LineInfo::parse("   "), LineInfo::Empty);
        assert_eq!(LineInfo::parse("# just a comment"), LineInfo::Ignored);
    }

    #[test]
    fn test_labels_parse() {
        assert_eq!(Labels::parse("foo=bar"), labels_of(&[("foo", "bar")]));
        assert_eq!(Labels::parse("foo=bar,"), labels_of(&[("foo", "bar")]));
        assert_eq!(Labels::parse(",foo=bar,"), labels_of(&[("foo", "bar")]));
        assert_eq!(Labels::parse("=,foo=bar,"), labels_of(&[("foo", "bar")]));
        assert_eq!(Labels::parse(r#"foo="bar""#), labels_of(&[("foo", "bar")]));
        assert_eq!(
            Labels::parse(r#"foo="bar",baz="quux""#),
            labels_of(&[("foo", "bar"), ("baz", "quux")])
        );
        assert_eq!(
            Labels::parse(r#"foo="foo bar",baz="baz quux""#),
            labels_of(&[("foo", "foo bar"), ("baz", "baz quux")])
        );
        assert_eq!(Labels::parse("==="), Labels(HashMap::new()),);
    }

    #[test]
    fn test_golang_float() {
        assert_eq!(parse_golang_float("1.0"), Ok(1.0f64));
        assert_eq!(parse_golang_float("-1.0"), Ok(-1.0f64));
        assert!(parse_golang_float("NaN").unwrap().is_nan());
        assert_eq!(parse_golang_float("Inf"), Ok(std::f64::INFINITY));
        assert_eq!(parse_golang_float("+Inf"), Ok(std::f64::INFINITY));
        assert_eq!(parse_golang_float("-Inf"), Ok(std::f64::NEG_INFINITY));
        assert!(parse_golang_float("bogus").is_err());
    }

    #[test]
    fn test_parse_samples() {
        let scrape = r#"
# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{method="post",code="200"} 1027 1395066363000
http_requests_total{method="post",code="400"} 3 1395066363000

# Escaping in label values:
msdos_file_access_time_seconds{path="C:\\DIR\\FILE.TXT",error="Cannot find file:\n\"FILE.TXT\""} 1.458255915e9

# Minimalistic line:
metric_without_timestamp_and_labels 12.47

# A weird metric from before the epoch:
something_weird{problem="division by zero"} +Inf -3982045

# A histogram, which has a pretty complex representation in the text format:
# HELP http_request_duration_seconds A histogram of the request duration.
# TYPE http_request_duration_seconds histogram
http_request_duration_seconds_bucket{le="0.05"} 24054
http_request_duration_seconds_bucket{le="0.1"} 33444
http_request_duration_seconds_bucket{le="0.2"} 100392
http_request_duration_seconds_bucket{le="0.5"} 129389
http_request_duration_seconds_bucket{le="1"} 133988
http_request_duration_seconds_bucket{le="+Inf"} 144320
http_request_duration_seconds_sum 53423
http_request_duration_seconds_count 144320

# Finally a summary, which has a complex representation, too:
# HELP rpc_duration_seconds A summary of the RPC duration in seconds.
# TYPE rpc_duration_seconds summary
rpc_duration_seconds{quantile="0.01"} 3102
rpc_duration_seconds{quantile="0.05"} 3272
rpc_duration_seconds{quantile="0.5"} 4773
rpc_duration_seconds{quantile="0.9"} 9001
rpc_duration_seconds{quantile="0.99"} 76656
rpc_duration_seconds_sum 1.7560473e+07
rpc_duration_seconds_count 2693
"#;
        let br = io::BufReader::new(scrape.as_bytes());
        let s = Scrape::parse(br.lines()).unwrap();
        assert_eq!(s.samples.len(), 11);

        assert_eq!(
            assert_match_sample(&s.samples, |s| s.metric == "http_requests_total"
                && s.labels.get("code") == Some("200")),
            &Sample {
                metric: "http_requests_total".to_string(),
                value: Value::Counter(1027f64),
                labels: labels_of(&[("method", "post"), ("code", "200")]),
                timestamp: Utc.timestamp_millis_opt(1395066363000).unwrap(),
            }
        );
        assert_eq!(
            assert_match_sample(&s.samples, |s| s.metric == "http_requests_total"
                && s.labels.get("code") == Some("400")),
            &Sample {
                metric: "http_requests_total".to_string(),
                value: Value::Counter(3f64),
                labels: labels_of(&[("method", "post"), ("code", "400")]),
                timestamp: Utc.timestamp_millis_opt(1395066363000).unwrap(),
            }
        );
    }

    #[test]
    fn test_parse_complex_formats_with_labels() {
        let scrape = r#"
# A histogram, which has a pretty complex representation in the text format:
# HELP http_request_duration_seconds A histogram of the request duration.
# TYPE http_request_duration_seconds histogram
http_request_duration_seconds_bucket{service="main",code="200",le="0.05"} 24054 1395066363000
http_request_duration_seconds_bucket{code="200",le="0.1",service="main"} 33444 1395066363000
http_request_duration_seconds_bucket{code="200",service="main",le="0.2"} 100392 1395066363000
http_request_duration_seconds_bucket{le="0.5",code="200",service="main"} 129389 1395066363000
http_request_duration_seconds_bucket{service="main",le="1",code="200"} 133988 1395066363000
http_request_duration_seconds_bucket{le="+Inf",service="main",code="200"} 144320 1395066363000
http_request_duration_seconds_sum{service="main",code="200"} 53423 1395066363000
http_request_duration_seconds_count{service="main",code="200"} 144320 1395066363000

# Finally a summary, which has a complex representation, too:
# HELP rpc_duration_seconds A summary of the RPC duration in seconds.
# TYPE rpc_duration_seconds summary
rpc_duration_seconds{service="backup",code="400",quantile="0.01"} 3102 1395066363000
rpc_duration_seconds{code="400",service="backup",quantile="0.05"} 3272 1395066363000
rpc_duration_seconds{code="400",quantile="0.5",service="backup"} 4773 1395066363000
rpc_duration_seconds{service="backup",quantile="0.9",code="400"} 9001 1395066363000
rpc_duration_seconds{quantile="0.99",service="backup",code="400"} 76656 1395066363000
rpc_duration_seconds_sum{service="backup",code="400"} 1.7560473e+07 1395066363000
rpc_duration_seconds_count{service="backup",code="400"} 2693 1395066363000
"#;
        let br = io::BufReader::new(scrape.as_bytes());
        let s = Scrape::parse(br.lines()).unwrap();
        assert_eq!(s.samples.len(), 6);

        assert_eq!(
            assert_match_sample(&s.samples, |s| s.metric == "http_request_duration_seconds"
                && s.labels.get("service") == Some("main")),
            &Sample {
                metric: "http_request_duration_seconds".to_string(),
                value: Value::Histogram(vec![
                    HistogramCount {
                        less_than: 0.05f64,
                        count: 24054f64,
                    },
                    HistogramCount {
                        less_than: 0.1f64,
                        count: 33444f64,
                    },
                    HistogramCount {
                        less_than: 0.2f64,
                        count: 100392f64,
                    },
                    HistogramCount {
                        less_than: 0.5f64,
                        count: 129389f64,
                    },
                    HistogramCount {
                        less_than: 1.0f64,
                        count: 133988f64,
                    },
                    HistogramCount {
                        less_than: f64::INFINITY,
                        count: 144320f64,
                    },
                ]),
                labels: labels_of(&[("service", "main"), ("code", "200")]),
                timestamp: Utc.timestamp_millis_opt(1395066363000).unwrap(),
            }
        );
        assert_eq!(
            assert_match_sample(&s.samples, |s| s.metric == "rpc_duration_seconds"
                && s.labels.get("service") == Some("backup")),
            &Sample {
                metric: "rpc_duration_seconds".to_string(),
                value: Value::Summary(vec![
                    SummaryCount {
                        quantile: 0.01f64,
                        count: 3102f64,
                    },
                    SummaryCount {
                        quantile: 0.05f64,
                        count: 3272f64,
                    },
                    SummaryCount {
                        quantile: 0.5f64,
                        count: 4773f64,
                    },
                    SummaryCount {
                        quantile: 0.9f64,
                        count: 9001f64,
                    },
                    SummaryCount {
                        quantile: 0.99f64,
                        count: 76656f64,
                    },
                ]),
                labels: labels_of(&[("service", "backup"), ("code", "400")]),
                timestamp: Utc.timestamp_millis_opt(1395066363000).unwrap(),
            }
        );
    }
}