1use std::{borrow::Cow, collections::HashSet, error::Error, sync::Arc};
2
3use opentelemetry::{
4 metrics::{AsyncInstrument, SyncInstrument},
5 InstrumentationScope, Key, KeyValue,
6};
7
8use crate::metrics::{aggregation::Aggregation, internal::Measure};
9
10use super::meter::{
11 INSTRUMENT_NAME_EMPTY, INSTRUMENT_NAME_FIRST_ALPHABETIC, INSTRUMENT_NAME_INVALID_CHAR,
12 INSTRUMENT_NAME_LENGTH, INSTRUMENT_UNIT_INVALID_CHAR, INSTRUMENT_UNIT_LENGTH,
13};
14
15use super::Temporality;
16
/// Identifies which kind of metric instrument a stream of measurements
/// comes from; used when resolving the effective temporality for a stream
/// (see `temporality_preference`).
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum InstrumentKind {
    /// Synchronous counter.
    Counter,
    /// Synchronous up-down counter.
    UpDownCounter,
    /// Synchronous histogram.
    Histogram,
    /// Asynchronous (observable) counter.
    ObservableCounter,
    /// Asynchronous (observable) up-down counter.
    ObservableUpDownCounter,

    /// Synchronous gauge.
    Gauge,
    /// Asynchronous (observable) gauge.
    ObservableGauge,
}
43
44impl InstrumentKind {
45 pub(crate) fn temporality_preference(&self, temporality: Temporality) -> Temporality {
49 match temporality {
50 Temporality::Cumulative => Temporality::Cumulative,
51 Temporality::Delta => match self {
52 Self::Counter
53 | Self::Histogram
54 | Self::ObservableCounter
55 | Self::Gauge
56 | Self::ObservableGauge => Temporality::Delta,
57 Self::UpDownCounter | InstrumentKind::ObservableUpDownCounter => {
58 Temporality::Cumulative
59 }
60 },
61 Temporality::LowMemory => match self {
62 Self::Counter | InstrumentKind::Histogram => Temporality::Delta,
63 Self::ObservableCounter
64 | Self::Gauge
65 | Self::ObservableGauge
66 | Self::UpDownCounter
67 | Self::ObservableUpDownCounter => Temporality::Cumulative,
68 },
69 }
70 }
71}
/// Describes the properties of a metric instrument together with the
/// instrumentation scope it was created under.
#[derive(Clone, Debug, PartialEq)]
pub struct Instrument {
    /// The instrument name.
    pub(crate) name: Cow<'static, str>,
    /// The instrument description.
    pub(crate) description: Cow<'static, str>,
    /// The kind of instrument (counter, histogram, gauge, ...).
    pub(crate) kind: InstrumentKind,
    /// The unit of measurement, as a free-form string.
    pub(crate) unit: Cow<'static, str>,
    /// The instrumentation scope that created the instrument.
    pub(crate) scope: InstrumentationScope,
}
109
110impl Instrument {
111 pub fn name(&self) -> &str {
113 self.name.as_ref()
114 }
115
116 pub fn kind(&self) -> InstrumentKind {
118 self.kind
119 }
120
121 pub fn unit(&self) -> &str {
123 self.unit.as_ref()
124 }
125
126 pub fn scope(&self) -> &InstrumentationScope {
128 &self.scope
129 }
130}
131
/// Builder for configuring and constructing a [`Stream`].
///
/// Every field is optional; only the fields that were set are validated by
/// `build` and carried into the resulting [`Stream`].
#[derive(Default, Debug)]
pub struct StreamBuilder {
    /// Stream name override.
    name: Option<Cow<'static, str>>,
    /// Stream description override.
    description: Option<Cow<'static, str>>,
    /// Stream unit override.
    unit: Option<Cow<'static, str>>,
    /// Aggregation to apply (e.g. explicit-bucket histogram).
    aggregation: Option<Aggregation>,
    // NOTE(review): presumably restricts which attribute keys are kept on
    // measurements — confirm against where the SDK consumes this field.
    allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
    /// Maximum number of distinct attribute sets; must be > 0 (validated in
    /// `build`).
    cardinality_limit: Option<usize>,
}
156
157impl StreamBuilder {
158 pub(crate) fn new() -> Self {
160 StreamBuilder::default()
161 }
162
163 pub fn with_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
165 self.name = Some(name.into());
166 self
167 }
168
169 pub fn with_description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
171 self.description = Some(description.into());
172 self
173 }
174
175 pub fn with_unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
177 self.unit = Some(unit.into());
178 self
179 }
180
181 #[cfg(feature = "spec_unstable_metrics_views")]
182 pub fn with_aggregation(mut self, aggregation: Aggregation) -> Self {
185 self.aggregation = Some(aggregation);
186 self
187 }
188
189 #[cfg(feature = "spec_unstable_metrics_views")]
190 pub fn with_allowed_attribute_keys(
196 mut self,
197 attribute_keys: impl IntoIterator<Item = Key>,
198 ) -> Self {
199 self.allowed_attribute_keys = Some(Arc::new(attribute_keys.into_iter().collect()));
200 self
201 }
202
203 pub fn with_cardinality_limit(mut self, limit: usize) -> Self {
205 self.cardinality_limit = Some(limit);
206 self
207 }
208
209 pub fn build(self) -> Result<Stream, Box<dyn Error>> {
215 if let Some(name) = &self.name {
223 if name.is_empty() {
224 return Err(INSTRUMENT_NAME_EMPTY.into());
225 }
226
227 if name.len() > super::meter::INSTRUMENT_NAME_MAX_LENGTH {
228 return Err(INSTRUMENT_NAME_LENGTH.into());
229 }
230
231 if name.starts_with(|c: char| !c.is_ascii_alphabetic()) {
232 return Err(INSTRUMENT_NAME_FIRST_ALPHABETIC.into());
233 }
234
235 if name.contains(|c: char| {
236 !c.is_ascii_alphanumeric()
237 && !super::meter::INSTRUMENT_NAME_ALLOWED_NON_ALPHANUMERIC_CHARS.contains(&c)
238 }) {
239 return Err(INSTRUMENT_NAME_INVALID_CHAR.into());
240 }
241 }
242
243 if let Some(unit) = &self.unit {
245 if unit.len() > super::meter::INSTRUMENT_UNIT_NAME_MAX_LENGTH {
246 return Err(INSTRUMENT_UNIT_LENGTH.into());
247 }
248
249 if unit.contains(|c: char| !c.is_ascii()) {
250 return Err(INSTRUMENT_UNIT_INVALID_CHAR.into());
251 }
252 }
253
254 if let Some(limit) = self.cardinality_limit {
256 if limit == 0 {
257 return Err("Cardinality limit must be greater than 0".into());
258 }
259 }
260
261 if let Some(Aggregation::ExplicitBucketHistogram { boundaries, .. }) = &self.aggregation {
263 validate_bucket_boundaries(boundaries)?;
264 }
265
266 Ok(Stream {
267 name: self.name,
268 description: self.description,
269 unit: self.unit,
270 aggregation: self.aggregation,
271 allowed_attribute_keys: self.allowed_attribute_keys,
272 cardinality_limit: self.cardinality_limit,
273 })
274 }
275}
276
/// Validates explicit histogram bucket boundaries.
///
/// Boundaries must all be finite (no NaN or ±Infinity) and strictly
/// increasing (sorted, no duplicates). An empty slice is trivially valid.
/// Returns a human-readable error message on failure.
fn validate_bucket_boundaries(boundaries: &[f64]) -> Result<(), String> {
    // Reject non-finite values first so the NaN/Infinity message wins even
    // when such a value would also break the ordering check below.
    if boundaries.iter().any(|b| !b.is_finite()) {
        return Err(
            "Bucket boundaries must not contain NaN, Infinity, or -Infinity".to_string(),
        );
    }

    // All values are finite here, so `<=` is a reliable comparison: any
    // non-increasing adjacent pair means unsorted or duplicate boundaries.
    if boundaries.windows(2).any(|pair| pair[1] <= pair[0]) {
        return Err(
            "Bucket boundaries must be sorted and not contain any duplicates".to_string(),
        );
    }

    Ok(())
}
298
/// Describes the stream of data an instrument produces, as configured and
/// validated by a [`StreamBuilder`]. `None` fields were left unset on the
/// builder.
#[derive(Default, Debug)]
pub struct Stream {
    /// Stream name override, if any.
    pub(crate) name: Option<Cow<'static, str>>,
    /// Stream description override, if any.
    pub(crate) description: Option<Cow<'static, str>>,
    /// Stream unit override, if any.
    pub(crate) unit: Option<Cow<'static, str>>,
    /// Aggregation to apply, if any.
    pub(crate) aggregation: Option<Aggregation>,
    // NOTE(review): presumably restricts which attribute keys are kept on
    // measurements — confirm against where the SDK consumes this field.
    pub(crate) allowed_attribute_keys: Option<Arc<HashSet<Key>>>,

    /// Cardinality limit; guaranteed > 0 when built via `StreamBuilder`.
    pub(crate) cardinality_limit: Option<usize>,
}
321
322impl Stream {
323 pub fn builder() -> StreamBuilder {
325 StreamBuilder::new()
326 }
327}
328
/// Hashable identity of an instrument: name (case-insensitive once
/// normalized), description, kind, unit, and measurement number type.
#[derive(Debug, PartialEq, Eq, Hash)]
pub(crate) struct InstrumentId {
    /// Instrument name; lower-cased by [`InstrumentId::normalize`].
    pub(crate) name: Cow<'static, str>,
    /// Instrument description.
    pub(crate) description: Cow<'static, str>,
    /// Instrument kind.
    pub(crate) kind: InstrumentKind,
    /// Instrument unit.
    pub(crate) unit: Cow<'static, str>,
    // NOTE(review): presumably the name of the measurement value type
    // (e.g. "u64"/"f64") — confirm against the code constructing this id.
    pub(crate) number: Cow<'static, str>,
}
343
344impl InstrumentId {
345 pub(crate) fn normalize(&mut self) {
354 if self.name.chars().any(|c| c.is_ascii_uppercase()) {
355 self.name = self.name.to_ascii_lowercase().into();
356 }
357 }
358}
359
/// The set of measures a synchronous instrument's recorded values are
/// forwarded to ("resolved" presumably by the pipeline/view machinery
/// elsewhere in the SDK — confirm at the construction site).
pub(crate) struct ResolvedMeasures<T> {
    /// Each recorded value is fanned out to every measure in this list.
    pub(crate) measures: Vec<Arc<dyn Measure<T>>>,
}
363
364impl<T: Copy + 'static> SyncInstrument<T> for ResolvedMeasures<T> {
365 fn measure(&self, val: T, attrs: &[KeyValue]) {
366 for measure in &self.measures {
367 measure.call(val, attrs)
368 }
369 }
370}
371
/// Backing state for asynchronous (observable) instruments: the measures
/// each observed value is forwarded to.
#[derive(Clone)]
pub(crate) struct Observable<T> {
    // Each observation is fanned out to every measure in this list.
    measures: Vec<Arc<dyn Measure<T>>>,
}
376
377impl<T> Observable<T> {
378 pub(crate) fn new(measures: Vec<Arc<dyn Measure<T>>>) -> Self {
379 Self { measures }
380 }
381}
382
383impl<T: Copy + Send + Sync + 'static> AsyncInstrument<T> for Observable<T> {
384 fn observe(&self, measurement: T, attrs: &[KeyValue]) {
385 for measure in &self.measures {
386 measure.call(measurement, attrs)
387 }
388 }
389}
390
391#[cfg(test)]
392mod tests {
393 use super::StreamBuilder;
394 use crate::metrics::meter::{
395 INSTRUMENT_NAME_EMPTY, INSTRUMENT_NAME_FIRST_ALPHABETIC, INSTRUMENT_NAME_INVALID_CHAR,
396 INSTRUMENT_NAME_LENGTH, INSTRUMENT_UNIT_INVALID_CHAR, INSTRUMENT_UNIT_LENGTH,
397 };
398
399 #[test]
400 fn stream_name_validation() {
401 let stream_name_test_cases = vec![
403 ("validateName", ""),
404 ("_startWithNoneAlphabet", INSTRUMENT_NAME_FIRST_ALPHABETIC),
405 ("utf8char锈", INSTRUMENT_NAME_INVALID_CHAR),
406 ("a".repeat(255).leak(), ""),
407 ("a".repeat(256).leak(), INSTRUMENT_NAME_LENGTH),
408 ("invalid name", INSTRUMENT_NAME_INVALID_CHAR),
409 ("allow/slash", ""),
410 ("allow_under_score", ""),
411 ("allow.dots.ok", ""),
412 ("", INSTRUMENT_NAME_EMPTY),
413 ("\\allow\\slash /sec", INSTRUMENT_NAME_FIRST_ALPHABETIC),
414 ("\\allow\\$$slash /sec", INSTRUMENT_NAME_FIRST_ALPHABETIC),
415 ("Total $ Count", INSTRUMENT_NAME_INVALID_CHAR),
416 (
417 "\\test\\UsagePercent(Total) > 80%",
418 INSTRUMENT_NAME_FIRST_ALPHABETIC,
419 ),
420 ("/not / allowed", INSTRUMENT_NAME_FIRST_ALPHABETIC),
421 ];
422
423 for (name, expected_error) in stream_name_test_cases {
424 let builder = StreamBuilder::new().with_name(name);
425 let result = builder.build();
426
427 if expected_error.is_empty() {
428 assert!(
429 result.is_ok(),
430 "Expected successful build for name '{}', but got error: {:?}",
431 name,
432 result.err()
433 );
434 } else {
435 let err = result.err().unwrap();
436 let err_str = err.to_string();
437 assert!(
438 err_str == expected_error,
439 "For name '{}', expected error '{}', but got '{}'",
440 name,
441 expected_error,
442 err_str
443 );
444 }
445 }
446 }
447
448 #[test]
449 fn stream_unit_validation() {
450 let stream_unit_test_cases = vec![
452 (
453 "0123456789012345678901234567890123456789012345678901234567890123",
454 INSTRUMENT_UNIT_LENGTH,
455 ),
456 ("utf8char锈", INSTRUMENT_UNIT_INVALID_CHAR),
457 ("kb", ""),
458 ("Kb/sec", ""),
459 ("%", ""),
460 ("", ""),
461 ];
462
463 for (unit, expected_error) in stream_unit_test_cases {
464 let builder = StreamBuilder::new().with_name("valid_name").with_unit(unit);
466
467 let result = builder.build();
468
469 if expected_error.is_empty() {
470 assert!(
471 result.is_ok(),
472 "Expected successful build for unit '{}', but got error: {:?}",
473 unit,
474 result.err()
475 );
476 } else {
477 let err = result.err().unwrap();
478 let err_str = err.to_string();
479 assert!(
480 err_str == expected_error,
481 "For unit '{}', expected error '{}', but got '{}'",
482 unit,
483 expected_error,
484 err_str
485 );
486 }
487 }
488 }
489
490 #[test]
491 fn stream_cardinality_limit_validation() {
492 let builder = StreamBuilder::new()
494 .with_name("valid_name")
495 .with_cardinality_limit(0);
496
497 let result = builder.build();
498 assert!(result.is_err(), "Expected error for zero cardinality limit");
499 assert_eq!(
500 result.err().unwrap().to_string(),
501 "Cardinality limit must be greater than 0",
502 "Expected cardinality limit validation error message"
503 );
504
505 let valid_limits = vec![1, 10, 100, 1000];
507 for limit in valid_limits {
508 let builder = StreamBuilder::new()
509 .with_name("valid_name")
510 .with_cardinality_limit(limit);
511
512 let result = builder.build();
513 assert!(
514 result.is_ok(),
515 "Expected successful build for cardinality limit {}, but got error: {:?}",
516 limit,
517 result.err()
518 );
519 }
520 }
521
522 #[test]
523 fn stream_valid_build() {
524 let stream = StreamBuilder::new()
526 .with_name("valid_name")
527 .with_description("Valid description")
528 .with_unit("ms")
529 .with_cardinality_limit(100)
530 .build();
531
532 assert!(
533 stream.is_ok(),
534 "Expected valid Stream to be built successfully"
535 );
536 }
537
538 #[cfg(feature = "spec_unstable_metrics_views")]
539 #[test]
540 fn stream_histogram_bucket_validation() {
541 use super::Aggregation;
542
543 let valid_boundaries = vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0];
545 let builder = StreamBuilder::new()
546 .with_name("valid_histogram")
547 .with_aggregation(Aggregation::ExplicitBucketHistogram {
548 boundaries: valid_boundaries.clone(),
549 record_min_max: true,
550 });
551
552 let result = builder.build();
553 assert!(
554 result.is_ok(),
555 "Expected successful build with valid bucket boundaries"
556 );
557
558 let invalid_nan_boundaries = vec![1.0, 2.0, f64::NAN, 10.0];
562
563 let builder = StreamBuilder::new()
564 .with_name("invalid_histogram_nan")
565 .with_aggregation(Aggregation::ExplicitBucketHistogram {
566 boundaries: invalid_nan_boundaries,
567 record_min_max: true,
568 });
569
570 let result = builder.build();
571 assert!(
572 result.is_err(),
573 "Expected error for NaN in bucket boundaries"
574 );
575 assert_eq!(
576 result.err().unwrap().to_string(),
577 "Bucket boundaries must not contain NaN, Infinity, or -Infinity",
578 "Expected correct validation error for NaN"
579 );
580
581 let invalid_inf_boundaries = vec![1.0, 5.0, f64::INFINITY, 100.0];
583
584 let builder = StreamBuilder::new()
585 .with_name("invalid_histogram_inf")
586 .with_aggregation(Aggregation::ExplicitBucketHistogram {
587 boundaries: invalid_inf_boundaries,
588 record_min_max: true,
589 });
590
591 let result = builder.build();
592 assert!(
593 result.is_err(),
594 "Expected error for Infinity in bucket boundaries"
595 );
596 assert_eq!(
597 result.err().unwrap().to_string(),
598 "Bucket boundaries must not contain NaN, Infinity, or -Infinity",
599 "Expected correct validation error for Infinity"
600 );
601
602 let invalid_neg_inf_boundaries = vec![f64::NEG_INFINITY, 5.0, 10.0, 100.0];
604
605 let builder = StreamBuilder::new()
606 .with_name("invalid_histogram_neg_inf")
607 .with_aggregation(Aggregation::ExplicitBucketHistogram {
608 boundaries: invalid_neg_inf_boundaries,
609 record_min_max: true,
610 });
611
612 let result = builder.build();
613 assert!(
614 result.is_err(),
615 "Expected error for negative Infinity in bucket boundaries"
616 );
617 assert_eq!(
618 result.err().unwrap().to_string(),
619 "Bucket boundaries must not contain NaN, Infinity, or -Infinity",
620 "Expected correct validation error for negative Infinity"
621 );
622
623 let unsorted_boundaries = vec![1.0, 5.0, 2.0, 10.0]; let builder = StreamBuilder::new()
627 .with_name("unsorted_histogram")
628 .with_aggregation(Aggregation::ExplicitBucketHistogram {
629 boundaries: unsorted_boundaries,
630 record_min_max: true,
631 });
632
633 let result = builder.build();
634 assert!(
635 result.is_err(),
636 "Expected error for unsorted bucket boundaries"
637 );
638 assert_eq!(
639 result.err().unwrap().to_string(),
640 "Bucket boundaries must be sorted and not contain any duplicates",
641 "Expected correct validation error for unsorted boundaries"
642 );
643
644 let duplicate_boundaries = vec![1.0, 2.0, 5.0, 5.0, 10.0]; let builder = StreamBuilder::new()
648 .with_name("duplicate_histogram")
649 .with_aggregation(Aggregation::ExplicitBucketHistogram {
650 boundaries: duplicate_boundaries,
651 record_min_max: true,
652 });
653
654 let result = builder.build();
655 assert!(
656 result.is_err(),
657 "Expected error for duplicate bucket boundaries"
658 );
659 assert_eq!(
660 result.err().unwrap().to_string(),
661 "Bucket boundaries must be sorted and not contain any duplicates",
662 "Expected correct validation error for duplicate boundaries"
663 );
664 }
665}