opentelemetry_sdk/metrics/
instrument.rs

1use std::{borrow::Cow, collections::HashSet, error::Error, sync::Arc};
2
3use opentelemetry::{
4    metrics::{AsyncInstrument, SyncInstrument},
5    InstrumentationScope, Key, KeyValue,
6};
7
8use crate::metrics::{aggregation::Aggregation, internal::Measure};
9
10use super::meter::{
11    INSTRUMENT_NAME_EMPTY, INSTRUMENT_NAME_FIRST_ALPHABETIC, INSTRUMENT_NAME_INVALID_CHAR,
12    INSTRUMENT_NAME_LENGTH, INSTRUMENT_UNIT_INVALID_CHAR, INSTRUMENT_UNIT_LENGTH,
13};
14
15use super::Temporality;
16
/// The identifier of a group of instruments that all perform the same function.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum InstrumentKind {
    /// Identifies a group of instruments that record increasing values synchronously
    /// with the code path they are measuring.
    Counter,
    /// A group of instruments that record increasing and decreasing values
    /// synchronously with the code path they are measuring.
    UpDownCounter,
    /// A group of instruments that record a distribution of values synchronously with
    /// the code path they are measuring.
    Histogram,
    /// A group of instruments that record increasing values in an asynchronous
    /// callback.
    ObservableCounter,
    /// A group of instruments that record increasing and decreasing values in an
    /// asynchronous callback.
    ObservableUpDownCounter,
    /// A group of instruments that record the current value synchronously with
    /// the code path they are measuring.
    Gauge,
    /// A group of instruments that record current values in an asynchronous callback.
    ObservableGauge,
}
43
44impl InstrumentKind {
45    /// Select the [Temporality] preference based on [InstrumentKind]
46    ///
47    /// [exporter-docs]: https://github.com/open-telemetry/opentelemetry-specification/blob/a1c13d59bb7d0fb086df2b3e1eaec9df9efef6cc/specification/metrics/sdk_exporters/otlp.md#additional-configuration
48    pub(crate) fn temporality_preference(&self, temporality: Temporality) -> Temporality {
49        match temporality {
50            Temporality::Cumulative => Temporality::Cumulative,
51            Temporality::Delta => match self {
52                Self::Counter
53                | Self::Histogram
54                | Self::ObservableCounter
55                | Self::Gauge
56                | Self::ObservableGauge => Temporality::Delta,
57                Self::UpDownCounter | InstrumentKind::ObservableUpDownCounter => {
58                    Temporality::Cumulative
59                }
60            },
61            Temporality::LowMemory => match self {
62                Self::Counter | InstrumentKind::Histogram => Temporality::Delta,
63                Self::ObservableCounter
64                | Self::Gauge
65                | Self::ObservableGauge
66                | Self::UpDownCounter
67                | Self::ObservableUpDownCounter => Temporality::Cumulative,
68            },
69        }
70    }
71}
/// Describes the properties of an instrument at creation, used for filtering in
/// views. This is utilized in the `with_view` methods on `MeterProviderBuilder`
/// to customize metric output.
///
/// Users can use a reference to `Instrument` to select which instrument(s) a
/// [Stream] should be applied to.
///
/// # Example
///
/// ```rust
/// use opentelemetry_sdk::metrics::{Instrument, Stream};
///
/// let my_view_change_cardinality = |i: &Instrument| {
///     if i.name() == "my_second_histogram" {
///         // Note: If Stream is invalid, `build()` will return an error. By
///         // calling `.ok()`, any such error is ignored and treated as if the
///         // view does not match the instrument. If this is not the desired
///         // behavior, consider handling the error explicitly.
///         Stream::builder().with_cardinality_limit(2).build().ok()
///     } else {
///         None
///     }
/// };
/// ```
#[derive(Clone, Debug, PartialEq)]
pub struct Instrument {
    /// The human-readable identifier of the instrument.
    pub(crate) name: Cow<'static, str>,
    /// Describes the purpose of the instrument.
    pub(crate) description: Cow<'static, str>,
    /// The functional group of the instrument.
    pub(crate) kind: InstrumentKind,
    /// The unit of measurement recorded by the instrument.
    pub(crate) unit: Cow<'static, str>,
    /// The instrumentation scope that created the instrument.
    pub(crate) scope: InstrumentationScope,
}
109
110impl Instrument {
111    /// Instrument name.
112    pub fn name(&self) -> &str {
113        self.name.as_ref()
114    }
115
116    /// Instrument kind.
117    pub fn kind(&self) -> InstrumentKind {
118        self.kind
119    }
120
121    /// Instrument unit.
122    pub fn unit(&self) -> &str {
123        self.unit.as_ref()
124    }
125
126    /// Instrument scope.
127    pub fn scope(&self) -> &InstrumentationScope {
128        &self.scope
129    }
130}
131
/// A builder for creating Stream objects.
///
/// # Example
///
/// ```
/// use opentelemetry_sdk::metrics::Stream;
///
/// let stream = Stream::builder()
///     .with_name("my_stream")
///     .with_unit("ms")
///     .with_cardinality_limit(100)
///     .build()
///     .unwrap();
/// ```
// NOTE(review): the previous doc example called `with_aggregation`, which is
// only compiled under the `spec_unstable_metrics_views` feature, so the
// doctest failed without that feature enabled. The example now uses only
// unconditionally-available methods.
#[derive(Default, Debug)]
pub struct StreamBuilder {
    // Optional overrides; `None` means "use the value from the instrument".
    name: Option<Cow<'static, str>>,
    description: Option<Cow<'static, str>>,
    unit: Option<Cow<'static, str>>,
    // Only settable when the `spec_unstable_metrics_views` feature is enabled,
    // but always present so `build()` has a single shape.
    aggregation: Option<Aggregation>,
    // Allow-list of attribute keys; only settable behind the same feature.
    allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
    cardinality_limit: Option<usize>,
}
156
157impl StreamBuilder {
158    /// Create a new stream builder with default values.
159    pub(crate) fn new() -> Self {
160        StreamBuilder::default()
161    }
162
163    /// Set the stream name. If this is not set, name provide while creating the instrument will be used.
164    pub fn with_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
165        self.name = Some(name.into());
166        self
167    }
168
169    /// Set the stream description. If this is not set, description provided while creating the instrument will be used.
170    pub fn with_description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
171        self.description = Some(description.into());
172        self
173    }
174
175    /// Set the stream unit. If this is not set, unit provided while creating the instrument will be used.
176    pub fn with_unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
177        self.unit = Some(unit.into());
178        self
179    }
180
181    #[cfg(feature = "spec_unstable_metrics_views")]
182    /// Set the stream aggregation. This is used to customize the aggregation.
183    /// If not set, the default aggregation based on the instrument kind will be used.
184    pub fn with_aggregation(mut self, aggregation: Aggregation) -> Self {
185        self.aggregation = Some(aggregation);
186        self
187    }
188
189    #[cfg(feature = "spec_unstable_metrics_views")]
190    /// Set the stream allowed attribute keys.
191    ///
192    /// Any attribute recorded for the stream with a key not in this set will be
193    /// dropped. If the set is empty, all attributes will be dropped.
194    /// If this method is not used, all attributes will be kept.
195    pub fn with_allowed_attribute_keys(
196        mut self,
197        attribute_keys: impl IntoIterator<Item = Key>,
198    ) -> Self {
199        self.allowed_attribute_keys = Some(Arc::new(attribute_keys.into_iter().collect()));
200        self
201    }
202
203    /// Set the stream cardinality limit. If this is not set, the default limit of 2000 will be used.
204    pub fn with_cardinality_limit(mut self, limit: usize) -> Self {
205        self.cardinality_limit = Some(limit);
206        self
207    }
208
209    /// Build a new Stream instance using the configuration in this builder.
210    ///
211    /// # Returns
212    ///
213    /// A Result containing the new Stream instance or an error if the build failed.
214    pub fn build(self) -> Result<Stream, Box<dyn Error>> {
215        // TODO: Avoid copying the validation logic from meter.rs,
216        // and instead move it to a common place and do it once.
217        // It is a bug that validations are done in meter.rs
218        // as it'll not allow users to fix instrumentation mistakes
219        // using views.
220
221        // Validate name if provided
222        if let Some(name) = &self.name {
223            if name.is_empty() {
224                return Err(INSTRUMENT_NAME_EMPTY.into());
225            }
226
227            if name.len() > super::meter::INSTRUMENT_NAME_MAX_LENGTH {
228                return Err(INSTRUMENT_NAME_LENGTH.into());
229            }
230
231            if name.starts_with(|c: char| !c.is_ascii_alphabetic()) {
232                return Err(INSTRUMENT_NAME_FIRST_ALPHABETIC.into());
233            }
234
235            if name.contains(|c: char| {
236                !c.is_ascii_alphanumeric()
237                    && !super::meter::INSTRUMENT_NAME_ALLOWED_NON_ALPHANUMERIC_CHARS.contains(&c)
238            }) {
239                return Err(INSTRUMENT_NAME_INVALID_CHAR.into());
240            }
241        }
242
243        // Validate unit if provided
244        if let Some(unit) = &self.unit {
245            if unit.len() > super::meter::INSTRUMENT_UNIT_NAME_MAX_LENGTH {
246                return Err(INSTRUMENT_UNIT_LENGTH.into());
247            }
248
249            if unit.contains(|c: char| !c.is_ascii()) {
250                return Err(INSTRUMENT_UNIT_INVALID_CHAR.into());
251            }
252        }
253
254        // Validate cardinality limit
255        if let Some(limit) = self.cardinality_limit {
256            if limit == 0 {
257                return Err("Cardinality limit must be greater than 0".into());
258            }
259        }
260
261        // Validate bucket boundaries if using ExplicitBucketHistogram
262        if let Some(Aggregation::ExplicitBucketHistogram { boundaries, .. }) = &self.aggregation {
263            validate_bucket_boundaries(boundaries)?;
264        }
265
266        Ok(Stream {
267            name: self.name,
268            description: self.description,
269            unit: self.unit,
270            aggregation: self.aggregation,
271            allowed_attribute_keys: self.allowed_attribute_keys,
272            cardinality_limit: self.cardinality_limit,
273        })
274    }
275}
276
/// Check that explicit-bucket histogram boundaries are usable: every value is
/// finite and the sequence is strictly increasing (sorted, no duplicates).
fn validate_bucket_boundaries(boundaries: &[f64]) -> Result<(), String> {
    // Reject NaN, +Infinity and -Infinity up front; `is_finite` is false for
    // exactly those three cases.
    if boundaries.iter().any(|b| !b.is_finite()) {
        return Err("Bucket boundaries must not contain NaN, Infinity, or -Infinity".to_string());
    }

    // Every adjacent pair must be strictly increasing, which simultaneously
    // enforces sortedness and the absence of duplicates.
    if boundaries.windows(2).any(|pair| pair[1] <= pair[0]) {
        return Err("Bucket boundaries must be sorted and not contain any duplicates".to_string());
    }

    Ok(())
}
298
/// Describes the stream of data an instrument produces. Used in `with_view`
/// methods on `MeterProviderBuilder` to customize the metric output.
///
/// All fields are optional; `None` means "inherit the value from the
/// instrument the view matched".
#[derive(Default, Debug)]
pub struct Stream {
    /// The human-readable identifier of the stream.
    pub(crate) name: Option<Cow<'static, str>>,
    /// Describes the purpose of the data.
    pub(crate) description: Option<Cow<'static, str>>,
    /// The unit of measurement recorded.
    pub(crate) unit: Option<Cow<'static, str>>,
    /// Aggregation the stream uses for an instrument.
    pub(crate) aggregation: Option<Aggregation>,
    /// An allow-list of attribute keys that will be preserved for the stream.
    ///
    /// Any attribute recorded for the stream with a key not in this set will be
    /// dropped. If the set is empty, all attributes will be dropped; if `None`, all
    /// attributes will be kept.
    pub(crate) allowed_attribute_keys: Option<Arc<HashSet<Key>>>,

    /// Cardinality limit for the stream.
    pub(crate) cardinality_limit: Option<usize>,
}
321
impl Stream {
    /// Create a new [StreamBuilder] with default values.
    ///
    /// Use the builder's `with_*` methods to override instrument-provided
    /// properties, then call `build()` to obtain a validated `Stream`.
    pub fn builder() -> StreamBuilder {
        StreamBuilder::new()
    }
}
328
/// The identifying properties of an instrument.
///
/// Used as a hash key to detect duplicate/conflicting instrument
/// registrations, hence the `Eq + Hash` derives.
#[derive(Debug, PartialEq, Eq, Hash)]
pub(crate) struct InstrumentId {
    /// The human-readable identifier of the instrument.
    pub(crate) name: Cow<'static, str>,
    /// Describes the purpose of the data.
    pub(crate) description: Cow<'static, str>,
    /// Defines the functional group of the instrument.
    pub(crate) kind: InstrumentKind,
    /// The unit of measurement recorded.
    pub(crate) unit: Cow<'static, str>,
    /// Number is the underlying data type of the instrument.
    pub(crate) number: Cow<'static, str>,
}
343
344impl InstrumentId {
345    /// Instrument names are considered case-insensitive ASCII.
346    ///
347    /// Standardize the instrument name to always be lowercase so it can be compared
348    /// via hash.
349    ///
350    /// See [naming syntax] for full requirements.
351    ///
352    /// [naming syntax]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/api.md#instrument-name-syntax
353    pub(crate) fn normalize(&mut self) {
354        if self.name.chars().any(|c| c.is_ascii_uppercase()) {
355            self.name = self.name.to_ascii_lowercase().into();
356        }
357    }
358}
359
/// The set of aggregation measures a synchronous instrument resolved to; each
/// recorded value is forwarded to every measure (see the `SyncInstrument` impl).
pub(crate) struct ResolvedMeasures<T> {
    pub(crate) measures: Vec<Arc<dyn Measure<T>>>,
}
363
364impl<T: Copy + 'static> SyncInstrument<T> for ResolvedMeasures<T> {
365    fn measure(&self, val: T, attrs: &[KeyValue]) {
366        for measure in &self.measures {
367            measure.call(val, attrs)
368        }
369    }
370}
371
/// Backing state for asynchronous (observable) instruments; holds the measures
/// that callback observations are forwarded to.
#[derive(Clone)]
pub(crate) struct Observable<T> {
    measures: Vec<Arc<dyn Measure<T>>>,
}
376
impl<T> Observable<T> {
    /// Wrap the given measures into the shared state for an observable instrument.
    pub(crate) fn new(measures: Vec<Arc<dyn Measure<T>>>) -> Self {
        Self { measures }
    }
}
382
383impl<T: Copy + Send + Sync + 'static> AsyncInstrument<T> for Observable<T> {
384    fn observe(&self, measurement: T, attrs: &[KeyValue]) {
385        for measure in &self.measures {
386            measure.call(measurement, attrs)
387        }
388    }
389}
390
#[cfg(test)]
mod tests {
    //! Unit tests for `StreamBuilder` validation: name, unit, cardinality
    //! limit, and (feature-gated) explicit histogram bucket boundaries.

    use super::StreamBuilder;
    use crate::metrics::meter::{
        INSTRUMENT_NAME_EMPTY, INSTRUMENT_NAME_FIRST_ALPHABETIC, INSTRUMENT_NAME_INVALID_CHAR,
        INSTRUMENT_NAME_LENGTH, INSTRUMENT_UNIT_INVALID_CHAR, INSTRUMENT_UNIT_LENGTH,
    };

    // Names must start with an ASCII letter, use only the allowed character
    // set, and be at most 255 characters long.
    #[test]
    fn stream_name_validation() {
        // (name, expected error)
        // An empty expected-error string means the name must be accepted.
        let stream_name_test_cases = vec![
            ("validateName", ""),
            ("_startWithNoneAlphabet", INSTRUMENT_NAME_FIRST_ALPHABETIC),
            ("utf8char锈", INSTRUMENT_NAME_INVALID_CHAR),
            // 255 characters is the maximum allowed length; 256 is one over.
            ("a".repeat(255).leak(), ""),
            ("a".repeat(256).leak(), INSTRUMENT_NAME_LENGTH),
            ("invalid name", INSTRUMENT_NAME_INVALID_CHAR),
            ("allow/slash", ""),
            ("allow_under_score", ""),
            ("allow.dots.ok", ""),
            ("", INSTRUMENT_NAME_EMPTY),
            ("\\allow\\slash /sec", INSTRUMENT_NAME_FIRST_ALPHABETIC),
            ("\\allow\\$$slash /sec", INSTRUMENT_NAME_FIRST_ALPHABETIC),
            ("Total $ Count", INSTRUMENT_NAME_INVALID_CHAR),
            (
                "\\test\\UsagePercent(Total) > 80%",
                INSTRUMENT_NAME_FIRST_ALPHABETIC,
            ),
            ("/not / allowed", INSTRUMENT_NAME_FIRST_ALPHABETIC),
        ];

        for (name, expected_error) in stream_name_test_cases {
            let builder = StreamBuilder::new().with_name(name);
            let result = builder.build();

            if expected_error.is_empty() {
                assert!(
                    result.is_ok(),
                    "Expected successful build for name '{}', but got error: {:?}",
                    name,
                    result.err()
                );
            } else {
                let err = result.err().unwrap();
                let err_str = err.to_string();
                assert!(
                    err_str == expected_error,
                    "For name '{}', expected error '{}', but got '{}'",
                    name,
                    expected_error,
                    err_str
                );
            }
        }
    }

    // Units must be ASCII and at most 63 characters; empty units are allowed.
    #[test]
    fn stream_unit_validation() {
        // (unit, expected error)
        let stream_unit_test_cases = vec![
            (
                // 64 characters: one past the maximum allowed unit length.
                "0123456789012345678901234567890123456789012345678901234567890123",
                INSTRUMENT_UNIT_LENGTH,
            ),
            ("utf8char锈", INSTRUMENT_UNIT_INVALID_CHAR),
            ("kb", ""),
            ("Kb/sec", ""),
            ("%", ""),
            ("", ""),
        ];

        for (unit, expected_error) in stream_unit_test_cases {
            // Use a valid name to isolate unit validation
            let builder = StreamBuilder::new().with_name("valid_name").with_unit(unit);

            let result = builder.build();

            if expected_error.is_empty() {
                assert!(
                    result.is_ok(),
                    "Expected successful build for unit '{}', but got error: {:?}",
                    unit,
                    result.err()
                );
            } else {
                let err = result.err().unwrap();
                let err_str = err.to_string();
                assert!(
                    err_str == expected_error,
                    "For unit '{}', expected error '{}', but got '{}'",
                    unit,
                    expected_error,
                    err_str
                );
            }
        }
    }

    // Cardinality limits must be strictly positive; zero is rejected.
    #[test]
    fn stream_cardinality_limit_validation() {
        // Test zero cardinality limit (invalid)
        let builder = StreamBuilder::new()
            .with_name("valid_name")
            .with_cardinality_limit(0);

        let result = builder.build();
        assert!(result.is_err(), "Expected error for zero cardinality limit");
        assert_eq!(
            result.err().unwrap().to_string(),
            "Cardinality limit must be greater than 0",
            "Expected cardinality limit validation error message"
        );

        // Test valid cardinality limits
        let valid_limits = vec![1, 10, 100, 1000];
        for limit in valid_limits {
            let builder = StreamBuilder::new()
                .with_name("valid_name")
                .with_cardinality_limit(limit);

            let result = builder.build();
            assert!(
                result.is_ok(),
                "Expected successful build for cardinality limit {}, but got error: {:?}",
                limit,
                result.err()
            );
        }
    }

    // A fully-specified, well-formed builder must produce a Stream.
    #[test]
    fn stream_valid_build() {
        // Test with valid configuration
        let stream = StreamBuilder::new()
            .with_name("valid_name")
            .with_description("Valid description")
            .with_unit("ms")
            .with_cardinality_limit(100)
            .build();

        assert!(
            stream.is_ok(),
            "Expected valid Stream to be built successfully"
        );
    }

    // `with_aggregation` only exists behind this feature, so the bucket
    // boundary validation can only be exercised when it is enabled.
    #[cfg(feature = "spec_unstable_metrics_views")]
    #[test]
    fn stream_histogram_bucket_validation() {
        use super::Aggregation;

        // Test with valid bucket boundaries
        let valid_boundaries = vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0];
        let builder = StreamBuilder::new()
            .with_name("valid_histogram")
            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: valid_boundaries.clone(),
                record_min_max: true,
            });

        let result = builder.build();
        assert!(
            result.is_ok(),
            "Expected successful build with valid bucket boundaries"
        );

        // Test with invalid bucket boundaries (NaN and Infinity)

        // Test with NaN
        let invalid_nan_boundaries = vec![1.0, 2.0, f64::NAN, 10.0];

        let builder = StreamBuilder::new()
            .with_name("invalid_histogram_nan")
            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: invalid_nan_boundaries,
                record_min_max: true,
            });

        let result = builder.build();
        assert!(
            result.is_err(),
            "Expected error for NaN in bucket boundaries"
        );
        assert_eq!(
            result.err().unwrap().to_string(),
            "Bucket boundaries must not contain NaN, Infinity, or -Infinity",
            "Expected correct validation error for NaN"
        );

        // Test with infinity
        let invalid_inf_boundaries = vec![1.0, 5.0, f64::INFINITY, 100.0];

        let builder = StreamBuilder::new()
            .with_name("invalid_histogram_inf")
            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: invalid_inf_boundaries,
                record_min_max: true,
            });

        let result = builder.build();
        assert!(
            result.is_err(),
            "Expected error for Infinity in bucket boundaries"
        );
        assert_eq!(
            result.err().unwrap().to_string(),
            "Bucket boundaries must not contain NaN, Infinity, or -Infinity",
            "Expected correct validation error for Infinity"
        );

        // Test with negative infinity
        let invalid_neg_inf_boundaries = vec![f64::NEG_INFINITY, 5.0, 10.0, 100.0];

        let builder = StreamBuilder::new()
            .with_name("invalid_histogram_neg_inf")
            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: invalid_neg_inf_boundaries,
                record_min_max: true,
            });

        let result = builder.build();
        assert!(
            result.is_err(),
            "Expected error for negative Infinity in bucket boundaries"
        );
        assert_eq!(
            result.err().unwrap().to_string(),
            "Bucket boundaries must not contain NaN, Infinity, or -Infinity",
            "Expected correct validation error for negative Infinity"
        );

        // Test with unsorted bucket boundaries
        let unsorted_boundaries = vec![1.0, 5.0, 2.0, 10.0]; // 2.0 comes after 5.0, which is incorrect

        let builder = StreamBuilder::new()
            .with_name("unsorted_histogram")
            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: unsorted_boundaries,
                record_min_max: true,
            });

        let result = builder.build();
        assert!(
            result.is_err(),
            "Expected error for unsorted bucket boundaries"
        );
        assert_eq!(
            result.err().unwrap().to_string(),
            "Bucket boundaries must be sorted and not contain any duplicates",
            "Expected correct validation error for unsorted boundaries"
        );

        // Test with duplicate bucket boundaries
        let duplicate_boundaries = vec![1.0, 2.0, 5.0, 5.0, 10.0]; // 5.0 appears twice

        let builder = StreamBuilder::new()
            .with_name("duplicate_histogram")
            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: duplicate_boundaries,
                record_min_max: true,
            });

        let result = builder.build();
        assert!(
            result.is_err(),
            "Expected error for duplicate bucket boundaries"
        );
        assert_eq!(
            result.err().unwrap().to_string(),
            "Bucket boundaries must be sorted and not contain any duplicates",
            "Expected correct validation error for duplicate boundaries"
        );
    }
}